Apache StreamPark-Elasticsearch

作者: Apache StreamPark

介绍

支持:

  • Sink: Batch
  • Sink: Streaming Append & Upsert Mode
    Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。

连接器可以工作在 upsert 模式下,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 中没有定义主键,则连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。

依赖

为了使用Elasticsearch连接器,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。
6.x

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.13.0</version>
</dependency>    

7.x and later versions

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
  <version>1.13.0</version>
</dependency>

注意自己使用的 flink 和 scala 版本。

创建 Elasticsearch 表

以下示例展示如何创建 Elasticsearch sink 表:

CREATE TABLE myUserTable (
    user_id STRING,
    user_name STRING
    uv BIGINT,
    pv BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'users'
);

连接器参数

参数 是否必选 从 flink-1.15.x 开始支持
是否可传递
默认值 数据类型 描述
connector 必选 (none) String 指定要使用的连接器,有效值为:
elasticsearch-6:连接到 Elasticsearch 6.x 的集群。
elasticsearch-7:连接到 Elasticsearch 7.x 及更高版本的集群。
hosts 必选 (none) String 要连接到的一台或多台 Elasticsearch 主机,例如 http://host_name:9092;http://host_name:9093
index 必选 (none) String Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 myIndex)或一个动态索引(例如 index-{log_ts&##124;yyyy-MM-dd})。 更多详细信息,请参见下面的动态索引部分。
document-type 6.x 版本中必选 6.x:是 (none) String Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。
document-id.key-delimiter 可选 _ String 复合键的分隔符(默认为**_),例如,指定为将导致文档1KEY2$KEY3**。
username 可选 (none) String 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。
password 可选 (none) String 用于连接 Elasticsearch 实例的密码。如果配置了username,则此选项也必须配置为非空字符串。
failure-handler 可选 fail String 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
fail:如果请求失败并因此导致作业失败,则抛出异常。
ignore:忽略失败并放弃请求。
retry-rejected:重新添加由于队列容量饱和而失败的请求。
自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。
sink.flush-on-checkpoint 可选 true Boolean 是否在 checkpoint 时执行 flush。禁用后,在 checkpoint 时 sink 将不会等待所有的 pending 请求被 Elasticsearch 确认。因此,sink 不会为请求的 at-least-once 交付提供任何有力保证。
sink.bulk-flush.max-actions 可选 1000 Integer 每个批量请求的最大缓冲操作数。 可以设置为0来禁用它。
sink.bulk-flush.max-size 可选 2mb MemorySize 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为0来禁用它。
sink.bulk-flush.interval 可选 1s Duration flush 缓冲操作的间隔。 可以设置为0来禁用它。注意,sink.bulk-flush.max-sizesink.bulk-flush.max-actions都设置为0的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
sink.bulk-flush.backoff.strategy 可选 DISABLED String 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
DISABLED:不执行重试,即第一次请求错误后失败。
CONSTANT:以指定的重试延迟时间间隔来进行重试。
EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增延迟时间。
sink.bulk-flush.backoff.max-retries 可选 flink-1.13.x:8
flink-1.15.x:(none)
Integer 最大回退重试次数。
sink.bulk-flush.backoff.delay 可选 flink-1.13.x:50ms
flink-1.15.x:(none)
Duration 每次回退尝试之间的延迟。对于 CONSTANT 回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 回退策略,该值是初始的延迟。
在 flink-1.15.x 中被删除
connection.max-retry-timeout
可选 (none) Duration 最大重试超时时间。
connection.path-prefix 可选 (none) String 添加到每个 REST 通信中的前缀字符串,例如,**/v
从 Flink-1.15.x 开始支持
connection.request-timeout
可选 (none) Duration 请求连接的超时时间,单位:毫秒,数值必须大于等于0。设置为0,表示超时时间无限大。
从 Flink-1.15.x 开始支持
connection.timeout
可选 (none) Duration 建立连接使用的超时时间,单位:毫秒,数值必须大于等于0。设置为0,表示超时时间无限大。
从 Flink-1.15.x 开始支持
socket.timeout
可选 (none) Duration socket 等待数据的超时时间(SO_TIMEOUT),换句话说,两个连续数据包之间不活跃的最大时间,数值必须大于等于0。设置为0,表示超时时间无限大。
format 可选 json String Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 json 格式。更多详细信息,请参阅 JSON Format 页面。

特性

Key 处理

Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。

如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 的消息。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 的消息。

在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。

某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTESROWARRAYMAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 id。

有关 PRIMARY KEY 语法的更多详细信息,请参见 [CREATE TABLE DDL]。

动态索引

Elasticsearch sink 同时支持静态索引和动态索引。

如果想使用静态索引,则 index 选项值应为纯字符串,例如 myusers,所有记录都将被写入到 myusers 索引中。

如果想使用动态索引,你可以使用 来引用记录中的字段值来动态生成目标索引。

你也可以使用 {field_name|date_format_string}  TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。
date_format_string 与 Java 的 DateTimeFormatter 兼容。

例如,如果选项值设置为 myusers-{log_ts|yyyy-MM-dd},则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 myusers-2020-03-27 索引中。

数据类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 json 格式。更多类型映射的详细信息,请参阅 [JSON Format] 页面。

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多