Flink | 六十三、FlinkCEP之简单案例

作者: 雨中散步撒哈拉
  • [一、案例需求]
  • [二、引入依赖]
  • [三、编写代码]
  • [1. 创建访问事件类]
  • [2. 编写逻辑代码]

    生活中的FlinkCEP
    有一篇文章,我要搜索关键词为:刘东东
    那么你搜索的流程是什么呢??
    1.确定关键词–这里已经给定为:刘东东
    2.windows系统快捷键为CTRL+F,进行弹窗,把关键词输入
    3.进行点击查询

一、案例需求

考虑一个具体的需求:检测用户行为,如果连续三次登录失败,就输出报警信息。很显然,这是一个复杂事件的检测处理,我们可以使用 Flink CEP 来实现。

二、引入依赖

想要在代码中使用 Flink CEP,需要在项目的pom 文件中添加相关依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_$</artifactId>
            <version>$</version>
        </dependency>

为了精简和避免依赖冲突,Flink 会保持尽量少的核心依赖。所以核心依赖中并不包括任何的连接器(conncetor)和库,这里的库就包括了 SQL、CEP 以及 ML 等等。所以如果想要在 Flink 集群中提交运行CEP 作业,应该向 Flink SQL 那样将依赖的jar 包放在/lib 目录下。

从这个角度来看,Flink CEP 和 Flink SQL 一样,都是最顶层的应用级 API。

三、编写代码

需求知道了,环境准备好了,接下来就是编写代码的时候了

  1. 创建访问事件类

首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件POJO 类。

package my.learn.flink.domain;
public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;
    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }
    public LoginEvent() {
    }
    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' + ", eventType='" + eventType + '\'' + ", timestamp=" + timestamp +
                '}';
    }
}
  1. 编写逻辑代码

接下来就是业务逻辑的编写。Flink CEP 在代码中主要通过 Pattern API 来实现。之前我们已经介绍过,CEP 的主要处理流程分为三步,对应到 Pattern API 中就是:

  1. 定义一个模式(Pattern);

  2. 将Pattern 应用到DataStream 上,检测满足规则的复杂事件,得到一个PatternStream;

  3. 对 PatternStream 进行转换处理,将检测到的复杂事件提取出来,包装成报警信息输出。
    具体代码实现如下:

    package my.learn.flink.demoTest; import my.learn.flink.domain.LoginEvent; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.List; import java.util.Map; public class FlinkCEPTest {

    StreamExecutionEnvironment env = null;
    /**
     * 获取环境
     */
    @Before
    public void getEnv() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    }
    /**
     * 从集合中获取数据
     */
    @Test
    public void firstList() {
        // 读入流
        KeyedStream<LoginEvent, String> keyedStream = env.fromElements(
                new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 4000L), new LoginEvent("user_1", "171.56.23.10", "fail", 5000L), new LoginEvent("user_2", "192.168.1.29", "success", 6000L), new LoginEvent("user_2", "192.168.1.29", "fail", 7000L), new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<LoginEvent>() {
                                    @Override
                                    public long extractTimestamp(LoginEvent loginEvent, long l) {
                                        return loginEvent.timestamp;
                                    }
                                }
                        )
                )
                .keyBy(log -> log.userId);
        // 定义规则
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
                // 第一次
                .<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                })
                // 第二次
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                })
                // 第三次
                .next("third")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.eventType);
                    }
                });
        // 规则应用到流上
        PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
        // 匹配出的结果
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
                    }
                })
                .print();
    }
    /**
     * 执行任务
     */
    @After
    public void jobSubmit() {
        if (null != env) {
            try {
                env.execute();
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
    

    }

image.png

在上面的程序中,模式中的每个简单事件,会用一个.where()方法来指定一个约束条件, 指明每个事件的特征,这里就是 eventType 为"fail”。

而模式里表示事件之间的关系时,使用了 .next() 方法。next 是"下一个"的意思,表示紧挨着、中间不能有其他事件(比如登录成功),这是一个严格近邻关系。第一个事件用.begin()方法表示开始。所有这些"连接词"都可以有一个字符串作为参数,这个字符串就可以认为是当前简单事件的名称。所以我们如果检测到一组匹配的复杂事件,里面就会有连续的三个登录失败事件,它们的名称分别叫作"first”“second"和"third”。

在第三步处理复杂事件时, 调用了 PatternStream 的.select() 方法, 传入一个PatternSelectFunction 对检测到的复杂事件进行处理。而检测到的复杂事件,会放在一个 Map 中;PatternSelectFunction 内.select()方法有一个类型为 Map的参数map,里面就保存了检测到的匹配事件。这里的 key 是一个字符串,对应着事件的名称,而 value是 LoginEvent 的一个列表,匹配到的登录失败事件就保存在这个列表里。最终我们提取 userId 和三次登录的时间戳,包装成字符串输出一个报警信息。

运行代码可以得到结果如下:

warning> user_1 连续三次登录失败!登录时间:2000, 3000, 5000

可以看到,user_1 连续三次登录失败被检测到了;而 user_2 尽管也有三次登录失败,但中间有一次登录成功,所以不会被匹配到。

文章作者: 雨中散步撒哈拉

文章链接: https://liudongdong.top/archives/flinkliu-shi-san-flinkcep-zhi-jian-dan-an-li

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多