从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。
分类: 大数据
2017-06-01 15:47:03
Flink CEP 是 Flink 的复杂处理库。它允许用户快速检测无尽数据流中的复杂模式。不过 Flink CEP 仅可用于通过 DataStream API处理。 参考细说,我们知道Flink 的每个模式包含多个状态,模式匹配的过程就是状态转换的过程,每个状态(state)可以理解成由Pattern构成,为了从当前的状态转换成下一个状态,用户可以在Pattern上指定条件,用于状态的过滤和转换。
实际上Flink CEP 首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。然后需要用户利用NFACompiler,将模式进行分拆,创建出NFA(非确定有限自动机)对象,NFA包含了该次模式匹配的各个状态和状态间转换的表达式。整个示意图就像如下:
上图中的三个pattern通过编译生成了NFA,NFA包含了四个状态,其中endstate是在编译的时候自动加上的,来作为终止状态。状态间转换是通过箭头表示的状态迁移边(StateTransition)来实现的,我们注意到state2做状态迁移的时候存在三条边(take,proceed,ingore),为什么有的状态只有一条边?有的状态有两条边?有的状态上有三条边?我们这里先埋个伏笔,之后我们会做解释。
在正式开始讲解Flink CEP 具体实现过程,我们先看下面一个很简单的例子:
List> inputEvents = new ArrayList<>(); // 构建数据源 Event event0 = new Event(0, "x", 1.0);
Event event1 = new Event(1, "a", 1.0);
Event event2 = new Event(2, "b", 2.0);
Event event3 = new Event(3, "c", 3.0);
Event event4 = new Event(4, "a", 4.0);
Event event5 = new Event(5, "b", 5.0);
inputEvents.add(new StreamRecord<>(event0, 3));
inputEvents.add(new StreamRecord<>(event1, 3));
inputEvents.add(new StreamRecord<>(event2, 3));
inputEvents.add(new StreamRecord<>(event3, 3));
inputEvents.add(new StreamRecord<>(event4, 3));
inputEvents.add(new StreamRecord<>(event5, 3)); //构建start状态的pattern1,过滤条件是事件名称以"a"开头 Pattern pattern1 = Pattern.begin("pattern1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L;
@Override public boolean filter(Event value) throws Exception { return value.getName().equals("a");
}
}); //构建end状态的pattern2,过滤条件是时间名称以"b"开头。其中pattern间是链式连接 Pattern pattern2 = pattern1.next("pattern2").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L;
@Override public boolean filter(Event value) throws Exception { return value.getName().equals("b");
}
}); //将pattern编译成nfa NFA nfa = NFACompiler.compile(pattern2, Event.createTypeSerializer(), false); //接入数据源到nfa,输出结果 List> resultingPatterns = feedNFA(inputEvents, nfa);
上述例子pattern间的连接是通过"next"指定的,next是非常严格的匹配连接关系。表示前一个pattern匹配的事件必须紧跟后面一个pattern的事件。所以上述的例子表示匹配两个连续事件[E1,E2],其中事件E1必须满足pattern1条件,事件E2必须满足pattern2条件。它的状态迁移图如下所示:
当数据源接入的时候,事件匹配是逆序的,是从已经匹配成功的下一个pattern往回推近匹配的,这里要特别注意。第一条消息的匹配是pattern1的条件,当事件event0到来,匹配失败,则state1状态不做任何更新;第二条消息event1到来,依然从pattern1的条件开始匹配,匹配成功,则state1更新状态,此时匹配pattern向前推进到pattern2;第三条消息event2到来,先对pattern2进行条件匹配,匹配成功,状态state2状态需要更新,由于state2是最后一个模式匹配状态,意味着模式匹配成功,这是将state1和state2输出,并做状态清理动作。第四条消息来的时候,又重新做新的状态匹配…… 所以最后的输出结果是[event1, event2]和[event4, event5]。当然这个例子很简单,实际上flink具体实现还是很复杂的。
在文章开始的时候,我们谈到了每个状态迁移会涉及到三类状态迁移边,分别是Take、Proceed、Ingore。
Take: 表示事件匹配成功,将当前状态更新到新状态,并前进到“下一个”状态;
Procceed: 当事件来到的时候,当前状态不发生变化,在状态转换图中事件直接“前进”到下一个目标状态;
IGNORE: 当事件来到的时候,如果匹配不成功,忽略当前事件,当前状态不发生任何变化。
为了更好的理解上述概念,我们利用下面代码,构建一个nfa
//构建链接patterns Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c");
}
}).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a");
}
}).optional(); //创建nfa NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
构建的nfa的状态示意图如下所示:
此时如果我们加入数据源如下, 理论上的输出结果应该是[event1] 和 [event1,event4]。
Event event0 = new Event(40, "x", 1.0); Event event1 = new Event(40, "c", 1.0); Event event2 = new Event(42, "b", 2.0); Event event3 = new Event(43, "b", 2.0); Event event4 = new Event(44, "a", 2.0); Event event5 = new Event(45, "b", 5.0);
下面我们来分析下,当第一条消息event0来的时候,由于start状态只有Take状态迁移边,这时event0匹配失败,消息被丢失,start状态不发生任何变化;当第二条消息event1来的时候,匹配成功,这时用event1更新start当前状态,并且进入下一个状态,既mid状态。而这是我们发现mid状态存在Proceed状态迁移边,以为着事件来临时,可以直接进入下一个状态,这里就是endstat状态,说明匹配结束,存在第一个匹配结果[event1];当第三条消息event2来临时,由于之前我们已经进入了mid状态,所以nfa会让我们先匹配mid的条件,匹配失败,由于mid状态存在Ingore状态迁移边,所以当前mid状态不发生变化,event2继续往回匹配start的条件,匹配失败,这时event2被丢弃;同样的event3也不会影响nfa的所有状态,被丢弃。当第五条消息event4来临时,匹配mid的条件成功,更新当前mid状态,并且进入“下一个状态”,那就是endstat状态,说明匹配结束,存在第二个匹配结果[event1, event4]。
在引入SharedBuffer概念之前,我们先把上图的例子改一下,将原先pattern1 和 pattern2的连接关系由next,改成followedByAny。
Pattern pattern2 = pattern1.followedByAny("pattern2").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b");
}
});
followedByAny是非严格的匹配连接关系。表示前一个pattern匹配的事件和后面一个pattern的事件间可以间隔多个任意元素。所以上述的例子输出结果是[event1, event2]、[event4, event5]和[event1, event5]。当匹配event1成功后,由于event2还没到来,需要将event1保存到state1,这样每个状态需要缓冲堆栈来保存匹配成功的事件,我们把各个状态的对应缓冲堆栈集称之为缓冲区。由于上述例子有三种输出,理论上我们需要创建三个独立的缓冲区。
做三个独立的缓冲区实现上是没有问题,但是我们发现缓冲区3状态stat1的堆栈和缓冲区stat1的堆栈是一样的,我们完全没有必要分别占用内存。而且在实际的模式匹配场景下,每个缓冲区独立维护的堆栈中可能会有大量的数据重叠。随着流事件的不断流入,为每个匹配结果独立维护缓存区占用内存会越来越大。所以Flink CEP 提出了共享缓存区的概念(SharedBuffer),就是用一个共享的缓存区来表示上面三个缓存区。
//events表示事件数组 //初始化共享缓存区 SharedBuffer sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer()); //往共享缓存区添加元素 sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));
sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
sharedBuffer.put("b", events[8], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
上面数组构建出的共享区示意图如下:
从e8往回遍历,只要满足(1)v包含了v’作为前缀或者(2)v与v’仅最后一个数值不同,而对于版本v而言要大于版本v’ 这两个条件的任意一条条件时,说明往前是兼容成功的。所以从上述共享缓冲区可以存储的结果是
[e0,e1,e2,e3,e4,e8]和[e0,e1,e2,e3,e4,e5,e6,e7]。为了基于版本号提取某个匹配的的所有元素,Flink定义了一个ExtractionState来存储提取状态的信息,该数据结构内部以栈结构来存储向前遍历的整个路径,采用的深度优先搜索。
Flink CEP做模式匹配,是依赖模式流(PatternStream)来实现的,一个PatternStream对象表示模式检测到的序列所对应的流。为了使用PatternStream,我们首先要构建它,为此Flink提供了一个名为CEP的帮助类,它定义了一个pattern静态方法,将数据流和pattern作为形参传入。
DataStream<String> inputStream = ... Pattern<String, ?> pattern = ...
PatternStream<String> patternStream = CEP.pattern(inputStream, pattern);
然后我们会在PatternStream对象上调用select或flatSelect来获取某个模式下匹配到的事件来实现我们的业务逻辑。一般select的实现如下:
public SingleOutputStreamOperator select(final PatternSelectFunction patternSelectFunction, TypeInformation outTypeInfo) { //创建DataStream SingleOutputStreamOperator<Map<String, List>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream.map( new PatternSelectMapper<>(
patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
.returns(outTypeInfo);
}
在select里头主要还是通过CEPOperatorUtils.createPatternStream创建DataStream,在对createPatternStream进行分析,首先通过
final NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
创建NFA工厂,然后将nfaFactory作为参数,创建CEP Operator对象。构建NAF工厂的过程,其实就是将pattern间的逻辑关系,转换成具体的各个状态,并用状态迁移边去描述状态间的迁移关系。具体的实现如下:
public static NFAFactory compileFactory(
Pattern pattern,
TypeSerializer inputTypeSerializer,
boolean timeoutHandling) { if (pattern == null) { //如果模式为null,返回一个NFA工厂的实现,且不传递任何状态,意味着将创建一个空的NFA对象 return new NFAFactoryImpl(inputTypeSerializer, 0,
Collections.>emptyList(), timeoutHandling);
} else { //构建一个Map来存储所有生成的状态 Map<String, State> states = new HashMap<>();
long windowTime;
Pattern succeedingPattern;
State succeedingState;
Pattern currentPattern = pattern; //构建最终态,并加入到Map中,这里将会从Pattern的尾部向头部进行遍历,所以构建的第一个状态是尾部的最终态 State currentState = new State<>(currentPattern.getName(), State.StateType.Final);
states.put(currentPattern.getName(), currentState); //提取当前Pattern对象的窗口时间 windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds()
: 0L; //不断向前遍历(不包含第一个Pattern对象) while (currentPattern.getPrevious() != null) { //相关变量交换 succeedingPattern = currentPattern;
succeedingState = currentState;
currentPattern = currentPattern.getPrevious(); //获得窗口时间 Time currentWindowTime = currentPattern.getWindowTime(); //如果当前Pattern的窗口时间比其之前Pattern的窗口时间小,则将之前Pattern的窗口时间更新为新的窗口时间 if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
windowTime = currentWindowTime.toMilliseconds();
} //获取或构建状态 if (states.containsKey(currentPattern.getName())) {
currentState = states.get(currentPattern.getName());
} else {
currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
states.put(currentState.getName(), currentState);
} //为当前状态设置跟后一个状态之间的转换(边),注意状态转换是“TAKE”,这里同时传入了Pattern所注入的条件 currentState.addStateTransition(new StateTransition(
StateTransitionAction.TAKE,
succeedingState,
(FilterFunction) succeedingPattern.getFilterFunction())); //如果后一个模式是非紧邻模式,则为当前状态构建自循环的“IGNORE”转换 if (succeedingPattern instanceof FollowedByPattern) {
currentState.addStateTransition(new StateTransition(
StateTransitionAction.IGNORE,
currentState, null ));
}
}
final State beginningState; //获取或构建起始状态 if (states.containsKey(BEGINNING_STATE_NAME)) {
beginningState = states.get(BEGINNING_STATE_NAME);
} else {
beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
states.put(BEGINNING_STATE_NAME, beginningState);
} //添加状态转换(起始状态,只能通过“TAKE”向下一状态转换) beginningState.addStateTransition(new StateTransition(
StateTransitionAction.TAKE,
currentState,
(FilterFunction) currentPattern.getFilterFunction()
)); //以所有的状态构建NFAFactoryImpl对象,它将用来创建NFA对象 return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
}
}
在CEP Operator的processElement是用来处理消息的,这里头一般会调用NFA对象的process方法,逐个处理事件,所以真正处理事件的逻辑被封装在NAF对象里头。另外需要注意的是,如果是普通数据流,其并行度被设置为1,也就是整个数据流没办法分区以并行执行,而是作为一个全局数据流参与模式匹配。这一点其实不难想象,因为我们在分析模式时,其有事件选择策略(严格紧邻还是非严格紧邻),也就是说事件前后顺序是模式的一部分,那么这时候如果普通事件流再分区执行,将会打破这种顺序,从而导致匹配失效。Process方法主要借助ComputationState对象,来记录计算过的事件的状态,具体的剖析我们可以参考Flink-CEP之NFA,这里就不再赘述,这部分实现逻辑分析有点过时,但是可以帮助更好的理解。