Apache StreamPark-简介

作者: Apache StreamPark

介绍

Flink的Table API和SQL程序可以连接到其他外部系统,用于读写批处理表和流处理表。

表source提供对存储在外部系统(如数据库、键值存储、消息队列或文件系统)中数据的访问。表sink向外部存储系统发送数据。根据source和sink的类型,它们支持不同的格式,如CSVAvroParquetORC

本节描述如何使用内置的连接器在Flink中注册表source和表sink。注册source或sink后,可以通过表API和SQL语句访问它。

如果你实现自己的自定义表source或sink,请查看自定义source和sink连接器页面。

支持的连接器

Flink内置各种连接器。下表列出了所有可用的连接器。

Name Version Source Sink
Filesystem Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x Not supported Streaming Sink, Batch Sink
Apache Kafka 0.10+ Unbounded Scan Streaming Sink, Batch Sink
Amazon Kinesis Data Streams Unbounded Scan Streaming Sink
JDBC Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache HBase 1.4.x & 2.2.x Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache Hive Supported Versions Unbounded Scan, Bounded Scan, Lookup Streaming Sink, Batch Sink

使用连接器

Flink支持使用SQL CREATE TABLE语句来注册表。可以定义表名、表schema和用于连接外部系统的表选项。

有关创建表的更多信息,请参阅[语法]部分。

下面的代码展示了如何连接到Kafka来读写JSON格式记录的完整示例。

CREATE TABLE MyUserTable (
    -- 声明表的schema
    `user` BIGINT,
    `message` STRING,
    `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp',    -- 使用元数据字段来访问kafka数据的timestamp时间戳
    `proctime` AS PROCTIME(),    -- 使用计算列来定义处理时间属性
    WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND    -- 使用WATERMARK语句定义rowtime属性
) WITH (
    -- 定义连接的外部系统属性
    'connector' = 'kafka',
    'topic' = 'topic_name',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'   -- 声明这个外部系统使用format
);

所需的连接属性被转换为基于字符串的键值对。工厂将基于工厂标识符(本例中是kafka和json)从键值对中创建配置好的表source、表sink和相应的format格式。

在为每个组件搜索一个匹配的工厂时,会考虑所有可以通过Java的服务提供者接口(SPI)找到的工厂。

如果找不到任何工厂或找到了多个与给定属性匹配的工厂,则将抛出一个异常,并提供有关可以使用的工厂和受支持属性的附加信息。

schema匹配

SQL CREATE TABLE语句的body子句定义了物理列、约束、水印的名称和类型。Flink不保存数据,因此schema定义仅声明如何将物理列从外部系统映射到Flink。

映射可能不是按名称一一对应的,这取决于格式和连接器的实现。例如,MySQL数据库表是按字段名(不区分大小写)映射的,CSV文件系统是按字段顺序映射的(字段名可以是任意的)。这些规则将根据对应的连接器来解析。

下面的示例展示了一个简单的schema,其中没有时间属性、输入/输出到表列的一对一字段映射。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN
) WITH (
    ...
)

元数据

一些连接器和格式公开了附加的元数据字段,可以在物理列后面的元数据列中访问这些字段。有关元数据列的更多信息,请参阅[CREATE TABLE]部分。

主键

主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行。
source表的主键用于优化元数据信息。sink表的主键通常用于插入更新数据。
SQL标准指定主键约束可以是ENFORCED的,也可以是NOT ENFORCED的。这将控制是否对传入/传出数据执行约束检查。
Flink本身并不拥有数据,因此唯一支持的模式是NOT ENFORCED模式。确保查询执行的主键强制约束由用户实现。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN,
    PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- 根据字段定义主键列
) WITH (
    ...
)

时间属性

在处理无界流表时,时间属性是必不可少的。因此,proctimerowtime属性都可以定义为schema的一部分。

有关Flink中的时间处理(尤其是事件时间)的更多信息,建议参阅事件时间部分。

处理时间属性

为了在模式中声明proctime属性,可以使用计算列语法声明一个由proctime()内置函数生成的计算列。计算列是不存储在物理数据中的虚拟列。

CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING,
    MyField3 BOOLEAN
    MyField4 AS PROCTIME() -- 定义一个处理时间属性列
) WITH (
    ...
)

rowtime时间属性

为了控制表的事件时间行为,Flink提供了预定义的时间戳提取器和水印策略。

有关在DDL中定义时间属性的更多信息,请参阅[CREATE TABLE]语句。

支持以下时间戳提取器:

-- 使用已存在的TIMESTAMP(3)类型的字段作为事件时间属性
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ...
) WITH (
    ...
)
-- 使用系统函数、UDF、表达式来提取期望的TIMESTAMP(3)类型的事件时间属性
CREATE TABLE MyTable (
    log_ts STRING,
    ts_field AS TO_TIMESTAMP(log_ts),
    WATERMARK FOR ts_field AS ...
) WITH (
    ...
)

支持的水印策略如下:

-- 为严格升序的事件时间属性设置水印策略。发出到目前为止观察到的最大时间戳水印。时间戳大于最大时间戳的行不属于延迟。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field
) WITH (
    ...
)
-- 设置升序事件时间属性的水印策略。发出到目前为止观察到的最大时间戳减去1的水印。时间戳大于或等于最大时间戳的行不属于延迟。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
) WITH (
    ...
)
-- 为事件时间属性设置水印策略,这些事件时间属性在有限的时间间隔内是无序的。发出的水印是观察到的最大时间戳减去指定的延迟,例如2秒。
CREATE TABLE MyTable (
    ts_field TIMESTAMP(3),
    WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
) WITH (
    ...
)

一定要同时声明时间戳和水印。触发基于时间的操作需要水印。

SQL字段类型

请参阅[数据类型]章节,了解如何在SQL中声明类型。

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多