A Step-by-Step Guide to Timeout State Monitoring with Apache FlinkCEP

This article walks through the steps for implementing timeout state monitoring with Apache FlinkCEP.

 

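CEP stands for Complex Event Processing. Typical timeout scenarios include:

An order is placed but payment is not confirmed within a certain time.

A ride-hailing order is created but boarding is not confirmed within a certain time.

A food delivery is still not confirmed as delivered a certain time after its scheduled delivery time.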

Apache FlinkCEP API

CEPTimeoutEventJob

A Brief Look at the FlinkCEP Source Code

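DataStream and PatternStream

A DataStream generally consists of events or elements of the same type. A DataStream can be converted into another DataStream through a series of transformations such as Filter and Map.

PatternStream is an abstraction over a stream on which CEP pattern matching is performed. It combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not a DataStream. Its methods emit a mapping from each matched pattern sequence to its associated events (that is, a Map<pattern name, List<event>>) into a SingleOutputStreamOperator, and SingleOutputStreamOperator is a DataStream.

The methods and variables in the CEPOperatorUtils utility class are named after "PatternStream", for example: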

    public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...) {...}

    public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...) {...}

    final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator

    @Public
    public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

The PatternStream constructors:

    PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
        this.inputStream = inputStream;
        this.pattern = pattern;
        this.comparator = null;
    }

    PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
        this.inputStream = inputStream;
        this.pattern = pattern;
        this.comparator = comparator;
    }

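Pattern, Quantifier, and EventComparator

Pattern is the base class for pattern definitions and follows the Builder pattern. A defined pattern is used by NFACompiler to generate an NFA.

If you want to implement your own methods similar to next and followedBy, such as a timeEnd method, extending and overriding Pattern should be feasible.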

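    public class Pattern<T, F extends T> {

        /** Name of the pattern. */
        private final String name;

        /** The previous pattern. */
        private final Pattern<T, ? extends T> previous;

        /** The condition an event must satisfy to be matched by this pattern. */
        private IterativeCondition<F> condition;

        /** Window length within which the pattern match must occur. */
        private Time windowTime;

        /** Pattern quantifier, i.e. how many events a pattern matches; defaults to exactly one. */
        private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

        /** The condition an event must satisfy to stop collecting events into the looping state. */
        private IterativeCondition<F> untilCondition;

        /**
         * Applicable to a {@code times} pattern; maintains the number of times
         * events can occur consecutively in the pattern.
         */
        private Times times;

        // Skip strategy applied after a match
        private final AfterMatchSkipStrategy afterMatchSkipStrategy;

        ...
    }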

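Quantifier describes the concrete behavior of a pattern. There are three main kinds:

Single (matches once), Looping (matches repeatedly), and Times (matches a fixed number of times or any count within a range).

Every Pattern can be optional (for single or looping matches) and can have a ConsumingStrategy set.

Looping and Times also have an additional inner ConsumingStrategy that is applied between the events accepted by the pattern.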

    public class Quantifier {
        ...

        /**
         * Five properties that can be combined, though not every combination is valid.
         */
        public enum QuantifierProperty {
            SINGLE, LOOPING, TIMES, OPTIONAL, GREEDY
        }

        /**
         * Strategy describing which events are matched in this pattern.
         */
        public enum ConsumingStrategy {
            STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT
        }

        /**
         * Describes how many times events may occur consecutively in the current pattern.
         * For example, a pattern condition is just a boolean: events satisfying it occur
         * {@code times} times in a row, or within a range such as 2 to 4, in which case
         * runs of 2, 3, and 4 events are all matched by the pattern, so the same event
         * can be matched more than once.
         */
        public static class Times {
            private final int from;
            private final int to;

            private Times(int from, int to) {
                Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
                Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
                this.from = from;
                this.to = to;
            }

            public int getFrom() {
                return from;
            }

            public int getTo() {
                return to;
            }

            // A range of counts
            public static Times of(int from, int to) {
                return new Times(from, to);
            }

            // An exact count
            public static Times of(int times) {
                return new Times(times, times);
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || getClass() != o.getClass()) {
                    return false;
                }
                Times times = (Times) o;
                return from == times.from && to == times.to;
            }

            @Override
            public int hashCode() {
                return Objects.hash(from, to);
            }
        }

        ...
    }
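
To make the Times range semantics concrete, here is a minimal sketch in plain Java that does not depend on Flink. TimesSketch and its matchedCounts helper are hypothetical names introduced only for illustration. They mirror the from/to validation shown above and list the run lengths that a range would accept, following the description in the comment on Times.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Quantifier.Times; not part of Flink. */
final class TimesSketch {
    private final int from;
    private final int to;

    private TimesSketch(int from, int to) {
        // Same validation rules as the Times constructor shown above.
        if (from <= 0) {
            throw new IllegalArgumentException("The from should be a positive number greater than 0.");
        }
        if (to < from) {
            throw new IllegalArgumentException(
                    "The to should be a number greater than or equal to from: " + from + ".");
        }
        this.from = from;
        this.to = to;
    }

    /** A range of counts. */
    static TimesSketch of(int from, int to) {
        return new TimesSketch(from, to);
    }

    /** An exact count. */
    static TimesSketch of(int times) {
        return new TimesSketch(times, times);
    }

    /** Run lengths accepted when `available` qualifying events arrive in a row. */
    List<Integer> matchedCounts(int available) {
        List<Integer> counts = new ArrayList<>();
        for (int n = from; n <= Math.min(to, available); n++) {
            counts.add(n);
        }
        return counts;
    }
}
```

With a range of 2 to 4 and five qualifying events in a row, the helper reports run lengths 2, 3, and 4. How many distinct matches Flink actually emits also depends on the contiguity and after-match skip strategy, which this sketch does not model.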

EventComparator is a custom event comparator. To define one, implement the EventComparator interface.

    public interface EventComparator<T> extends Comparator<T>, Serializable {
        long serialVersionUID = 1L;
    }
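
As a rough illustration of what such a comparator might look like, the sketch below uses a local stand-in interface with the same shape as EventComparator so that it runs without Flink. StatusEvent, StatusComparator, and the ranking rule (the "02" status sorts before the others) are assumptions made for this example, not part of the original job. In Flink, a comparator passed to CEP.pattern is used to order events that carry equal timestamps.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Local stand-in with the same shape as Flink's EventComparator. */
interface EventComparatorSketch<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}

/** Hypothetical event type used only for this example. */
final class StatusEvent {
    final String aid;
    final String astatus;

    StatusEvent(String aid, String astatus) {
        this.aid = aid;
        this.astatus = astatus;
    }
}

/** Places the initial status "02" ahead of any other status. */
final class StatusComparator implements EventComparatorSketch<StatusEvent> {
    private static int rank(StatusEvent e) {
        return "02".equals(e.astatus) ? 0 : 1;
    }

    @Override
    public int compare(StatusEvent a, StatusEvent b) {
        return Integer.compare(rank(a), rank(b));
    }

    /** Sorts a batch of events that share one timestamp and returns their statuses. */
    static List<String> sortedStatuses(List<StatusEvent> sameTimestampEvents) {
        List<StatusEvent> copy = new ArrayList<>(sameTimestampEvents);
        copy.sort(new StatusComparator());
        List<String> statuses = new ArrayList<>();
        for (StatusEvent e : copy) {
            statuses.add(e.astatus);
        }
        return statuses;
    }
}
```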

NFACompiler and NFA

NFACompiler provides methods that compile a Pattern into an NFA or an NFAFactory. An NFAFactory can be used to create multiple NFAs.

    public class NFACompiler {
        ...

        /**
         * NFAFactory: the interface for creating an NFA.
         *
         * @param <T> Type of the input events which are processed by the NFA
         */
        public interface NFAFactory<T> extends Serializable {
            NFA<T> createNFA();
        }

        /**
         * NFAFactoryImpl: the concrete implementation of NFAFactory.
         *
         * <p>The implementation takes the input type serializer, the window time and the set of
         * states and their transitions to be able to create an NFA from them.
         *
         * @param <T> Type of the input events which are processed by the NFA
         */
        private static class NFAFactoryImpl<T> implements NFAFactory<T> {

            private static final long serialVersionUID = 8939783698296714379L;

            private final long windowTime;
            private final Collection<State<T>> states;
            private final boolean timeoutHandling;

            private NFAFactoryImpl(long windowTime, Collection<State<T>> states, boolean timeoutHandling) {
                this.windowTime = windowTime;
                this.states = states;
                this.timeoutHandling = timeoutHandling;
            }

            @Override
            public NFA<T> createNFA() {
                // An NFA consists of the set of states, the window length, and whether timeouts are handled
                return new NFA<>(states, windowTime, timeoutHandling);
            }
        }
    }

NFA: non-deterministic finite automaton.

For more details, see

https://zh.wikipedia.org/wiki/非确定有限状态自动机

    public class NFA<T> {

        /**
         * The set of all valid NFA states returned by NFACompiler.
         * These are directly derived from the user-specified pattern.
         */
        private final Map<String, State<T>> states;

        /**
         * The window length specified by Pattern.within(Time).
         */
        private final long windowTime;

        /**
         * A flag indicating whether timed-out matches are handled.
         */
        private final boolean handleTimeout;

        ...
    }

 

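PatternSelectFunction and PatternFlatSelectFunction

The select() method of PatternSelectFunction is called with a map of the matched events, which can be accessed by pattern name. The pattern names are specified when the Pattern is defined. The select() method returns exactly one result. If you need to return multiple results, implement PatternFlatSelectFunction instead.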

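    public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

        /**
         * Generates a result from the given map of events. The events are
         * identified by the name of their associated pattern.
         */
        OUT select(Map<String, List<IN>> pattern) throws Exception;
    }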
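
The shape of the map passed to select() is easiest to see with a small example. The sketch below runs without Flink: SelectSketch is a local stand-in with the same method signature as PatternSelectFunction, and SelectDemo, the "init" and "end" pattern names, and the string events are assumptions chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local stand-in with the same method shape as PatternSelectFunction; not Flink's class. */
interface SelectSketch<IN, OUT> {
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}

final class SelectDemo {
    /** Returns exactly one result per match: "<init event> -> <end event>". */
    static final SelectSketch<String, String> SUMMARIZE = pattern ->
            pattern.get("init").get(0) + " -> " + pattern.get("end").get(0);

    static String run() {
        // The map is keyed by the pattern names given when the pattern was defined.
        Map<String, List<String>> match = new HashMap<>();
        match.put("init", Collections.singletonList("ID000-0 (02)"));
        match.put("end", Collections.singletonList("ID000-0 (00)"));
        try {
            return SUMMARIZE.select(match);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```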

 

PatternFlatSelectFunction does not return a single OUT. Instead, it uses a Collector to gather the results produced from the matched events.

    public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

        /**
         * Generates one or more results.
         */
        void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
    }
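
For comparison with select(), here is a minimal Flink-free sketch of the flat variant. FlatSelectSketch is a local stand-in, and java.util.function.Consumer plays the role of Flink's Collector. Both substitutions, along with the FlatSelectDemo name, are assumptions made so the example can run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Local stand-in shaped like PatternFlatSelectFunction, with Consumer replacing Collector. */
interface FlatSelectSketch<IN, OUT> {
    void flatSelect(Map<String, List<IN>> pattern, Consumer<OUT> out);
}

final class FlatSelectDemo {
    /** Emits every matched event individually instead of returning one result. */
    static List<String> emitAll(Map<String, List<String>> match) {
        List<String> collected = new ArrayList<>();
        FlatSelectSketch<String, String> fn = (pattern, out) -> {
            for (List<String> events : pattern.values()) {
                for (String event : events) {
                    out.accept(event);
                }
            }
        };
        fn.flatSelect(match, collected::add);
        return collected;
    }
}
```

The output order follows the iteration order of the map that is passed in, so pass an ordered map if the order matters.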

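SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when the createTimeoutPatternStream() method in CEPOperatorUtils is called.

The methods of SelectTimeoutCepOperator that the operator invokes repeatedly are processMatchedSequences() and processTimedOutSequences().

This follows the template method pattern: they correspond to the processEvent() and advanceTime() methods in the abstract class AbstractKeyedCEPPatternOperator.

There are also FlatSelectTimeoutCepOperator and the corresponding PatternFlatTimeoutFunction.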

    public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
            extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

        private OutputTag<OUT2> timedOutOutputTag;

        public SelectTimeoutCepOperator(
                TypeSerializer<IN> inputSerializer,
                boolean isProcessingTime,
                NFACompiler.NFAFactory<IN> nfaFactory,
                final EventComparator<IN> comparator,
                AfterMatchSkipStrategy skipStrategy,
                // The parameter names wrongly carry the "flat" prefix, as do the member names in the SelectWrapper class
                PatternSelectFunction<IN, OUT1> flatSelectFunction,
                PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
                OutputTag<OUT2> outputTag,
                OutputTag<IN> lateDataOutputTag) {
            super(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    skipStrategy,
                    new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
                    lateDataOutputTag);
            this.timedOutOutputTag = outputTag;
        }

        ...
    }

    public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
        OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
    }

    public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
        void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
    }

 

CEP and CEPOperatorUtils

CEP is the utility class for creating a PatternStream. A PatternStream is just the combination of a DataStream and a Pattern.

    public class CEP {

        public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
            return new PatternStream<>(input, pattern);
        }

        public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
            return new PatternStream<>(input, pattern, comparator);
        }
    }

 

CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when the select() or flatSelect() method of PatternStream is called.

    public class CEPOperatorUtils {
        ...

        private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
                final DataStream<IN> inputStream,
                final Pattern<IN, ?> pattern,
                final TypeInformation<OUT> outTypeInfo,
                final boolean timeoutHandling,
                final EventComparator<IN> comparator,
                final OperatorBuilder<IN, OUT> operatorBuilder) {

            final TypeSerializer<IN> inputSerializer =
                    inputStream.getType().createSerializer(inputStream.getExecutionConfig());

            // check whether we use processing time
            final boolean isProcessingTime =
                    inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

            // compile our pattern into a NFAFactory to instantiate NFAs later on
            final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

            final SingleOutputStreamOperator<OUT> patternStream;

            if (inputStream instanceof KeyedStream) {
                KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

                patternStream = keyedStream.transform(
                        operatorBuilder.getKeyedOperatorName(),
                        outTypeInfo,
                        operatorBuilder.build(
                                inputSerializer,
                                isProcessingTime,
                                nfaFactory,
                                comparator,
                                pattern.getAfterMatchSkipStrategy()));
            } else {
                KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

                patternStream = inputStream.keyBy(keySelector).transform(
                        operatorBuilder.getOperatorName(),
                        outTypeInfo,
                        operatorBuilder.build(
                                inputSerializer,
                                isProcessingTime,
                                nfaFactory,
                                comparator,
                                pattern.getAfterMatchSkipStrategy()
                        )).forceNonParallel();
            }

            return patternStream;
        }

        ...
    }

Steps for Implementing FlinkCEP

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) or PatternStream.flatSelect(PatternFlatSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

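Steps for Implementing Match Timeouts with FlinkCEP

A stream used for timeout CEP needs keyBy, that is, it must be a KeyedStream. If inputStream is not a KeyedStream, every element is assigned the same null-byte key, a byte value of 0 (as seen in the CEPOperatorUtils source above).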

    KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
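
The effect of the null-byte key can be sketched without Flink. In the example below, NullByteKeySketch, nullByteKey, and groupBy are hypothetical helpers, and java.util.function.Function stands in for Flink's KeySelector. Because every element maps to the same key, all events land in a single group, which is consistent with the non-keyed branch calling forceNonParallel() in the source above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Flink-free illustration of keying every element by the same byte value 0. */
final class NullByteKeySketch {

    /** Mirrors the idea behind NullByteKeySelector: every element gets key 0. */
    static <T> Function<T, Byte> nullByteKey() {
        return value -> (byte) 0;
    }

    /** Groups events by the key that the selector extracts, preserving arrival order. */
    static <T, K> Map<K, List<T>> groupBy(List<T> events, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T event : events) {
            groups.computeIfAbsent(keySelector.apply(event), k -> new ArrayList<>()).add(event);
        }
        return groups;
    }
}
```

Grouping the ids "ID000-0", "ID000-1", "ID000-0" with the null-byte key yields one group, whereas grouping by the id itself yields two, so per-id timeout detection needs an explicit keyBy on the id.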

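Finally, call within on the Pattern to set the window time. If the stream is keyed by a primary key, at most one timeout event is matched per time window, so PatternStream.select(...) is sufficient.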

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

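Limitations of FlinkCEP Timeouts

As with Flink window aggregation, if you use event time and rely on watermarks generated from events to move time forward, a subsequent event must arrive before the window is triggered to compute and emit its result.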
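
This limitation can be reproduced with a small simulation that does not use Flink. The sketch below is a simplified model, not FlinkCEP's implementation: EventTimeTimeoutSketch is a hypothetical class, the watermark is assumed to equal the timestamp of the latest event, and the pattern is assumed to be "status 02 followed by status 00 or 01 within the window" for the same aid.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, Flink-free model of "init followedBy end within(window)" in event time. */
final class EventTimeTimeoutSketch {
    private final long windowMillis;
    // aid -> timestamp of the pending init event
    private final Map<String, Long> pendingInit = new LinkedHashMap<>();
    final List<String> timedOut = new ArrayList<>();
    final List<String> completed = new ArrayList<>();

    EventTimeTimeoutSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Processes one event; the event's own timestamp doubles as the watermark (assumption). */
    void onEvent(String aid, String astatus, long timestamp) {
        advanceTime(timestamp);
        if ("02".equals(astatus)) {
            pendingInit.put(aid, timestamp);
        } else if (("00".equals(astatus) || "01".equals(astatus)) && pendingInit.remove(aid) != null) {
            completed.add(aid);
        }
    }

    /** Timeouts are only discovered here, that is, when an event moves time forward. */
    private void advanceTime(long watermark) {
        Iterator<Map.Entry<String, Long>> it = pendingInit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (watermark - entry.getValue() >= windowMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
    }
}
```

In this model, an init event at time 0 with a 60-second window is not reported as timed out until some later event with a timestamp of at least 60000 arrives. If no further event ever arrives, the timeout is never emitted. A watermark source that advances independently of events avoids this.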

Complete FlinkCEP Timeout Demo

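    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternFlatSelectFunction;
    import org.apache.flink.cep.PatternFlatTimeoutFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // POJO and POJOSchema are the author's own classes and are not shown in this article.
    public class CEPTimeoutEventJob {

        private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
        private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
        private static final String GROUP_TOPIC = GROUP_ID;

        public static void main(String[] args) throws Exception {
            // Parameters
            ParameterTool params = ParameterTool.fromArgs(args);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Use event time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.getConfig().disableSysoutLogging();
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

            // Do not use a time field from the POJO
            final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<>();

            // Keep in line with the number of partitions of the Kafka topic
            env.setParallelism(3);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
            kafkaProps.setProperty("group.id", GROUP_ID);

            // Consume messages from Kafka
            FlinkKafkaConsumer011<POJO> consumer =
                    new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
            DataStream<POJO> pojoDataStream = env.addSource(consumer)
                    .assignTimestampsAndWatermarks(extractor);

            pojoDataStream.print();

            // Key by the primary key aid, so that matching is checked per POJO event
            // [different POJO types can use different within times]
            // 1.
            DataStream<POJO> keyedPojos = pojoDataStream
                    .keyBy("aid");

            // From the initial state to a terminal state: one complete POJO event sequence
            // 2.
            Pattern<POJO, POJO> completedPojo =
                    Pattern.<POJO>begin("init")
                            .where(new SimpleCondition<POJO>() {
                                private static final long serialVersionUID = -6847788055093903603L;

                                @Override
                                public boolean filter(POJO pojo) throws Exception {
                                    return "02".equals(pojo.getAstatus());
                                }
                            })
                            .followedBy("end")
    //                        .next("end")
                            .where(new SimpleCondition<POJO>() {
                                private static final long serialVersionUID = -2655089736460847552L;

                                @Override
                                public boolean filter(POJO pojo) throws Exception {
                                    return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
                                }
                            });

            // Find the aid of events that have not reached a terminal state within 1 minute [short, for ease of testing]
            // If different types need different within times, e.g. some time out after 1 minute
            // and others after 1 hour, create multiple PatternStreams
            // 3.
            PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

            // Define the side output timedout
            // 4.
            OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
                private static final long serialVersionUID = 773503794597666247L;
            };

            // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
            // 5.
            SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
                    timedout,
                    new POJOTimedOut(),
                    new FlatSelectNothing<POJO>()
            );

            // Print the timed-out POJOs
            // 6.7.
            timeoutPojos.getSideOutput(timedout).print();
            timeoutPojos.print();
            env.execute(CEPTimeoutEventJob.class.getSimpleName());
        }

        /**
         * Collects the timed-out events.
         */
        public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
            private static final long serialVersionUID = -4214641891396057732L;

            @Override
            public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
                if (null != map.get("init")) {
                    for (POJO pojoInit : map.get("init")) {
                        System.out.println("timeout init:" + pojoInit.getAid());
                        collector.collect(pojoInit);
                    }
                }
                // end timed out and was never received, so it cannot be retrieved here
                System.out.println("timeout end: " + map.get("end"));
            }
        }

        /**
         * Usually does nothing, but could also send all matched events downstream. With relaxed
         * contiguity, events that were ignored or skipped over cannot be selected and sent downstream.
         * Handles data that went through both init and end within one minute.
         *
         * @param <T>
         */
        public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<POJO, T> {
            private static final long serialVersionUID = -3029589950677623844L;

            @Override
            public void flatSelect(Map<String, List<POJO>> pattern, Collector<T> collector) {
                System.out.println("flatSelect: " + pattern);
            }
        }
    }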

Test results (followedBy):

    3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
    flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
    timeout init:ID000-1
    3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    timeout end: null
    3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
    3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
    3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
    3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
    flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
    3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
    timeout init:ID000-4
    3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
    timeout end: null
