Flink | 六十六、FlinkCEP之处理匹配事件

作者: 雨中散步撒哈拉
  • [一、匹配事件的选择提取(select)]
    • [1. PatternSelectFunction]
    • [2. PatternFlatSelectFunction]
  • [二、匹配事件的通用处理(process)]

自己写的匹配规则怎么应用到流中呢?

一、匹配事件的选择提取(select)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来, 包装成想要的信息输出,这个操作就是"选择"(select)。

1. PatternSelectFunction

代码中基于 PatternStream 直接调用.select()方法,传入一个PatternSelectFunction 作为参数。

PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

这 里 的 MyPatternSelectFunction 是 PatternSelectFunction 的 一 个 具 体 实 现 。
PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的"事件名称"就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的value 就是一个事件的列表(List)。

下面是 MyPatternSelectFunction 的一个具体实现:

        class MyPatternSelectFunction implements PatternSelectFunction<Event, String> {
            @Override
            public String select(Map<String, List<Event>> pattern) throws Exception {
                Event startEvent = pattern.get("start").get(0);
                Event middleEvent = pattern.get("middle").get(0);
                return startEvent.toString() + " " + middleEvent.toString();
            }
        }

PatternSelectFunction 里需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它以保存了匹配复杂事件的 Map 作为输入,经自定义转换后得到输出信息返回。这里我们假设之前定义的模式序列中,有名为"start"和"middle"的两个个体模式, 于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map 的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List 中只有一个元素,直接调用.get(0) 就可以把它取出。

当然,如果个体模式是循环的,List 中就有可能有多个元素了。例如我们在 简单案例中对连续登录失败检测的改进,我们可以将匹配到的事件包装成 String 类型的报警信息输出,代码如下:

    @Test
    public void first2List() {
        // 读入流
        KeyedStream<LoginEvent, String> keyedStream = env.fromElements(
                new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 4000L), new LoginEvent("user_1", "171.56.23.10", "fail", 5000L), new LoginEvent("user_2", "192.168.1.29", "success", 6000L), new LoginEvent("user_2", "192.168.1.29", "fail", 7000L), new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<LoginEvent>() {
                                    @Override
                                    public long extractTimestamp(LoginEvent loginEvent, long l) {
                                        return loginEvent.timestamp;
                                    }
                                }
                        )
                )
                .keyBy(log -> log.userId);
        // 定义规则
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
                // 第一次
                .<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                }).times(3).consecutive();
        // 规则应用到流上
        PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
        // 匹配出的结果
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
                    }
                })
                .print();
    }

我们定义的模式序列中只有一个循环模式 fails,它会将检测到的 3 个登录失败事件保存到一个列表(List)中。所以第三步处理匹配的复杂事件时,我们从 map 中获取模式名 fails 对应的事件,拿到的是一个 List,从中按位置索引依次获取元素就可以得到匹配的三个登录失败事件。

运行程序进行测试,会发现结果与之前完全一样。

PatternSelectFunction把流和规则结合起来后,输出结果暂存到map集合中

2. PatternFlatSelectFunction

除此之外, PatternStream 还有一个类似的方法是.flatSelect() , 传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction 的"扁平化"版本;内部需要实现一个 flatSelect()方法,它与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

例如上面的代码可以写成:

        patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
            @Override
            public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out) throws Exception {
                LoginEvent first = map.get("fails").get(0);
                LoginEvent second = map.get("fails").get(1);
                LoginEvent third = map.get("fails").get(2);
                out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
                        ", " + second.timestamp + ", " + third.timestamp);
            }
        }).print("warning");

可见 PatternFlatSelectFunction 使用更加灵活,完全能够覆盖PatternSelectFunction 的功能。这跟 FlatMapFunction 与 MapFunction 的区别是一样的。

二、匹配事件的通用处理(process)

自 1.8 版本之后,Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream 的.process()方法,传入一个PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。

所以 PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,atternSelectFunction 和 PatternFlatSelectFunction 在 CEP 内部执行时也会被转换成PatternProcessFunction。

我们可以使用PatternProcessFunction 将之前的代码重写如下:

 // 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
        patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
            @Override
            public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, Collector<String> out) throws Exception {
                LoginEvent first = map.get("fails").get(0);
                LoginEvent second = map.get("fails").get(1);
                LoginEvent third = map.get("fails").get(2);
                out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
                        ", " + second.timestamp + ", " + third.timestamp);
            }
        }).print("warning");

可以看到,PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。侧输出流的功能是处理函数的一大特性,我们已经非常熟悉;而在 CEP 中,侧输出流一般被用来处理超时事件

无论是map、flatmap以及更为底层的process,其本质都是从匹配出的结果中处理结果

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多