Flink | 六十四、FlinkCEP之个体模式

作者: 雨中散步撒哈拉
  • [一、基本形式]
  • [二、量词(Quantifiers )]
  • [三、条件(Conditions)]
    • [1. 限定子类型]
    • [2. 简单条件(Simple Conditions)]
    • [3. 迭代条件(Iterative Conditions)]
    • [4. 组合条件(Combining Conditions)]
    • [5. 终止条件(Stop Conditions)]

模式(Pattern)其实就是将一组简单事件组合成复杂事件的"匹配规则",其中把每个简单事件的匹配规则,叫作"个体模式"(Individual Pattern)。

一、基本形式

在之前三次登入失败案例中,每一个登录失败事件的选取规则,就都是一个个体模式。比如:

                // 第一次
                .<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                })

或者后面的:

                // 第二次
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                })

这些都是个体模式。个体模式一般都会匹配接收一个事件。

每个个体模式都以一个"连接词"开始定义的,比如 begin、next 等等,这是 Pattern 对象的一个方法(begin 是 Pattern 类的静态方法),返回的还是一个 Pattern。这些"连接词"方法有一个 String 类型参数,这就是当前个体模式唯一的名字,比如这里的"first"、“second”。在之后检测到匹配事件时,就会以这个名字来指代匹配事件。

个体模式需要一个"过滤条件",用来指定具体的匹配规则。这个条件一般是通过调用.where()方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition 内的.filter()方法来定义。

另外,个体模式可以匹配接收一个事件,也可以接收多个事件。这听起来有点奇怪,一个单独的匹配规则可能匹配到多个事件吗?这是可能的,我们可以给个体模式增加一个"量词"(quantifier),就能够让它进行循环匹配,接收多个事件。接下来我们就对量词和条件(condition)进行展开说明。

二、量词(Quantifiers )

个体模式后面可以跟一个"量词",用来指定循环的次数。从这个角度分类,个体模式可以包括"单例(singleton)模式"和"循环(looping)模式"。默认情况下,个体模式是单例模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。

在循环模式中,对同样特征的事件可以匹配多次。比如我们定义个体模式为"匹配形状为三角形的事件",再让它循环多次,就变成了"匹配连续多个三角形的事件"。注意这里的"连续",只要保证前后顺序即可,中间可以有其他事件,所以是"宽松近邻"关系。

在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:

  1. .oneOrMore()
    匹配事件出现一次或多次,假设 a 是一个个体模式,a.oneOrMore()表示可以匹配 1 个或多个 a 的事件组合。我们有时会用 a+来简单表示。
  2. .times(times)
    匹配事件发生特定次数(times),例如 a.times(3)表示 aaa;
  3. .times(fromTimes,toTimes)
    指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2,4)可以匹配 aa,aaa 和 aaaa。
  4. .greedy()
    只能用在循环模式后,使当前循环模式变得"贪心"(greedy),也就是总是尽可能多地去匹配。例如 a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的。
  5. .optional()
    使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。对于一个个体模式pattern 来说,后面所有可以添加的量词如下:
    // 匹配事件出现 4 次
    pattern.times(4);
    // 匹配事件出现 4 次,或者不出现
    pattern.times(4).optional();
    // 匹配事件出现 2, 3 或者 4 次
    pattern.times(2, 4);
    // 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配
    pattern.times(2, 4).greedy();
    // 匹配事件出现 2, 3, 4 次,或者不出现
    pattern.times(2, 4).optional();
    // 匹配事件出现 2, 3, 4 次,或者不出现;并且尽可能多地匹配
    pattern.times(2, 4).optional().greedy();
    // 匹配事件出现 1 次或多次
    pattern.oneOrMore();
    // 匹配事件出现 1 次或多次,并且尽可能多地匹配
    pattern.oneOrMore().greedy();
    // 匹配事件出现 1 次或多次,或者不出现
    pattern.oneOrMore().optional();
    // 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
    pattern.oneOrMore().optional().greedy();
    // 匹配事件出现 2 次或多次
    pattern.timesOrMore(2);
    // 匹配事件出现 2 次或多次,并且尽可能多地匹配
    pattern.timesOrMore(2).greedy();
    // 匹配事件出现 2 次或多次,或者不出现
    pattern.timesOrMore(2).optional()
    // 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
    pattern.timesOrMore(2).optional().greedy();

正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以之前代码中事件的检测接收才会用 Map 中的一个列表(List)来保存。而之前代码中没有定义量词,都是单例模式,所以只会匹配一个事件,每个List 中也只有一个元素:
LoginEvent first = map.get(“first”).get(0);

三、条件(Conditions)

对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。Flink CEP 会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。

对于条件的定义,主要是通过调用 Pattern 对象的.where()方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern 对象的.subtype() 方法来限定匹配事件的子类型。接下来我们就分别进行介绍。

1. 限定子类型

调用.subtype()方法可以为当前模式增加子类型限制条件。例如:
pattern.subtype(SubEvent.class);

这里 SubEvent 是流中数据类型 Event 的子类型。这时,只有当事件是 SubEvent 类型时, 才可以满足当前模式pattern 的匹配条件。

2. 简单条件(Simple Conditions)

简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter 操作。

代码中我们为.where()方法传入一个 SimpleCondition 的实例作为参数。SimpleCondition 是表示"简单条件"的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以当作 FilterFunction 来使用。

下面是一个具体示例:

                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                })

3. 迭代条件(Iterative Conditions)

简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作"迭代条件"(Iterative Condition)。

在 Flink CEP 中,提供了 IterativeCondition 抽象类。这其实是更加通用的条件表达,查看源码可以发现, .where() 方法本身要求的参数类型就是IterativeCondition ; 而之前的SimpleCondition 是它的一个子类。

在 IterativeCondition 中同样需要实现一个 filter()方法,不过与SimpleCondition 中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。

下面是一个具体示例:

middle.oneOrMore()
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                // 事件中的 user 必须以 A 开头
                if (!value.user.startsWith("A")) {
                    return false;
                }
                int sum = value.amount;
                // 获取当前模式之前已经匹配的事件,求所有事件 amount 之和
                for (Event event : ctx.getEventsForPattern("middle")) {
                    sum += event.amount;
                }
                // 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功
                return sum < 100;
            }
        });

上面代码中当前模式名称就叫作"middle",这是一个循环模式,可以接受事件发生一次

或多次。于是下面的迭代条件中,我们通过ctx.getEventsForPattern(“middle”)获取当前模式已经接受的事件,计算它们的数量(amount)之和;再加上当前事件中的数量,如果总和小于100,就接受当前事件,否则就不匹配。当然,在迭代条件中我们也可以基于当前事件做出判断,比如代码中要求 user 必须以A 开头。最终我们的匹配规则就是:事件的 user 必须以A 开头;并且循环匹配的所有事件 amount 之和必须小于 100。这里的 Event 与之前定义的 POJO 不同,增加了amount 属性。

可以看到,迭代条件能够获取已经匹配的事件,如果自身又是循环模式(比如量词oneOrMore),那么两者结合就可以捕获自身之前接收的数据,据此来判断是否接受当前事件。这个功能非常强大,我们可以由此实现更加复杂的需求,比如可以要求"只有大于之前数据的平均值,才接受当前事件"。

另外迭代条件中的上下文Context 也可以获取到时间相关的信息,比如事件的时间戳和当前的处理时间(processing time)。

4. 组合条件(Combining Conditions)

如果一个个体模式有多个限定条件,又该怎么定义呢?

最直接的想法是,可以在简单条件或者迭代条件的.filter()方法中,增加多个判断逻辑。可以通过 if-else 的条件分支分别定义多个条件,也可以直接在 return 返回时给一个多条件的逻辑组合(与、或、非)。不过这样会让代码变得臃肿,可读性降低。更好的方式是独立定义多个条件,然后在外部把它们连接起来,构成一个"组合条件"(Combining Condition)。

最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像是一个 filter 操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的"逻辑与"

(AND)。

而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。这里的.or()方法与.where()一样,传入一个 IterativeCondition 作为参数,定义一个独立的条件;它和之前.where() 定义的条件只要满足一个,当前事件就可以成功匹配。

当然,子类型限定条件(subtype)也可以和其他条件结合起来,成为组合条件,如下所示:

pattern.subtype(SubEvent.class)
        .where(new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent value) {
                return ... // some condition
            }
        });

这里可以看到,SimpleCondition 的泛型参数也变成了 SubEvent,所以匹配出的事件就既满足子类型限制,又符合过滤筛选的简单条件;这也是一个逻辑与的关系。

5. 终止条件(Stop Conditions)

对于循环模式而言,还可以指定一个"终止条件"(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。

终止条件的定义是通过调用模式对象的.until() 方法来实现的, 同样传入一个
IterativeCondition 作为参数。需要注意的是,终止条件只与 oneOrMore() 或者oneOrMore().optional()结合使用。因为在这种循环模式下,我们不知道后面还有没有事件可以匹配,只好把之前匹配的事件作为状态缓存起来继续等待,这等待无穷无尽;如果一直等下去, 缓存的状态越来越多,最终会耗尽内存。所以这种循环模式必须有个终点,当.until()指定的条件满足时,循环终止,这样就可以清空状态释放内存了。

个体模式更像语言中的运算符

文章作者: 雨中散步撒哈拉

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多