Flink DataStream-RabbitMQ连接器

作者: ApacheFlink

RabbitMQ 连接器的许可证

Flink 的 RabbitMQ 连接器依赖了 “RabbitMQ AMQP Java Client”,它基于三种协议下发行:Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”) 和 Apache License version 2 (“ASL”)。 Flink 自身既没有复用 “RabbitMQ AMQP Java Client” 的代码,也没有将 “RabbitMQ AMQP Java Client” 打二进制包。

如果用户发布的内容是基于 Flink 的 RabbitMQ 连接器的(进而重新发布了 “RabbitMQ AMQP Java Client” ),那么一定要注意这可能会受到 Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”)、Apache License version 2 (“ASL”) 协议的限制.

RabbitMQ 连接器

这个连接器可以访问 RabbitMQ 的数据流。使用这个连接器,需要在工程里添加下面的依赖:

注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里].

安装 RabbitMQ

安装 RabbitMQ 请参考 RabbitMQ 下载页面。安装完成之后,服务会自动拉起,应用程序就可以尝试连接到 RabbitMQ 了。

RabbitMQ Source

RMQSource 负责从 RabbitMQ 中消费数据,可以配置三种不同级别的保证:

  1. 精确一次: 保证精确一次需要以下条件 -
  2. 开启 checkpointing: 开启 checkpointing 之后,消息在 checkpoints 完成之后才会被确认(然后从 RabbitMQ 队列中删除).
  3. 使用关联标识(Correlation ids): 关联标识是 RabbitMQ 的一个特性,消息写入 RabbitMQ 时在消息属性中设置。 从 checkpoint 恢复时有些消息可能会被重复处理,source 可以利用关联标识对消息进行去重。
  4. 非并发 source: 为了保证精确一次的数据投递,source 必须是非并发的(并行度设置为1)。 这主要是由于 RabbitMQ 分发数据时是从单队列向多个消费者投递消息的。
  5. 至少一次: 在 checkpointing 开启的条件下,如果没有使用关联标识或者 source 是并发的,

那么 source 就只能提供至少一次的保证。

  1. 无任何保证: 如果没有开启 checkpointing,source 就不能提供任何的数据投递保证。

使用这种设置时,source 一旦接收到并处理消息,消息就会被自动确认。

下面是一个保证 exactly-once 的 RabbitMQ source 示例。 注释部分展示了更加宽松的保证应该如何配置。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...)
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

val stream = env
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
    .setParallelism(1)               // non-parallel source is only required for exactly-once
env = StreamExecutionEnvironment.get_execution_environment()
# checkpointing is required for exactly-once or at-least-once guarantees
env.enable_checkpointing(...)
connection_config = RMQConnectionConfig.Builder() \
    .set_host("localhost") \
    .set_port(5000) \
    ...
    .build()
stream = env \
    .add_source(RMQSource(
        connection_config,
        "queueName",
        True,
        SimpleStringSchema(),
    )) \
    .set_parallelism(1)

服务质量 (QoS) / 消费者预取(Consumer Prefetch)

RabbitMQ Source 通过 RMQConnectionConfig 类提供了一种简单的方式,来设置 source channel 上的 basicQos(见下方示例)。要注意的是这里的 prefetch count 是对单个 channel 设置的,并且由于每个并发的 source 都持有一个 connection/channel,因此这个值实际上会乘以 source 的并行度,来表示同一时间可以向这个 job 总共发送多少条未确认的消息。如果需要更复杂的配置,可以通过重写 RMQSource#setupChannel(Connection) 方法来实现手动配置。

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setPrefetchCount(30_000)
    ...
    .build();
val connectionConfig = new RMQConnectionConfig.Builder()
    .setPrefetchCount(30000)
    ...
    .build
connection_config = RMQConnectionConfig.Builder() \
    .set_prefetch_count(30000) \
    ...
    .build()

RabbitMQ Source 默认情况下是不设置 prefetch count 的,这意味着 RabbitMQ 服务器将会无限制地向 source 发送消息。因此在生产环境中,最好要设置它。当消费海量数据的队列并且启用 checkpointing 时,消息只有在做完 checkpoint 后才会被确认,因此也许需要对 prefetch count 做一些调整来减少不必要的循环。

更多关于 QoS 以及 prefetch 相关的内容可以参考 这里.

更多关于在 AMQP 0-9-1 中可选的选项可以参考 这里.

RabbitMQ Sink

该连接器提供了一个 RMQSink 类,用来向 RabbitMQ 队列发送数据。下面是设置 RabbitMQ sink 的代码示例:

final DataStream<String> stream = ...
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

stream.addSink(new RMQSink<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
val stream: DataStream[String] = ...
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

    connectionConfig,         // config for the RabbitMQ connection
    "queueName",              // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema))  // serialization schema to turn Java objects to messages
stream = ...
connection_config = RMQConnectionConfig.Builder() \
    .set_host("localhost") \
    .set_port(5000) \
    ...
    .build()
stream.add_sink(RMQSink(
# config for the RabbitMQ connection
# name of the RabbitMQ queue to send messages to
# serialization schema to turn Java objects to messages

更多关于 RabbitMQ 的信息请参考 这里.

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多