Apache StreamPark-JDBC

作者: Apache StreamPark

介绍

支持:

  • Scan Source: Bounded
  • Lookup Source: Sync Mode
  • Sink: Batch Sink:
  • Streaming Append & Upsert Mode
    JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。

如果在 DDL 中定义了主键,则JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

依赖

为了使用JDBC连接器,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。
JDBC 连接器并不是发布版的一部分,需要自己手动添加依赖。

在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

Driver Group Id Artifact Id JAR
MySQL mysql mysql-connector-java 下载
从flink-1.15.x开始支持
Oracle
com.oracle.database.jdbc ojdbc8 下载
PostgreSQL org.postgresql postgresql 下载
Derby org.apache.derby derby 下载

当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅这里了解在集群上执行时何连接它们。

创建 JDBC 表

JDBC table 可以按如下定义:

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
);
-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;
-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

连接器参数

参数 要求 从 flink-1.15.x 开始支持
是否可传递
默认值 类型 描述
connector 必填 (none) String 指定使用什么类型的连接器,这里应该是jdbc
url 必填 (none) String JDBC 数据库 url
table-name 必填 (none) String 连接到 JDBC 表的名称。
driver 可选 (none) String 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username 可选 (none) String JDBC 用户名。如果指定了 username  password 中的任一参数,则两者必须都被指定。
password 可选 (none) String JDBC 密码。
connection.max-retry-timeout 可选 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。
scan.partition.column 可选 (none) String 用于将输入数据进行分区的列名。请参阅下面的分区扫描部分了解更多详情。
scan.partition.num 可选 (none) Integer 分区数。
scan.partition.lower-bound 可选 (none) Integer 第一个分区的最小值。
scan.partition.upper-bound 可选 (none) Integer 最后一个分区的最大值。
scan.fetch-size 可选 0 Integer 每次循环读取时应该从数据库中获取的行数。如果指定的值为 0,则该配置项会被忽略。
scan.auto-commit 可选 true Boolean 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。
lookup.cache.max-rows 可选 (none) Integer lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。
lookup.cache.ttl 可选 (none) Duration lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。

| 从flink-1.15.x开始支持
lookup.cache.caching-missing-key | 可选 | 是 | true | Boolean | 是否缓存未命中的 key ,默认为 true 。 |

| lookup.max-retries | 可选 | 是 | 3 | Integer | 查询数据库失败的最大重试时间。 |
| sink.buffer-flush.max-rows | 可选 | 是 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 0 来禁用它。 |
| sink.buffer-flush.interval | 可选 | 是 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 0 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 sink.buffer-flush.max-rows 设置为 0 并配置适当的 flush 时间间隔。 |
| sink.max-retries | 可选 | 是 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
| sink.parallelism | 可选 | 否 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |

特性

键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

 upsert 模式下,Flink 将根据主键判断是插入新行还是更新已存在的行,这种方式可以确保幂等性。

为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。

在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。

这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。

注意,scan.partition.lower-bound  scan.partition.upper-bound 用于决定分区的起始位置和结束位置,以过滤表中的数据。

如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。
    注意

最大值和最小值设置,对于数据库中的时间和日期字段,该值应该使用毫秒值,而不是字符串。
flink会根据分区数对整个分区范围进行切分,然后将切分后的所有区间分配给所有source并行度。
flink生成的查询sql语句最后面会添加 where 分区字段过滤条件,使用 between 关键字。

Lookup Cache

JDBC 连接器可以在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,可以通过设置 lookup.cache.max-rows  lookup.cache.ttl 参数来启用。
lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。

默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。
Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。

当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。

缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更新的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。
flink-1.15.x:默认情况下,flink 会缓存主键查询为空的结果,你可以设置参数 lookup.cache.caching-missing-key  false 来改变这个行为。

幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语义。upsert 语义指的是如果数据主键值在数据库中已存在,则更新现有行;如果不存在,则插入新行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,但这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是符合用户期待的。
upsert 没有标准的语法,下表描述了不同数据库的 DML 语法:

Database Upsert Grammar
MySQL INSERT … ON DUPLICATE KEY UPDATE …
从flink-1.15.x开始支持
Oracle
MERGE INTO … USING (…) ON (…)
WHEN MATCHED THEN UPDATE SET (…)
WHEN NOT MATCHED THEN INSERT (…)
VALUES (…)
PostgreSQL INSERT … ON CONFLICT … DO UPDATE SET …

Postgres 数据库作为 Catalog

注意,该特性只支持到 flink-1.14.x ,在 flink-1.15.x 中,将 postgre  mysql 合并到一起实现了。
JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,PostgresCatalog 是 JDBC Catalog 的唯一实现,PostgresCatalog 只支持有限的 Catalog 方法,包括:

// Postgres Catalog 支持的方法
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)

其他的 Catalog 方法现在还是不支持的。

PostgresCatalog 的使用

请参阅 Dependencies 部分了解如何配置 JDBC 连接器和 Postgres 驱动。
Postgres catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,应该符合 jdbc:postgresql://<ip>:<port> 的格式,同时这里不应该包含数据库名。
    SQL
CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);
USE CATALOG mypg;

PostgresSQL 元空间映射

除了数据库之外,postgresSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 public,每个 schema 可以包含多张表。

在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只写 table_name,其中 schema_name 是可选的,默认值为 public

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构 Postgres 元空间结构
catalog 名称 (只能在flink中定义) N/A
database 名称 database 名称
table 名称 [schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

JDBC Catalog

从 flink-1.15.x 开始支持,并且将 postgre 和 mysql 合并到了一起实现。
JdbcCatalog 允许用户使用 flink 通过 JDBC 协议去连接关系型数据库。

目前已经有两个 JDBC catalog 的实现,Postgres Catalog  MySQL Catalog 。他们支持下面的 catalog 函数,其他函数目前还不支持。

// Postgres 和 MySQL Catalog 支持的函数
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);

其他的 catalog 函数目前还不支持。

使用 JDBC catalog

该章节描述怎么创建并使用 Postgres Catalog 或 MySQL Catalog 。怎么添加 JDBC 连接器和相关的驱动,请参考上面的依赖章节。
JDBC catalog 支持下面的选项配置:

  • name:必选,catalog的名称。
  • default-database:必选,连快将诶的默认数据库。
  • username:必选,Postgres/MySQL 账户的用户名。
  • password:必选,账户的密码。
  • base-url:必选,不要包含数据库名称。
    • 对于 Postgres Catalog ,该参数应该为:jdbc:postgresql://<ip>:<port>
    • 对于 MySQL Catalog ,该参数应该为:jdbc:mysql://<ip>:<port>
      SQL:
CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);
USE CATALOG my_catalog;

JDBC Catalog for PostgreSQL

PostgreSQL 元空间映射。
PostgreSQL 基于数据库有一个额外的命名空间作为 schema 。一个 Postgres 示例可以有多个数据库,每个数据可以有多个 schema ,默认的 schema 名为 public ,每个 schema 可以有多张表。

在 flink 中,当查询注册到 Postgres catalog 中的表时,用户可以使用 schema_name.table_name 或者是只使用 table_name 。schema_name 是可选的,默认为 public 

在 flink catalog 和 Postgres 之间的元空间映射如下:

Flink Catalog 元空间结构 Postgres 元空间结构
catalog name (只能在 flink 中定义) N/A
database name database name
table name [schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义<schema.table>,也就是使用转义字符 ` 将其括起来。

下面是一些访问 Postgres 表的案例:

-- 浏览名为 'public' 的 schema 下的 'test_table' 表,使用默认的 schema,schema 名称可以被省略。
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 浏览名为 'custom_schema' 的 schema 下的 'test_table2' 表,自定义 schema 不能被省略,而且必须和表一起被转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

JDBC Catalog for MySQL

MySQL 元空间映射。
MySQL 实例中的数据库和注册到 MySQL catalog 中的数据库有相同的映射级别。一个 MySQL 实例可以有多个数据库,每个数据库可以有多张表。

在 flink 中,当查询注册到 MySQL catalog 中的表时,用户可以使用 database.table_name 或者只指定 table_name 。默认的数据库名为创建 MySQL Catalog 时指定的默认数据库。
flink Catalog 和 MySQL Catalog 之间的元空间映射关系如下:

Flink Catalog Metaspace Structure MySQL Metaspace Structure
catalog name (只能在 flink 中定义) N/A
database name database name
table name table_name

flink 中的 MySQL 表的完全路径应该为:

`<catalog>`.`<db>`.`<table>`

下面是一些访问 MySQL 表的案例:

-- 浏览 'test_table' 表,默认数据库为 'mydb'
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
-- 浏览指定数据库下的 'test_table' 表。
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;

数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQLPostgresSQLDerby 等。

其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。

MySQL type PostgreSQL type Flink SQL type
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT BIGINT
FLOAT REAL
FLOAT4
FLOAT
DOUBLE
DOUBLE PRECISION
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN BOOLEAN
DATE DATE DATE
TIME [§] TIME [§] [WITHOUT TIMEZONE] TIME [§] [WITHOUT TIMEZONE]
DATETIME [§] TIMESTAMP [§] [WITHOUT TIMEZONE] TIMESTAMP [§] [WITHOUT TIMEZONE]
CHAR(n)

VARCHAR(n)
TEXT
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTEA BYTES
ARRAY ARRAY

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多