- [一、通用对接外部配置]
- [二、对接Kafka]
- [1.引入依赖]
- [2. 创建连接到 Kafka 的表]
- [3. Upsert Kafka]
- [三、文件系统]
- [四、JDBC]
- [1. 引入依赖]
- [2. 创建 JDBC 表]
- [五、Elasticsearch]
- [1. 引入依赖]
- [2. 创建连接到 Elasticsearch 的表]
- [六、HBase]
- [1. 引入依赖]
- [2. 创建连接到HBase 的表]
- [七、Hive]
- [1. 引入依赖]
- [2. 连接到Hive]
- [3. 设置 SQL 方言]
- [1. SQL 中设置]
- [2. Table API 中设置]
- [4. 读写 Hive 表]
之前测试的时候,有写过输出到系统文件中,那其它框架怎么对接呢?
是不是给定第三方的配置信息就可以了呢?一起探索吧!
一、通用对接外部配置
在 Table API 和 SQL 编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。
架构中的TableSource 负责从外部系统中读取数据并转换成表,TableSink 则负责将结果表写入外部系统。在 Flink 1.13 的API 调用中,已经不去区分 TableSource 和TableSink,我们只要建立到外部系统的连接并创建表就可以,Flink 自动会从程序的处理逻辑中解析出它们的用途。
Flink 的Table API 和 SQL 支持了各种不同的连接器。当然,最简单的其实就是之前提到的连接到控制台打印输出:
CREATE TABLE ResultTable ( user STRING,
cnt BIGINT WITH (
'connector' = 'print'
);
这里只需要在WITH 中定义 connector 为print 就可以了。而对于其它的外部系统,则需要增加一些配置项。
二、对接Kafka
Kafka 的 SQL 连接器可以从 Kafka 的主题(topic)读取数据转换成表,也可以将表数据写入Kafka 的主题。换句话说,创建表的时候指定连接器为Kafka,则这个表既可以作为输入表,也可以作为输出表。
1.引入依赖
想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_$</artifactId>
<version>$</version>
</dependency>
这里我们引入的 Flink 和 Kafka 的连接器,与之前DataStream API 中引入的连接器是一样的。如果想在 SQL 客户端里使用 Kafka 连接器,还需要下载对应的 jar 包放到 lib 目录下。
另外,Flink 为各种连接器提供了一系列的"表格式"(table formats),比如 CSV、JSON、Avro、Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。对于 Kafka 而言,CSV、JSON、Avro 等主要格式都是支持的,根据Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。以CSV 为例:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>$</version>
</dependency>
由于 SQL 客户端中已经内置了 CSV、JSON 的支持,因此使用时无需专门引入;而对于没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。关于连接器的格式细节详见官网说明,我们后面就不再讨论了。
2. 创建连接到 Kafka 的表
创建一个连接到Kafka 表,需要在CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为Kafka,并定义必要的配置参数。
下面是一个具体示例:
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka', 'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts,它的声明中用到了METADATA FROM,这是表示一个"元数据列"(metadata column),它是由 Kafka 连接器的元数据"timestamp"生成的。这里的 timestamp 其实就是Kafka 中数据自带的时间戳,我们把它直接作为元数据提取出来,转换成一个新的字段 ts。
3. Upsert Kafka
正常情况下,Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结果表写入Kafka,就会因为 Kafka 无法识别撤回(retract)或更新插入(upsert)消息而导致异常。
为了解决这个问题,Flink 专门增加了一个"更新插入Kafka"(Upsert Kafka)连接器。这个连接器支持以更新插入(UPSERT)的方式向 Kafka 的 topic 中读写数据。
具体来说,Upsert Kafka 连接器处理的是更新日志(changlog)流。如果作为 TableSource, 连接器会将读取到的topic 中的数据(key, value),解释为对当前key 的数据值的更新(UPDATE),也就是查找动态表中key 对应的一行数据,将 value 更新为最新的值;因为是Upsert 操作,所以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空(null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。
如果作为 TableSink,Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志(changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka。由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。
下面是一个创建和使用Upsert Kafka 表的例子:
CREATE TABLE pageviews_per_region ( user_region STRING,
pv BIGINT, uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka', 'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...', 'key.format' = 'avro',
'value.format' = 'avro'
);
CREATE TABLE pageviews ( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...', 'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region SELECT
user_region, COUNT(*),
COUNT(DISTINCT user_id) FROM pageviews
GROUP BY user_region;
这里我们从 Kafka 表 pageviews 中读取数据,统计每个区域的 PV(全部浏览量)和 UV
(对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结果表写入Kafka 的 pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中需要用PRIMARY KEY 来指定主键,并且在WITH 子句中分别指定key和value的序列化格式。
三、文件系统
另一类非常常见的外部系统就是文件系统(File System)了。Flink 提供了文件系统的连接器,支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在 Flink 中的,所以使用它并不需要额外引入依赖。
下面是一个连接到文件系统的示例:
CREATE TABLE MyTable (
column_name1 INT, column_name2 STRING,
...
part_name1 INT, part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- 连接器类型
'path' = '...', -- 文件路径'
format' = '...' -- 文件格式
这里在 WITH 前使用了 PARTITIONED BY 对数据进行了分区操作。文件系统连接器支持对分区文件的访问。
四、JDBC
关系型数据表本身就是 SQL 最初应用的地方,所以我们也会希望能直接向关系型数据库中读写表数据。Flink 提供的 JDBC 连接器可以通过JDBC 驱动程序(driver)向任意的关系型数据库读写数据,比如 MySQL、PostgreSQL、Derby 等。
作为 TableSink 向数据库写入数据时,运行的模式取决于创建表的 DDL 是否定义了主键(primary key)。如果有主键,那么 JDBC 连接器就将以更新插入(Upsert)模式运行,可以向外部数据库发送按照指定键(key)的更新(UPDATE)和删除(DELETE)操作;如果没有定义主键,那么就将在追加(Append)模式下运行,不支持更新和删除操作。
1. 引入依赖
想要在 Flink 程序中使用 JDBC 连接器,需要引入如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_$</artifactId>
<version>$</version>
</dependency>
此外,为了连接到特定的数据库,我们还用引入相关的驱动器依赖,比如 MySQL:
mysql
mysql-connector-java
5.1.38
这里引入的驱动器版本是 5.1.38,读者可以依据自己的 MySQL 版本来进行选择。
2. 创建 JDBC 表
创建 JDBC 表的方法与前面 Upsert Kafka 大同小异。下面是一个具体示例:
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable ( id BIGINT,
name STRING, age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;
这里创建表的DDL 中定义了主键,所以数据会以Upsert 模式写入到 MySQL 表中;而到MySQL 的连接,是通过 WITH 子句中的 url 定义的。要注意写入 MySQL 中真正的表名称是users,而 MyTable 是注册在 Flink 表环境中的表。
五、Elasticsearch
Elasticsearch 作为分布式搜索分析引擎,在大数据应用中有非常多的场景。Flink 提供的Elasticsearch 的SQL 连接器只能作为TableSink,可以将表数据写入Elasticsearch 的索引(index)。
Elasticsearch 连接器的使用与 JDBC 连接器非常相似,写入数据的模式同样是由创建表的 DDL中是否有主键定义决定的。
1. 引入依赖
想要在 Flink 程序中使用 Elasticsearch 连接器,需要引入对应的依赖。具体的依赖与Elasticsearch 服务器的版本有关,对于 6.x 版本引入依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact Id>
<version>$</version>
</dependency>
对于Elasticsearch 7 以上的版本,引入的依赖则是:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifact Id>
<version>$</version>
</dependency>
2. 创建连接到 Elasticsearch 的表
创建Elasticsearch 表的方法与 JDBC 表基本一致。下面是一个具体示例:
-- 创建一张连接到 Elasticsearch 的 表
CREATE TABLE MyTable (
user_id STRING, user_name STRING uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'users'
);
这里定义了主键,所以会以更新插入(Upsert)模式向 Elasticsearch 写入数据。
六、HBase
作为高性能、可伸缩的分布式列存储数据库,HBase 在大数据分析中是一个非常重要的工具。Flink 提供的HBase 连接器支持面向HBase 集群的读写操作。
在流处理场景下,连接器作为 TableSink 向 HBase 写入数据时,采用的始终是更新插入(Upsert)模式。也就是说,HBase 要求连接器必须通过定义的主键(primary key)来发送更新日志(changelog)。所以在创建表的 DDL 中,我们必须要定义行键(rowkey)字段,并将它声明为主键;如果没有用PRIMARY KEY 子句声明主键,连接器会默认把 rowkey 作为主键。
1. 引入依赖
想要在 Flink 程序中使用 HBase 连接器,需要引入对应的依赖。目前 Flink 只对 HBase 的1.4.x 和 2.2.x 版本提供了连接器支持,而引入的依赖也应该与具体的 HBase 版本有关。对于1.4 版本引入依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-1.4_$</artifactId>
<version>$</version>
</dependency>
对于HBase 2.2 版本,引入的依赖则是:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_$</artifactId>
<version>$</version>
</dependency>
2. 创建连接到HBase 的表
由于HBase 并不是关系型数据库,因此转换为 Flink SQL 中的表会稍有一些麻烦。在 DDL 创建出的HBase 表中,所有的列族(column family)都必须声明为 ROW 类型,在表中占据一个字段;而每个 family 中的列(column qualifier)则对应着 ROW 里的嵌套字段。我们不需要将 HBase 中所有的 family 和 qualifier 都在 Flink SQL 的表中声明出来,只要把那些在查询中用到的声明出来就可以了。
除了所有 ROW 类型的字段(对应着 HBase 中的 family),表中还应有一个原子类型的字段,它就会被识别为 HBase 的 rowkey。在表中这个字段可以任意取名,不一定非要叫 rowkey。
下面是一个具体示例:
-- 创建一张连接到 HBase 的 表
CREATE TABLE MyTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4', 'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);
-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6] INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
我们将另一张T 中的数据提取出来,并用 ROW()函数来构造出对应的 column family,最终写入 HBase 中名为 mytable 的表。
七、Hive
Apache Hive 作为一个基于 Hadoop 的数据仓库基础框架,可以说已经成为了进行海量数据分析的核心组件。Hive 支持类SQL 的查询语言,可以用来方便对数据进行处理和统计分析, 而且基于HDFS 的数据存储有非常好的可扩展性,是存储分析超大量数据集的唯一选择。Hive 的主要缺点在于查询的延迟很高,几乎成了离线分析的代言人。而 Flink 的特点就是实时性强, 所以 Flink SQL 与Hive 的结合势在必行。
Flink 与Hive 的集成比较特别。Flink 提供了"Hive 目录"(HiveCatalog)功能,允许使用Hive 的"元存储"(Metastore)来管理 Flink 的元数据。这带来的好处体现在两个方面:
- Metastore 可以作为一个持久化的目录,因此使用 HiveCatalog 可以跨会话存储 Flink 特定的元数据。这样一来,我们在 HiveCatalog 中执行执行创建Kafka 表或者ElasticSearch 表, 就可以把它们的元数据持久化存储在 Hive 的 Metastore 中;对于不同的作业会话就不需要重复创建了,直接在 SQL 查询中重用就可以。
- 使用HiveCatalog,Flink 可以作为读写Hive 表的替代分析引擎。这样一来,在Hive 中进行批处理会更加高效;与此同时,也有了连续在 Hive 中读写数据、进行流处理的能力, 这也使得"实时数仓"(real-time data warehouse)成为了可能。
HiveCatalog 被设计为"开箱即用",与现有的 Hive 配置完全兼容,我们不需要做任何的修改与调整就可以直接使用。注意只有 Blink 的计划器(planner)提供了 Hive 集成的支持, 所以需要在使用Flink SQL 时选择Blink planner。下面我们就来看以下与Hive 集成的具体步骤。
1. 引入依赖
Hive 各版本特性变化比较大,所以使用时需要注意版本的兼容性。目前 Flink 支持的 Hive版本包括:
Hive 1.x:1.0.0~1.2.2;
Hive 2.x:2.0.02.2.0,2.3.02.3.6;
Hive 3.x:3.0.0~3.1.2;
目前 Flink 与 Hive 的集成程度在持续加强,支持的版本信息也会不停变化和调整,大家可以随着关注官网的更新信息。
由于Hive 是基于Hadoop 的组件,因此我们首先需要提供 Hadoop 的相关支持,在环境变量中设置HADOOP_CLASSPATH:
export HADOOP_CLASSPATH=hadoop classpath
在 Flink 程序中可以引入以下依赖:
<!-- Flink 的 Hive 连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_$</artifactId>
<version>$</version>
</dependency>
<!-- Hive 依 赖 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>$
</dependency>
建议不要把这些依赖打包到结果 jar 文件中,而是在运行时的集群环境中为不同的 Hive版本添加不同的依赖支持。具体版本对应的依赖关系,可以查询官网说明。
2. 连接到Hive
在 Flink 中连接 Hive,是通过在表环境中配置 HiveCatalog 来实现的。需要说明的是,配置 HiveCatalog 本身并不需要限定使用哪个 planner,不过对 Hive 表的读写操作只有 Blink 的planner 才支持。所以一般我们需要将表环境的 planner 设置为 Blink。
下面是代码中配置Catalog 的示例:
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase"; String hiveConfDir = "/opt/hive-conf";
// 创建一个 HiveCatalog,并在表环境中注册
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tableEnv.registerCatalog("myhive", hive);
// 使用 HiveCatalog 作为当前会话的 catalog
tableEnv.useCatalog("myhive");
当然,我们也可以直接启动SQL 客户端,用CREATE CATALOG 语句直接创建HiveCatalog:
Flink SQL> create catalog myhive with (‘type’ = ‘hive’, ‘hive-conf-dir’ = ‘/opt/hive-conf’);
[INFO] Execute statement succeed.
Flink SQL> use catalog myhive; [INFO] Execute statement succeed.
3. 设置 SQL 方言
我们知道,Hive 内部提供了类SQL 的查询语言,不过语法细节与标准SQL 会有一些出入,相当于是 SQL 的一种"方言"(dialect)。为了提高与 Hive 集成时的兼容性,Flink SQL 提供了一个非常有趣而强大的功能:可以使用方言来编写 SQL 语句。换句话说,我们可以直接在 Flink 中写Hive SQL 来操作 Hive 表,这无疑给我们的读写处理带来了极大的方便。
Flink 目前支持两种SQL 方言的配置:default 和 hive。所谓的 default 就是 Flink SQL 默认的 SQL 语法了。我们需要先切换到 hive 方言,然后才能使用 Hive SQL 的语法。具体设置可以分为 SQL 和Table API 两种方式。
- SQL 中设置
我们可以通过配置table.sql-dialect 属性来设置SQL 方言:
set table.sql-dialect=hive;
当然,我们可以在代码中执行上面的 SET 语句,也可以直接启动 SQL 客户端来运行。如果使用 SQL 客户端,我们还可以在配置文件 sql-cli-defaults.yaml 中通过"configuration"模块来设置:
execution: planner: blink type: batch
result-mode: table
configuration:
table.sql-dialect: hive
- Table API 中设置
另外一种方式就是在代码中,直接使用Table API 获取表环境的配置项来进行设置:
// 配置 hive 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 配置 default 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
4. 读写 Hive 表
有了 SQL 方言的设置,我们就可以很方便的在 Flink 中创建 Hive 表并进行读写操作了。Flink 支持以批处理和流处理模式向 Hive 中读写数据。在批处理模式下,Flink 会在执行查询语句时对Hive 表进行一次性读取,在作业完成时将结果数据向 Hive 表进行一次性写入;而在流处理模式下,Flink 会持续监控 Hive 表,在新数据可用时增量读取,也可以持续写入新数据并增量式地让它们可见。
更灵活的是,我们可以随时切换 SQL 方言,从其它数据源(例如 Kafka)读取数据、经转换后再写入Hive。下面是以纯SQL 形式编写的一个示例,我们可以启动SQL 客户端来运行:
-- 设置 SQL 方言为 hive,创建 Hive 表
SET table.sql-dialect=hive; CREATE TABLE hive_table (
user_id STRING, order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
-- 设置 SQL 方言为 default,创建 Kafka 表
SET table.sql-dialect=default; CREATE TABLE kafka_table (
user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 定义水位线
) WITH (...);
-- 将 Kafka 中读取的数据经转换后写入 Hive INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
这里我们创建 Hive 表时设置了通过分区时间来触发提交的策略。将 Kafka 中读取的数据经转换后写入Hive,这是一个流处理的 Flink SQL 程序。
文章作者: 雨中散步撒哈拉
文章链接: https://liudongdong.top/archives/flinkliu-shi-yi-flinksql-zhi-lian-jie-dao-wai-bu-xi-tong