- [一、匹配事件的选择提取(select)]
- [1. PatternSelectFunction]
- [2. PatternFlatSelectFunction]
- [二、匹配事件的通用处理(process)]
自己写的匹配规则怎么应用到流中呢?
一、匹配事件的选择提取(select)
处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来, 包装成想要的信息输出,这个操作就是"选择"(select)。
1. PatternSelectFunction
代码中基于 PatternStream 直接调用.select()方法,传入一个PatternSelectFunction 作为参数。
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());
这 里 的 MyPatternSelectFunction 是 PatternSelectFunction 的 一 个 具 体 实 现 。
PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的"事件名称"就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的value 就是一个事件的列表(List)。
下面是 MyPatternSelectFunction 的一个具体实现:
class MyPatternSelectFunction implements PatternSelectFunction<Event, String> {
@Override
public String select(Map<String, List<Event>> pattern) throws Exception {
Event startEvent = pattern.get("start").get(0);
Event middleEvent = pattern.get("middle").get(0);
return startEvent.toString() + " " + middleEvent.toString();
}
}
PatternSelectFunction 里需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它以保存了匹配复杂事件的 Map 作为输入,经自定义转换后得到输出信息返回。这里我们假设之前定义的模式序列中,有名为"start"和"middle"的两个个体模式, 于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map 的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List 中只有一个元素,直接调用.get(0) 就可以把它取出。
当然,如果个体模式是循环的,List 中就有可能有多个元素了。例如我们在 简单案例中对连续登录失败检测的改进,我们可以将匹配到的事件包装成 String 类型的报警信息输出,代码如下:
@Test
public void first2List() {
// 读入流
KeyedStream<LoginEvent, String> keyedStream = env.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L), new LoginEvent("user_1", "171.56.23.10", "fail", 5000L), new LoginEvent("user_2", "192.168.1.29", "success", 6000L), new LoginEvent("user_2", "192.168.1.29", "fail", 7000L), new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}
)
)
.keyBy(log -> log.userId);
// 定义规则
Pattern<LoginEvent, LoginEvent> pattern = Pattern
// 第一次
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.eventType);
}
}).times(3).consecutive();
// 规则应用到流上
PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
// 匹配出的结果
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent first = map.get("first").get(0);
LoginEvent second = map.get("second").get(0);
LoginEvent third = map.get("third").get(0);
return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
}
})
.print();
}
我们定义的模式序列中只有一个循环模式 fails,它会将检测到的 3 个登录失败事件保存到一个列表(List)中。所以第三步处理匹配的复杂事件时,我们从 map 中获取模式名 fails 对应的事件,拿到的是一个 List,从中按位置索引依次获取元素就可以得到匹配的三个登录失败事件。
运行程序进行测试,会发现结果与之前完全一样。
PatternSelectFunction把流和规则结合起来后,输出结果暂存到map集合中
2. PatternFlatSelectFunction
除此之外, PatternStream 还有一个类似的方法是.flatSelect() , 传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction 的"扁平化"版本;内部需要实现一个 flatSelect()方法,它与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用 out.collet()方法就可以实现多次发送输出数据了。
例如上面的代码可以写成:
patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
@Override
public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");
可见 PatternFlatSelectFunction 使用更加灵活,完全能够覆盖PatternSelectFunction 的功能。这跟 FlatMapFunction 与 MapFunction 的区别是一样的。
二、匹配事件的通用处理(process)
自 1.8 版本之后,Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream 的.process()方法,传入一个PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。
所以 PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,atternSelectFunction 和 PatternFlatSelectFunction 在 CEP 内部执行时也会被转换成PatternProcessFunction。
我们可以使用PatternProcessFunction 将之前的代码重写如下:
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");
可以看到,PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。侧输出流的功能是处理函数的一大特性,我们已经非常熟悉;而在 CEP 中,侧输出流一般被用来处理超时事件
无论是map、flatmap以及更为底层的process,其本质都是从匹配出的结果中处理结果
文章作者: 雨中散步撒哈拉
文章链接: https://liudongdong.top/archives/flinkliu-shi-liu-flinkcep-zhi-chu-li-pi-pei-shi-jian