Flink DataStream-Google Cloud PubSub

作者: ApacheFlink

这个连接器可向 Google Cloud PubSub 读取与写入数据。添加下面的依赖来使用此连接器:

注意:此连接器最近才加到 Flink 里,还未接受广泛测试。

注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]

Consuming or Producing PubSubMessages

连接器可以接收和发送 Google PubSub 的信息。和 Google PubSub 一样,这个连接器能够保证至少一次的语义。

PubSub SourceFunction

PubSubSource 类的对象由构建类来构建: PubSubSource.newBuilder(...)

有多种可选的方法来创建 PubSubSource,但最低要求是要提供 Google Project、Pubsub 订阅和反序列化 PubSubMessages 的方法。 Example:

StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<SomeObject> deserializer = (...);
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                      .withDeserializationSchema(deserializer)
                                                      .withProjectName("project")
                                                      .withSubscriptionName("subscription")
                                                      .build();
streamExecEnv.addSource(pubsubSource);

当前还不支持 PubSub 的 source functions pulls messages 和 push endpoints

PubSub Sink

PubSubSink 类的对象由构建类来构建: PubSubSink.newBuilder(...)

构建类的使用方式与 PubSubSource 类似。 Example:

DataStream<SomeObject> dataStream = (...);
SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
                                                .withSerializationSchema(serializationSchema)
                                                .withProjectName("project")
                                                .withSubscriptionName("subscription")
                                                .build()
dataStream.addSink(pubsubSink);

Google Credentials

应用程序需要使用 Credentials 来通过认证和授权才能使用 Google Cloud Platform 的资源,例如 PubSub。

上述的两个构建类都允许你提供 Credentials, 但是连接器默认会通过环境变量: GOOGLE_APPLICATION_CREDENTIALS 来获取 Credentials 的路径。

如果你想手动提供 Credentials,例如你想从外部系统读取 Credentials,你可以使用 PubSubSource.newBuilder(...).withCredentials(...)

集成测试

在集成测试的时候,如果你不想直接连 PubSub 而是想读取和写入一个 docker container,可以参照 PubSub testing locally

下面的例子展示了如何使用 source 来从仿真器读取信息并发送回去:

String hostAndPort = "localhost:1234";
DeserializationSchema<SomeObject> deserializationSchema = (...);
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                      .withDeserializationSchema(deserializationSchema)
                                                      .withProjectName("my-fake-project")
                                                      .withSubscriptionName("subscription")
                                                      .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
                                                      .build();
SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
                                                .withSerializationSchema(serializationSchema)
                                                .withProjectName("my-fake-project")
                                                .withSubscriptionName("subscription")
                                                .withHostAndPortForEmulator(hostAndPort)
                                                .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(pubsubSource)
   .addSink(pubsubSink);

至少一次语义保证

SourceFunction

有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。

另一个可能的原因是超过了确认的截止时间,即收到与确认信息之间的时间间隔。PubSubSource 只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。

因此,我们建议把快照的间隔设置得比信息确认截止时间更短。

参照 PubSub 来增加信息确认截止时间。

注意: PubSubMessagesProcessedNotAcked 显示了有多少信息正在等待下一个 checkpoint 还没被确认。

SinkFunction

Sink function 会把准备发到 PubSub 的信息短暂地缓存以提高性能。每次 checkpoint 前,它会刷新缓冲区,并且只有当所有信息成功发送到 PubSub 之后,checkpoint 才会成功完成。

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多