Flink | 六十七、FlinkCEP之处理超时事件

作者: 雨中散步撒哈拉
  • [一、处理超时事件]
    • [1. 使用 PatternProcessFunction 的侧输出流]
    • [2. 使用 PatternTimeoutFunction]
    • [3. 应用实例]
      • [1. 需求场景]
      • [2. 编写订单实体类]
      • [3. 编写逻辑代码]

一、处理超时事件

复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:

  1. 如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的 Map 中;
  2. 如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction 的processMatch()方法进行处理;
  3. 如果当前事件不符合模式匹配的条件,就丢弃该事件;
  4. 如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。

不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用.within()指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种"超时失败"跟真正的"匹配失败"不同,它其实是一种"部分成功匹配";因为只有在开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。

1. 使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中, 提供了一个专门捕捉超时的部分匹配事件的接口, 叫作
TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是PatternProcessFunction 的上下文Context。所以这个接口必须与PatternProcessFunction 结合使用,对处理结果的输出则需要利用侧输出流来进行。

代码中的调用方式如下:

        class MyPatternProcessFunction extends PatternProcessFunction<Event, String>
                implements TimedOutPartialMatchHandler<Event> {
            // 正常匹配事件的处理
            @Override
            public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
        ...
            }
            // 超时部分匹配事件的处理
            @Override
            public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception {
                Event startEvent = match.get("start").get(0);
                OutputTag<Event> outputTag = new OutputTag<Event>("time-out") {
                };
                ctx.output(outputTag, startEvent);
            }
        }

我们在 processTimedOutMatch()方法中定义了一个输出标签(OutputTag)。调用 ctx.output()方法,就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。

2. 使用 PatternTimeoutFunction

上文提到的PatternProcessFunction 通过实现TimedOutPartialMatchHandler 接口扩展出了处理超时事件的能力,这是官方推荐的做法。此外, Flink CEP 中也保留了简化版的PatternSelectFunction , 它无法直接处理超时事件, 不过我们可以通过调用 PatternStream的.select()方法时多传入一个PatternTimeoutFunction 参数来实现这一点。
PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout() 方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

由于调用.select()方法后会得到唯一的 DataStream,所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的 DataStream,而超时事件的处理结果则会进入侧输出流;这个侧输出流需要另外传入一个侧输出标签(OutputTag) 来指定。

所以最终我们在调用 PatternStream 的.select()方法时需要传入三个参数:侧输出流标签( OutputTag ), 超 时 事 件 处 理 函 数 PatternTimeoutFunction , 匹 配 事 件 提 取 函 数PatternSelectFunction。下面是一个代码中的调用方式:
// 定义一个侧输出流标签,用于标识超时侧输出流
OutputTag timeoutTag = new OutputTag(“timeout”) {
};
// 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出

SingleOutputStreamOperator<String> resultStream = patternStream
        .select(timeoutTag,
// 超时部分匹配事件的处理
                new PatternTimeoutFunction<Event, String>() {
                    @Override
                    public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
                        Event event = pattern.get("start").get(0);
                        return "超时:" + event.toString();
                    }
                },
// 正常匹配事件的处理
                new PatternSelectFunction<Event, String>() {
                    @Override
                    public String select(Map<String, List<Event>> pattern) throws Exception {
            ...
                    }
                });
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");

这里需要注意的是,在超时事件处理的过程中,从 Map 里只能取到已经检测到匹配的那些事件; 如果取可能未匹配的事件并调用它的对象方法, 则可能会报空指针异常(NullPointerException)。另外,超时事件处理的结果进入侧输出流,正常匹配事件的处理结果进入主流,两者的数据类型可以不同。

3. 应用实例

  1. 需求场景

接下来我们看一个具体的应用场景。

在电商平台中,最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后, 用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。
2. 编写订单实体类

首先定义出要处理的数据类型。我们面对的是订单事件,主要包括用户对订单的创建(下单)和支付两种行为。因此可以定义 POJO 类 OrderEvent 如下,其中属性字段包括用户 ID、订单 ID、事件类型(操作类型)以及时间戳。

package my.learn.flink.domain;
public class OrderEvent {
    public String userId;
    public String orderId;
    public String eventType;
    public Long timestamp;
    public OrderEvent() {
    }
    public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {
        this.userId = userId;
        this.orderId = orderId;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }
    @Override
    public String toString() {
        return "OrderEvent{" +
                "userId='" + userId + '\'' + "orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' + ", timestamp=" + timestamp + '}';
    }
}
  1. 编写逻辑代码

当前需求的重点在于对超时未支付的用户进行监控提醒,也就是需要检测有下单行为、但15 分钟内没有支付行为的复杂事件。在下单和支付之间,可以有其他操作(比如对订单的修改),所以两者之间是宽松近邻关系。可以定义 Pattern 如下:

        // 定义规则
        Pattern<OrderEvent, OrderEvent> pattern = Pattern
                .<OrderEvent>begin("create")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "create".equals(value.eventType);
                    }
                })
                .followedBy("pay")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "pay".equals(value.eventType);
                    }
                })
                // 下单和支付限制在15分钟之内
                .within(Time.minutes(15));

很明显,我们重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单 ID 进行分组,然后检测每个订单的"下单-支付"复杂事件,如果出现超时事件需要输出报警提示信息。

整体代码实现如下:

package my.learn.flink.demoTest;
import my.learn.flink.domain.LoginEvent;
import my.learn.flink.domain.OrderEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class OrderPayTest {
    StreamExecutionEnvironment env = null;
    /**
     * 获取环境
     */
    @Before
    public void getEnv() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    }
    /**
     * 订单测试
     */
    @Test
    public void orderPayTest() {
        // 读入流
        KeyedStream<OrderEvent, String> keyedStream = env.fromElements(
                new OrderEvent("user_1", "order_1", "create", 1000L),
                new OrderEvent("user_2", "order_2", "create", 2000L),
                new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
                new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
                new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
                new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<OrderEvent>() {
                                    @Override
                                    public long extractTimestamp(OrderEvent loginEvent, long l) {
                                        return loginEvent.timestamp;
                                    }
                                }
                        )
                )
                .keyBy(log -> log.userId);
        // 定义规则
        Pattern<OrderEvent, OrderEvent> pattern = Pattern
                .<OrderEvent>begin("create")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "create".equals(value.eventType);
                    }
                })
                .followedBy("pay")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "pay".equals(value.eventType);
                    }
                })
                // 下单和支付限制在15分钟之内
                .within(Time.minutes(15));
        // 规则应用到流上
        PatternStream<OrderEvent> patternStream = CEP.pattern(keyedStream, pattern);
        // 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
        SingleOutputStreamOperator<String> process = patternStream.process(new OrderPayPatternProcessFunction());
        // 将正常匹配和超时部分匹配的处理结果流打印输出
        process.print("payed");
        // 自定义侧输出标签
        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
        };
        process.getSideOutput(timeoutTag).print("timeout");
    }
    // 实现自定义的 PatternProcessFunction,需实现 TimedOutPartialMatchHandler 接口
    public static class OrderPayPatternProcessFunction extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
        // 处理正常匹配事件
        @Override
        public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
            OrderEvent payEvent = match.get("pay").get(0);
            out.collect("订单 " + payEvent.orderId + " 已支付!");
        }
        // 处理超时未支付事件
        @Override
        public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
            OrderEvent createEvent = match.get("create").get(0);
            ctx.output(new OutputTag<String>("timeout") {
                       },
                    "订单 " + createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);
        }
    }
    /**
     * 执行任务
     */
    @After
    public void jobSubmit() {
        if (null != env) {
            try {
                env.execute();
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
}

image.png

分析测试数据可以很直观地发现,订单 1 和订单 3 都在 15 分钟进行了支付,订单 1 中间的修改行为不会影响结果;而订单 2 未能支付,因此侧输出流输出了一条报警信息。且同一用户可以下多个订单,最后的判断只是基于同一订单做出的。这与我们预期的效果完全一致。用处理函数进行状态编程,结合定时器也可以实现同样的功能,但明显 CEP 的实现更加方便, 也更容易迁移和扩展。

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多