Chinaunix首页 | 论坛 | 博客
  • 博客访问: 495706
  • 博文数量: 80
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1916
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-11 22:01
个人简介

从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。

文章分类

全部博文(80)

文章存档

2017年(11)

2015年(3)

2014年(33)

2013年(33)

分类: 大数据

2017-06-01 15:47:03

Flink CEP 是 Flink 的复杂处理库。它允许用户快速检测无尽数据流中的复杂模式。不过 Flink CEP 仅可用于通过 DataStream API处理。 参考细说,我们知道Flink 的每个模式包含多个状态,模式匹配的过程就是状态转换的过程,每个状态(state)可以理解成由Pattern构成,为了从当前的状态转换成下一个状态,用户可以在Pattern上指定条件,用于状态的过滤和转换。
实际上Flink CEP 首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。然后需要用户利用NFACompiler,将模式进行分拆,创建出NFA(非确定有限自动机)对象,NFA包含了该次模式匹配的各个状态和状态间转换的表达式。整个示意图就像如下:

上图中的三个pattern通过编译生成了NFA,NFA包含了四个状态,其中endstate是在编译的时候自动加上的,来作为终止状态。状态间转换是通过箭头表示的状态迁移边(StateTransition)来实现的,我们注意到state2做状态迁移的时候存在三条边(take,proceed,ingore),为什么有的状态只有一条边?有的状态有两条边?有的状态上有三条边?我们这里先埋个伏笔,之后我们会做解释。

Flink-cep 实例讲解

在正式开始讲解Flink CEP 具体实现过程,我们先看下面一个很简单的例子:

 List> inputEvents = new ArrayList<>(); // 构建数据源 Event event0 = new Event(0, "x", 1.0);
        Event event1 = new Event(1, "a", 1.0);
        Event event2 = new Event(2, "b", 2.0);
        Event event3 = new Event(3, "c", 3.0);
        Event event4 = new Event(4, "a", 4.0);
        Event event5 = new Event(5, "b", 5.0);

        inputEvents.add(new StreamRecord<>(event0, 3));
        inputEvents.add(new StreamRecord<>(event1, 3));
        inputEvents.add(new StreamRecord<>(event2, 3));
        inputEvents.add(new StreamRecord<>(event3, 3));

        inputEvents.add(new StreamRecord<>(event4, 3));
        inputEvents.add(new StreamRecord<>(event5, 3)); //构建start状态的pattern1,过滤条件是事件名称以"a"开头 Pattern pattern1 = Pattern.begin("pattern1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L;

            @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a");
            }
        }); //构建end状态的pattern2,过滤条件是时间名称以"b"开头。其中pattern间是链式连接 Pattern pattern2 = pattern1.next("pattern2").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L;

            @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b");
            }
        }); //将pattern编译成nfa NFA nfa = NFACompiler.compile(pattern2, Event.createTypeSerializer(), false); //接入数据源到nfa,输出结果 List> resultingPatterns = feedNFA(inputEvents, nfa); 

上述例子pattern间的连接是通过"next"指定的,next是非常严格的匹配连接关系。表示前一个pattern匹配的事件必须紧跟后面一个pattern的事件。所以上述的例子表示匹配两个连续事件[E1,E2],其中事件E1必须满足pattern1条件,事件E2必须满足pattern2条件。它的状态迁移图如下所示:

当数据源接入的时候,事件匹配是逆序的,是从已经匹配成功的下一个pattern往回推近匹配的,这里要特别注意。第一条消息的匹配是pattern1的条件,当事件event0到来,匹配失败,则state1状态不做任何更新;第二条消息event1到来,依然从pattern1的条件开始匹配,匹配成功,则state1更新状态,此时匹配pattern向前推进到pattern2;第三条消息event2到来,先对pattern2进行条件匹配,匹配成功,状态state2状态需要更新,由于state2是最后一个模式匹配状态,意味着模式匹配成功,这是将state1和state2输出,并做状态清理动作。第四条消息来的时候,又重新做新的状态匹配…… 所以最后的输出结果是[event1, event2]和[event4, event5]。当然这个例子很简单,实际上flink具体实现还是很复杂的。

Flink-cep 三种状态迁移边

在文章开始的时候,我们谈到了每个状态迁移会涉及到三类状态迁移边,分别是Take、Proceed、Ingore。
Take: 表示事件匹配成功,将当前状态更新到新状态,并前进到“下一个”状态;
Procceed: 当事件来到的时候,当前状态不发生变化,在状态转换图中事件直接“前进”到下一个目标状态;
IGNORE: 当事件来到的时候,如果匹配不成功,忽略当前事件,当前状态不发生任何变化。

为了更好的理解上述概念,我们利用下面代码,构建一个nfa

 //构建链接patterns Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c");
            }
        }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a");
            }
        }).optional(); //创建nfa NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); 

构建的nfa的状态示意图如下所示:

此时如果我们加入数据源如下, 理论上的输出结果应该是[event1] 和 [event1,event4]。

 Event event0 = new Event(40, "x", 1.0); Event event1 = new Event(40, "c", 1.0); Event event2 = new Event(42, "b", 2.0); Event event3 = new Event(43, "b", 2.0); Event event4 = new Event(44, "a", 2.0); Event event5 = new Event(45, "b", 5.0); 

下面我们来分析下,当第一条消息event0来的时候,由于start状态只有Take状态迁移边,这时event0匹配失败,消息被丢失,start状态不发生任何变化;当第二条消息event1来的时候,匹配成功,这时用event1更新start当前状态,并且进入下一个状态,既mid状态。而这是我们发现mid状态存在Proceed状态迁移边,以为着事件来临时,可以直接进入下一个状态,这里就是endstat状态,说明匹配结束,存在第一个匹配结果[event1];当第三条消息event2来临时,由于之前我们已经进入了mid状态,所以nfa会让我们先匹配mid的条件,匹配失败,由于mid状态存在Ingore状态迁移边,所以当前mid状态不发生变化,event2继续往回匹配start的条件,匹配失败,这时event2被丢弃;同样的event3也不会影响nfa的所有状态,被丢弃。当第五条消息event4来临时,匹配mid的条件成功,更新当前mid状态,并且进入“下一个状态”,那就是endstat状态,说明匹配结束,存在第二个匹配结果[event1, event4]。

Flink-cep 共享缓存SharedBuffer

在引入SharedBuffer概念之前,我们先把上图的例子改一下,将原先pattern1 和 pattern2的连接关系由next,改成followedByAny。

 Pattern pattern2 = pattern1.followedByAny("pattern2").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b");
            }
        }); 

followedByAny是非严格的匹配连接关系。表示前一个pattern匹配的事件和后面一个pattern的事件间可以间隔多个任意元素。所以上述的例子输出结果是[event1, event2]、[event4, event5]和[event1, event5]。当匹配event1成功后,由于event2还没到来,需要将event1保存到state1,这样每个状态需要缓冲堆栈来保存匹配成功的事件,我们把各个状态的对应缓冲堆栈集称之为缓冲区。由于上述例子有三种输出,理论上我们需要创建三个独立的缓冲区。

做三个独立的缓冲区实现上是没有问题,但是我们发现缓冲区3状态stat1的堆栈和缓冲区stat1的堆栈是一样的,我们完全没有必要分别占用内存。而且在实际的模式匹配场景下,每个缓冲区独立维护的堆栈中可能会有大量的数据重叠。随着流事件的不断流入,为每个匹配结果独立维护缓存区占用内存会越来越大。所以Flink CEP 提出了共享缓存区的概念(SharedBuffer),就是用一个共享的缓存区来表示上面三个缓存区。




在共享缓冲区实现里头,Flink CEP 设计了一个带版本的共享缓冲区。它会给每一次匹配分配一个版本号并使用该版本号来标记在这次匹配中的所有指针。但这里又会面临另一个问题:无法为某次匹配预分配版本号,因为任何非确定性的状态都能派生出新的匹配。而解决这一问题的技术是采用杜威十进制分类法[^1]来编码版本号,它以(.)?(1≤j≤t)的形式动态增长,这里t关联着当前状态。直观地说,它表示这次运行从状态开始被初始化然后到达状态,并从中分割出的实例,这被称之为祖先运行。这种版本号编码技术也保证一个运行的版本号v跟它的祖先运行的版本号兼容。具体而言也就是说:(1)v包含了v’作为前缀或者(2)v与v’仅最后一个数值不同,而对于版本v而言要大于版本v’。根据对这段话的理解,上述共享区从e5往回查找数据,可以达到两条路径分别是[e4,e5]和[e1, e5]。这个例子过于简单,我们下面通过以下代码创建一个比较复杂的共享缓存区吧。


 //events表示事件数组 //初始化共享缓存区 SharedBuffer sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer()); //往共享缓存区添加元素 sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
        sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));

        sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
        sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
        sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
        sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
        sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
        sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
        sharedBuffer.put("b", events[8], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
        sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0")); 

上面数组构建出的共享区示意图如下:




从e8往回遍历,只要满足(1)v包含了v’作为前缀或者(2)v与v’仅最后一个数值不同,而对于版本v而言要大于版本v’ 这两个条件的任意一条条件时,说明往前是兼容成功的。所以从上述共享缓冲区可以存储的结果是
[e0,e1,e2,e3,e4,e8]和[e0,e1,e2,e3,e4,e5,e6,e7]。为了基于版本号提取某个匹配的的所有元素,Flink定义了一个ExtractionState来存储提取状态的信息,该数据结构内部以栈结构来存储向前遍历的整个路径,采用的深度优先搜索。

Flink-cep 模式匹配过程

Flink CEP做模式匹配,是依赖模式流(PatternStream)来实现的,一个PatternStream对象表示模式检测到的序列所对应的流。为了使用PatternStream,我们首先要构建它,为此Flink提供了一个名为CEP的帮助类,它定义了一个pattern静态方法,将数据流和pattern作为形参传入。

DataStream<String> inputStream = ... Pattern<String, ?> pattern = ...
PatternStream<String> patternStream = CEP.pattern(inputStream, pattern); 

然后我们会在PatternStream对象上调用select或flatSelect来获取某个模式下匹配到的事件来实现我们的业务逻辑。一般select的实现如下:

 public  SingleOutputStreamOperator select(final PatternSelectFunction patternSelectFunction, TypeInformation outTypeInfo) { //创建DataStream SingleOutputStreamOperator<Map<String, List>> patternStream =
                CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream.map( new PatternSelectMapper<>(
                patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
            .returns(outTypeInfo);
    } 

在select里头主要还是通过CEPOperatorUtils.createPatternStream创建DataStream,在对createPatternStream进行分析,首先通过

 final NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false); 

创建NFA工厂,然后将nfaFactory作为参数,创建CEP Operator对象。构建NAF工厂的过程,其实就是将pattern间的逻辑关系,转换成具体的各个状态,并用状态迁移边去描述状态间的迁移关系。具体的实现如下:

public static  NFAFactory compileFactory(
    Pattern pattern,
    TypeSerializer inputTypeSerializer,
    boolean timeoutHandling) { if (pattern == null) { //如果模式为null,返回一个NFA工厂的实现,且不传递任何状态,意味着将创建一个空的NFA对象 return new NFAFactoryImpl(inputTypeSerializer, 0, 
            Collections.>emptyList(), timeoutHandling);
    } else { //构建一个Map来存储所有生成的状态 Map<String, State> states = new HashMap<>();
        long windowTime;

        Pattern succeedingPattern;
        State succeedingState;
        Pattern currentPattern = pattern; //构建最终态,并加入到Map中,这里将会从Pattern的尾部向头部进行遍历,所以构建的第一个状态是尾部的最终态 State currentState = new State<>(currentPattern.getName(), State.StateType.Final);
        states.put(currentPattern.getName(), currentState); //提取当前Pattern对象的窗口时间 windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() 
            : 0L; //不断向前遍历(不包含第一个Pattern对象) while (currentPattern.getPrevious() != null) { //相关变量交换 succeedingPattern = currentPattern;
            succeedingState = currentState;
            currentPattern = currentPattern.getPrevious(); //获得窗口时间 Time currentWindowTime = currentPattern.getWindowTime(); //如果当前Pattern的窗口时间比其之前Pattern的窗口时间小,则将之前Pattern的窗口时间更新为新的窗口时间 if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
                windowTime = currentWindowTime.toMilliseconds();
            } //获取或构建状态 if (states.containsKey(currentPattern.getName())) {
                currentState = states.get(currentPattern.getName());
            } else {
                currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
                states.put(currentState.getName(), currentState);
            } //为当前状态设置跟后一个状态之间的转换(边),注意状态转换是“TAKE”,这里同时传入了Pattern所注入的条件 currentState.addStateTransition(new StateTransition(
                StateTransitionAction.TAKE,
                succeedingState,
                (FilterFunction) succeedingPattern.getFilterFunction())); //如果后一个模式是非紧邻模式,则为当前状态构建自循环的“IGNORE”转换 if (succeedingPattern instanceof FollowedByPattern) {
                currentState.addStateTransition(new StateTransition(
                    StateTransitionAction.IGNORE,
                    currentState, null ));
            }
        }

        final State beginningState; //获取或构建起始状态 if (states.containsKey(BEGINNING_STATE_NAME)) {
            beginningState = states.get(BEGINNING_STATE_NAME);
        } else {
            beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
            states.put(BEGINNING_STATE_NAME, beginningState);
        } //添加状态转换(起始状态,只能通过“TAKE”向下一状态转换) beginningState.addStateTransition(new StateTransition(
            StateTransitionAction.TAKE,
            currentState,
            (FilterFunction) currentPattern.getFilterFunction()
        )); //以所有的状态构建NFAFactoryImpl对象,它将用来创建NFA对象 return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    }
} 

在CEP Operator的processElement是用来处理消息的,这里头一般会调用NFA对象的process方法,逐个处理事件,所以真正处理事件的逻辑被封装在NAF对象里头。另外需要注意的是,如果是普通数据流,其并行度被设置为1,也就是整个数据流没办法分区以并行执行,而是作为一个全局数据流参与模式匹配。这一点其实不难想象,因为我们在分析模式时,其有事件选择策略(严格紧邻还是非严格紧邻),也就是说事件前后顺序是模式的一部分,那么这时候如果普通事件流再分区执行,将会打破这种顺序,从而导致匹配失效。Process方法主要借助ComputationState对象,来记录计算过的事件的状态,具体的剖析我们可以参考Flink-CEP之NFA,这里就不再赘述,这部分实现逻辑分析有点过时,但是可以帮助更好的理解。

阅读(9645) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~