Flink | 六十九、FlinkCEP之CEP 的状态机实现

作者: 雨中散步撒哈拉
  • [一、CEP 的状态机]
  • [二、状态转移的过程]
  • [三、代码实现]

一、CEP 的状态机

Flink CEP 中对复杂事件的检测,关键在模式的定义。我们会发现 CEP 中模式的定义方式比较复杂,而且与正则表达式非常相似:正则表达式在字符串上匹配符合模板的字符序列,而Flink CEP 则是在事件流上匹配符合模式定义的复杂事件。

前面我们分析过 CEP 检测处理的流程,可以认为检测匹配事件的过程中会有"初始(没有任何匹配)"“检测中(部分匹配成功)”“匹配成功”“匹配失败"等不同的"状态”。随着每个事件的到来,都会改变当前检测的"状态";而这种改变跟当前事件的特性有关、也跟当前所处的状态有关。这样的系统,其实就是一个"状态机"(state machine)。这也正是正则表达式底层引擎的实现原理。

所以 Flink CEP 的底层工作原理其实与正则表达式是一致的,是一个"非确定有限状态自动机"(Nondeterministic Finite Automaton,NFA)。NFA 的原理涉及到较多数学知识,我们这里不做详细展开,而是用一个具体的例子来说明一下状态机的工作方式,以更好地理解 CEP 的原理。

我们回顾一下 简单案例的应用案例,检测用户连续三次登录失败的复杂事件。用 FlinkCEP 中的 Pattern API 可以很方便地把它定义出来;如果我们现在不用 CEP,而是用 DataStreamAPI 和处理函数来实现,应该怎么做呢?

这需要设置状态,并根据输入的事件不断更新状态。当然因为这个需求不是很复杂,我们也可以用嵌套的 if-else 条件判断将它实现,不过这样做的代码可读性和扩展性都会很差。更好的方式,就是实现一个状态机。

二、状态转移的过程

image.png

从初始状态(INITIAL)出发,遇到一个类型为fail 的登录失败事件,就开始进入部分匹配的状态;目前只有一个 fail 事件,我们把当前状态记作 S1。基于 S1 状态,如果继续遇到 fail 事件,那么就有两个 fail 事件,记作 S2。基于 S2 状态如果再次遇到 fail 事件,那么就找到了一组匹配的复杂事件,把当前状态记作 Matched, 就可以输出报警信息了。需要注意的是,报警完毕,需要立即重置状态回 S2;因为如果接下来再遇到 fail 事件,就又满足了新的连续三次登录失败,需要再次报警。

而不论是初始状态,还是 S1、S2 状态,只要遇到类型为 success 的登录成功事件,就会跳转到结束状态,记作 Terminal。此时当前检测完毕,之前的部分匹配应该全部清空,所以需要立即重置状态到 Initial,重新开始下一轮检测。所以这里我们真正参与状态转移的,其实只有 Initial、S1、S2 三个状态,Matched 和Terminal 是为了方便我们做其他操作(比如输出报警、清空状态)的"临时标记状态",不等新事件到来马上就会跳转。

三、代码实现

完整代码如下:

package my.learn.flink.demoTest;
import my.learn.flink.domain.LoginEvent;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class NFAExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 获取登录事件流,这里与时间无关,就不生成水位线了
        KeyedStream<LoginEvent, String> stream = env.fromElements(
                new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
        )
                .keyBy(r -> r.userId);
        // 将数据依次输入状态机进行处理
        DataStream<String> alertStream = stream
                .flatMap(new StateMachineMapper());
        alertStream.print("warning");
        env.execute();
    }
    @SuppressWarnings("serial")
    public static class StateMachineMapper extends RichFlatMapFunction<LoginEvent, String> {
        // 声明当前用户对应的状态
        private ValueState<State> currentState;
        @Override
        public void open(Configuration conf) {
            // 获取状态对象
            currentState = getRuntimeContext().getState(new ValueStateDescriptor<>("state", State.class));
        }
        @Override
        public void flatMap(LoginEvent event, Collector<String> out) throws Exception {
            // 获取状态,如果状态为空,置为初始状态
            State state = currentState.value();
            if (state == null) {
                state = State.Initial;
            }
            // 基于当前状态,输入当前事件时跳转到下一状态
            State nextState = state.transition(event.eventType);
            if (nextState == State.Matched) {
                // 如果检测到匹配的复杂事件,输出报警信息
                out.collect(event.userId + " 连续三次登录失败");
                // 需要跳转回 S2 状态,这里直接不更新状态就可以了
            } else if (nextState == State.Terminal) {
                // 如果到了终止状态,就重置状态,准备重新开始
                currentState.update(State.Initial);
                // 如果还没结束,更新状态(状态跳转),继续读取事件
                currentState.update(nextState);
            }
        }
    }
    // 状态机实现
    public enum State {
        Terminal,    // 匹配失败,当前匹配终止
        Matched,    // 匹配成功
        // S2 状态
        S2(new Transition("fail", Matched), new Transition("success", Terminal)),
        // S1 状态
        S1(new Transition("fail", S2), new Transition("success", Terminal)),
        // 初始状态
        Initial(new Transition("fail", S1), new Transition("success", Terminal));
        private final Transition[] transitions;    // 状态转移规则
        // 状态的构造方法,可以传入一组状态转移规则来定义状态
        State(Transition... transitions) {
            this.transitions = transitions;
        }
        // 状态的转移方法,根据当前输入事件类型,从定义好的转移规则中找到下一个状态
        public State transition(String eventType) {
            for (Transition t : transitions) {
                if (t.getEventType().equals(eventType)) {
                    return t.getTargetState();
                }
            }
            // 如果没有找到转移规则,说明已经结束,回到初始状态
            return Initial;
        }
    }
    // 定义状态转移类,包括两个属性:当前事件类型和目标状态
    public static class Transition implements Serializable {
        private static final long serialVersionUID = 1L;
        // 触发状态转移的当前事件类型
        private final String eventType;
        // 转移的目标状态
        private final State targetState;
        public Transition(String eventType, State targetState) {
            this.eventType = checkNotNull(eventType);
            this.targetState = checkNotNull(targetState);
        }
        public String getEventType() {
            return eventType;
        }
        public State getTargetState() {
            return targetState;
        }
    }
}

image.png

运行代码,可以看到输出与之前 CEP 的实现是完全一样的。显然,如果所有的复杂事件处理都自己设计状态机来实现是非常繁琐的,而且中间逻辑非常容易出错;所以 Flink CEP 将底层NFA 全部实现好并封装起来,这样我们处理复杂事件时只要调上层的 Pattern API 就可以, 无疑大大降低了代码的复杂度,提高了编程的效率。

文章作者: 雨中散步撒哈拉

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多