Flink Table-Apache Kafka SQL连接器

作者: ApacheFlink

Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

依赖

Kafka 连接器目前并不包含在 Flink 的二进制发行版中,请查阅这里了解如何在集群运行中引用 Kafka 连接器。

如何创建 Kafka 表

以下示例展示了如何创建 Kafka 表:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

可用的元数据

以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(W)。

只读列必须声明为 VIRTUAL 以在 INSERT INTO 操作中排除它们。

数据类型 描述 R/W
topic STRING NOT NULL Kafka 记录的 Topic 名。 R
partition INT NOT NULL Kafka 记录的 partition ID。 R
headers MAP<string, bytes=""> NOT NULL</string,> 二进制 Map 类型的 Kafka 记录头(Header)。 R/W
leader-epoch INT NULL Kafka 记录的 Leader epoch(如果可用)。 R
offset BIGINT NOT NULL Kafka 记录在 partition 中的 offset。 R
timestamp TIMESTAMP_LTZ(3) NOT NULL Kafka 记录的时间戳。 R/W
timestamp-type STRING NOT NULL Kafka 记录的时间戳类型。可能的类型有 "NoTimestampType", "CreateTime"(会在写入元数据时设置),或 "LogAppendTime"。 R

以下扩展的 CREATE TABLE 示例展示了使用这些元数据字段的语法:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

格式元信息

连接器可以读出消息格式的元数据。格式元数据的配置键以 'value.' 作为前缀。

以下示例展示了如何获取 Kafka 和 Debezium 的元数据字段:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

连接器参数

#一致性保证'>一致性保证 以获取更多细节。

参数 是否必选 默认值 数据类型 描述
connector
必选 (无) String 指定使用的连接器,Kafka 连接器使用 'kafka'
topic
required for sink (无) String 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
topic-pattern
可选 (无) String 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。
properties.bootstrap.servers
必选 (无) String 逗号分隔的 Kafka broker 列表。
properties.group.id
对 source 可选,不适用于 sink (无) String Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。
properties.*
可选 (无) String #configuration">Kafka 配置文档 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer'  'value.deserializer'
format
必选 (无) String 用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。
key.format
可选 (无) String 用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。
key.fields
可选 [] List<String> 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'
key.fields-prefix
可选 (无) String 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'
value.format
必选 (无) String 序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'format' 二者必需其一。
value.fields-include
可选 ALL

枚举类型

可选值:[ALL, EXCEPT_KEY]
定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
scan.startup.mode
可选 group-offsets String Kafka consumer 的启动模式。有效值为:'earliest-offset''latest-offset''group-offsets''timestamp'  'specific-offsets'。 #起始消费位点">起始消费位点 以获取更多细节。
scan.startup.specific-offsets
可选 (无) String 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'
scan.startup.timestamp-millis
可选 (无) Long 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。
scan.topic-partition-discovery.interval
可选 (无) Duration Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。
sink.partitioner
可选 'default' String Flink partition 到 Kafka partition 的分区映射关系,可选值有:
  • default:使用 Kafka 默认的分区器对消息进行分区。
  • fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
  • round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
  • 自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'
#sink-分区">Sink 分区 以获取更多细节。
sink.semantic
可选 at-least-once String
sink.parallelism
可选 (无) Integer 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

特性

消息键(Key)与消息体(Value)的格式

Kafka 消息的消息键和消息体部分都可以使用某种 格式 来序列化或反序列化成二进制数据。 消息体格式

由于 Kafka 消息中消息键是可选的,以下语句将使用消息体格式读取和写入消息,但不使用消息键格式。 'format' 选项与 'value.format' 意义相同。

所有的格式配置使用格式识别符作为前缀。

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
)

消息体格式将配置为以下的数据类型:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

消息键和消息体格式

以下示例展示了如何配置和使用消息键和消息体格式。

格式配置使用 'key'  'value' 加上格式识别符作为前缀。

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL'
)

消息键格式包含了在 'key.fields' 中列出的字段(使用 ';' 分隔)和字段顺序。

因此将配置为以下的数据类型:

ROW<`user_id` BIGINT, `item_id` BIGINT>

由于消息体格式配置为 'value.fields-include' = 'ALL',所以消息键字段也会出现在消息体格式的数据类型中:

ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>

重名的格式字段

如果消息键字段和消息体字段重名,连接器无法根据表结构信息将这些列区分开。 'key.fields-prefix' 配置项可以在表结构中为消息键字段指定一个唯一名称,并在配置消息键格式的时候保留原名。

以下示例展示了在消息键和消息体中同时包含 version 字段的情况:

CREATE TABLE KafkaTable (
  `k_version` INT,
  `k_user_id` BIGINT,
  `k_item_id` BIGINT,
  `version` INT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'key.format' = 'json',
  'key.fields-prefix' = 'k_',
  'key.fields' = 'k_version;k_user_id;k_item_id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
)

消息体格式必须配置为 'EXCEPT_KEY' 模式。格式将被配置为以下的数据类型:

消息键格式:
ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>

消息体格式:
ROW<`version` INT, `behavior` STRING>

Topic 和 Partition 的探测

topic  topic-pattern 配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic 配置项可接受使用分号间隔的 topic 列表,例如 topic-1;topic-2 topic-pattern 配置项使用正则表达式来探测匹配的 topic。例如 topic-pattern 设置为 test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以 test-topic- 开头,以一位数字结尾)都将被 consumer 订阅。

为允许 consumer 在作业启动之后探测到动态创建的 topic,请将 scan.topic-partition-discovery.interval 配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 中新的 partition。

请参阅 [Kafka DataStream 连接器文档] 以获取更多关于 topic 和 partition 探测的信息。

注意 topic 列表和 topic 匹配规则只适用于 source。对于 sink 端,Flink 目前只支持单一 topic。

起始消费位点

scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。有效值为:

  • `group-offsets`:从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • `earliest-offset`:从可能的最早偏移量开始。
  • `latest-offset`:从最末尾偏移量开始。
  • `timestamp`:从用户为每个 partition 指定的时间戳开始。
  • `specific-offsets`:从用户为每个 partition 指定的偏移量开始。

默认值 group-offsets 表示从 Zookeeper/Kafka 中最近一次已提交的偏移量开始消费。

如果使用了 timestamp,必须使用另外一个配置项 scan.startup.timestamp-millis 来指定一个从格林尼治标准时间 1970 年 1 月 1 日 00:00:00.000 开始计算的毫秒单位时间戳作为起始时间。

如果使用了 specific-offsets,必须使用另外一个配置项 scan.startup.specific-offsets 来为每个 partition 指定起始偏移量,

例如,选项值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 从偏移量 42 开始,partition 1 从偏移量 300 开始。

CDC 变更日志(Changelog) Source

Flink 原生支持使用 Kafka 作为 CDC 变更日志(changelog) source。如果 Kafka topic 中的消息是通过变更数据捕获(CDC)工具从其他数据库捕获的变更事件,则你可以使用 CDC 格式将消息解析为 Flink SQL 系统中的插入(INSERT)、更新(UPDATE)、删除(DELETE)消息。

在许多情况下,变更日志(changelog) source 都是非常有用的功能,例如将数据库中的增量数据同步到其他系统,审核日志,数据库的物化视图,时态表关联数据库表的更改历史等。 Flink 提供了几种 CDC 格式:

Sink 分区

配置项 sink.partitioner 指定了从 Flink 分区到 Kafka 分区的映射关系。

默认情况下,Flink 使用 Kafka 默认分区器 来对消息分区。默认分区器对没有消息键的消息使用 粘性分区策略(sticky partition strategy) 进行分区,对含有消息键的消息使用 murmur2 哈希算法计算分区。

为了控制数据行到分区的路由,也可以提供一个自定义的 sink 分区器。'fixed' 分区器会将同一个 Flink 分区中的消息写入同一个 Kafka 分区,从而减少网络连接的开销。

一致性保证

默认情况下,如果查询在 [启用 checkpoint] 模式下执行时,Kafka sink 按照至少一次(at-lease-once)语义保证将数据写入到 Kafka topic 中。

当 Flink checkpoint 启用时,kafka 连接器可以提供精确一次(exactly-once)的语义保证。

除了启用 Flink checkpoint,还可以通过传入对应的 sink.semantic 选项来选择三种不同的运行模式:

  • none:Flink 不保证任何语义。已经写出的记录可能会丢失或重复。
  • at-least-once (默认设置):保证没有记录会丢失(但可能会重复)。
  • exactly-once:使用 Kafka 事务提供精确一次(exactly-once)语义。当使用事务向 Kafka 写入数据时,请将所有从 Kafka 中消费记录的应用中的 isolation.level 配置项设置成实际所需的值(read_committed  read_uncommitted,后者为默认值)。

请参阅 [Kafka 文档] 以获取更多关于语义保证的信息。

Source 按分区 Watermark

Flink 对于 Kafka 支持发送按分区的 watermark。Watermark 在 Kafka consumer 中生成。

按分区 watermark 的合并方式和在流 shuffle 时合并 Watermark 的方式一致。 Source 输出的 watermark 由读取的分区中最小的 watermark 决定。

如果 topic 中的某些分区闲置,watermark 生成器将不会向前推进。

你可以在表配置中设置 ['table.exec.source.idle-timeout'] 选项来避免上述问题。

请参阅 [Kafka watermark 策略] 以获取更多细节。

安全

要启用加密和认证相关的安全配置,只需将安全配置加上 “properties.” 前缀配置在 Kafka 表上即可。下面的代码片段展示了如何配置 Kafka 表以使用 PLAIN 作为 SASL 机制并提供 JAAS 配置:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)

另一个更复杂的例子,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* SSL 配置 */
  /* 配置服务端提供的 truststore (CA 证书) 的路径 */
  'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /* 如果要求客户端认证,则需要配置 keystore (私钥) 的路径 */
  'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /* SASL 配置 */
  /* 将 SASL 机制配置为 as SCRAM-SHA-256 */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /* 配置 JAAS */
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)

如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此请根据登录模块在 JAR 中实际的类路径来改写以上配置。例如在 SQL client JAR 中,Kafka client 依赖被重置在了 org.apache.flink.kafka.shaded.org.apache.kafka 路径下,

因此 plain 登录模块的类路径应写为 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule

关于安全配置的详细描述,请参阅 Apache Kafka 文档中的"安全"一节

数据类型映射

Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 csv,json,avro。

因此,数据类型映射取决于使用的格式。请参阅 格式 页面以获取更多细节。

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多