Flink DataStream-Hybrid Source

Author: ApacheFlink

HybridSource is a source that contains a list of concrete sources. It solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream.

For example, a bootstrap use case may need to read several days' worth of bounded input from S3 before continuing with the latest unbounded input from Kafka. HybridSource switches from FileSource to KafkaSource when the bounded file input finishes, without interrupting the application.

Prior to HybridSource, it was necessary to create a topology with multiple sources and define a switching mechanism in user land, which leads to operational complexity and inefficiency. With HybridSource, the multiple sources appear as a single source in the Flink job graph and from the DataStream API perspective.

For more background, see FLIP-150.

To use the connector, add the `flink-connector-base` dependency to your project. (It typically comes in as a transitive dependency of the concrete sources.)
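To make the sequential-switch idea concrete, here is a toy, framework-free Python model. `hybrid_read` is a hypothetical helper, not a Flink or PyFlink API; it only mimics the observable ordering (each source is drained completely before the next one starts) and ignores Flink's split enumeration, checkpointing, and parallelism.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def hybrid_read(sources: List[Iterable[T]]) -> Iterator[T]:
    """Hypothetical model (not a Flink API): read the sources one after
    another and expose them to the caller as a single stream."""
    for source in sources:
        # Move to the next source only once the current one is exhausted,
        # i.e. once a bounded source has finished.
        yield from source


# Hypothetical data: a bounded file backlog followed by Kafka records.
file_records = ["f1", "f2", "f3"]
kafka_records = ["k1", "k2"]
print(list(hybrid_read([file_records, kafka_records])))  # ['f1', 'f2', 'f3', 'k1', 'k2']
```

The consumer of `hybrid_read` sees one stream and never handles the switch itself, which is the property HybridSource provides at the job-graph level. An unbounded last source would correspond to an iterator that never ends.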

Start position for next source

To arrange multiple sources in a HybridSource, all sources except the last one need to be bounded. Therefore, the sources typically need to be assigned a start and end position. The last source may be bounded, in which case the HybridSource is bounded; otherwise it is unbounded. Details depend on the specific source and the external storage systems. Here we cover the most basic scenario and then a more complex one, following the File/Kafka example.
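The boundedness rule can be written as a small check. This is a hypothetical Python model: `SourceSpec` and `hybrid_boundedness` are illustrative names, and only the returned strings are borrowed from Flink's `Boundedness` values (`BOUNDED`, `CONTINUOUS_UNBOUNDED`).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SourceSpec:
    """Hypothetical stand-in for a concrete source; only boundedness matters here."""
    name: str
    bounded: bool


def hybrid_boundedness(sources: List[SourceSpec]) -> str:
    """Every source except the last must be bounded; the hybrid source
    takes on the boundedness of the last one."""
    if not sources:
        raise ValueError("a hybrid source needs at least one source")
    for spec in sources[:-1]:
        if not spec.bounded:
            raise ValueError(f"{spec.name} is unbounded but is not the last source")
    return "BOUNDED" if sources[-1].bounded else "CONTINUOUS_UNBOUNDED"


files = SourceSpec("files", bounded=True)
kafka = SourceSpec("kafka", bounded=False)
print(hybrid_boundedness([files, kafka]))  # CONTINUOUS_UNBOUNDED
print(hybrid_boundedness([files, files]))  # BOUNDED
```

Putting the unbounded Kafka source first, as in `[kafka, files]`, raises `ValueError`, because the file source after it could never be reached.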

Fixed start position at graph construction time

Example: Read from files until a predetermined switch time, then continue reading from Kafka. Each source covers a range known upfront, so the contained sources can be created upfront as if they were used directly:

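Java:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
          KafkaSource.<String>builder()
                  .setBootstrapServers("localhost:9092") // required by build()
                  .setTopics("quickstart-events")
                  .setValueOnlyDeserializer(new SimpleStringSchema())
                  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1)) // first record after the files
                  .build();
HybridSource<String> hybridSource =
          HybridSource.builder(fileSource)
                  .addSource(kafkaSource)
                  .build();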
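Python:
from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
from pyflink.datastream.connectors.hybrid_source import HybridSource
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

switch_timestamp = ... # derive from file input paths
file_source = FileSource \
    .for_record_stream_format(StreamFormat.text_line_format(), test_dir) \
    .build()
kafka_source = KafkaSource \
    .builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_group_id('MY_GROUP') \
    .set_topics('quickstart-events') \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(switch_timestamp + 1)) \
    .build()
# + 1 matches the Java example: start after the last timestamp covered by the files
hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()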
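Why `switchTimestamp + 1` in the Java example? A timestamp-based starting offset resolves to the first record whose timestamp is greater than or equal to the given value. If the files already contain every record up to and including `switchTimestamp`, starting Kafka one millisecond later avoids reading the boundary records twice. The toy model below illustrates this under simplifying assumptions (a single partition with timestamps in ascending order); `offset_for_timestamp` and `fixed_switch` are hypothetical helpers, not Kafka or Flink APIs.

```python
from bisect import bisect_left
from typing import List, Tuple

Record = Tuple[int, str]  # (timestamp in ms, payload)


def offset_for_timestamp(log: List[Record], ts: int) -> int:
    """Offset of the first record whose timestamp is >= ts.
    Assumes the log is sorted by timestamp (a simplification)."""
    return bisect_left([t for t, _ in log], ts)


def fixed_switch(files: List[Record], log: List[Record],
                 switch_ts: int, start_ts: int) -> List[Record]:
    """Read file records up to and including switch_ts,
    then the log starting at the offset resolved for start_ts."""
    from_files = [r for r in files if r[0] <= switch_ts]
    return from_files + log[offset_for_timestamp(log, start_ts):]


# Hypothetical data: the files and the Kafka log overlap at timestamp 200.
files = [(100, "a"), (200, "b")]
log = [(100, "a"), (200, "b"), (300, "c")]

print(fixed_switch(files, log, 200, 200 + 1))  # [(100, 'a'), (200, 'b'), (300, 'c')]
print(fixed_switch(files, log, 200, 200))      # [(100, 'a'), (200, 'b'), (200, 'b'), (300, 'c')]
```

Starting at `switch_ts` instead of `switch_ts + 1` reads `(200, "b")` twice. Whether `+ 1` is right for a real job depends on whether the file input really includes the records at exactly the switch timestamp.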

Dynamic start position at switch time

Example: The file source reads a very large backlog, potentially taking longer than the retention period available in the next source. The switch therefore needs to occur at “current time - X”, which requires the start time of the next source to be set at switch time. Here we need to transfer the end position from the previous file enumerator so that construction of the KafkaSource can be deferred, by implementing SourceFactory. Note that the enumerators need to support getting the end timestamp; this may currently require a source customization. Adding support for a dynamic end position to FileSource is tracked in FLINK-23633.

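Java:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// CustomFileSource and CustomFileSplitEnumerator stand for a user-provided source
// customization that exposes an end timestamp; they are not part of Flink.
FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource<String> hybridSource =
    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
        .addSource(
            // SourceFactory: invoked at switch time, not at graph construction time
            switchContext -> {
              CustomFileSplitEnumerator previousEnumerator =
                  switchContext.getPreviousEnumerator();
              // how to get timestamp depends on specific enumerator
              long switchTimestamp = previousEnumerator.getEndTimestamp();
              KafkaSource<String> kafkaSource =
                  KafkaSource.<String>builder()
                      .setBootstrapServers("localhost:9092")
                      .setTopics("quickstart-events")
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                      .build();
              return kafkaSource;
            },
            Boundedness.CONTINUOUS_UNBOUNDED)
        .build();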

This is not yet supported in the Python API.
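Because the Python API cannot express this yet, the following framework-free Python model only illustrates the control flow of the Java example: the next source is built by a factory that runs at switch time and reads the end position from the previous enumerator. `FileEnumeratorModel`, `KafkaSourceModel`, `kafka_factory`, and `switch` are hypothetical names; `get_end_timestamp` stands in for the custom `getEndTimestamp()`, and the one-day lag matching `readTillOneDayFromLatest()` is an assumption.

```python
from dataclasses import dataclass
from typing import Callable

ONE_DAY_MS = 24 * 60 * 60 * 1000


@dataclass
class FileEnumeratorModel:
    """Hypothetical stand-in for CustomFileSplitEnumerator: the end timestamp
    is derived from the current time when the job runs ("current time - X")."""
    now_ms: int
    lag_ms: int = ONE_DAY_MS

    def get_end_timestamp(self) -> int:
        return self.now_ms - self.lag_ms


@dataclass
class KafkaSourceModel:
    """Hypothetical stand-in for a KafkaSource; only its start timestamp is kept."""
    start_timestamp_ms: int


def kafka_factory(previous: FileEnumeratorModel) -> KafkaSourceModel:
    # Plays the role of the SourceFactory lambda: read the end position from
    # the previous enumerator and start one millisecond after it.
    return KafkaSourceModel(start_timestamp_ms=previous.get_end_timestamp() + 1)


def switch(previous: FileEnumeratorModel,
           factory: Callable[[FileEnumeratorModel], KafkaSourceModel]) -> KafkaSourceModel:
    # The factory is called only here, after the bounded file phase has
    # finished, so construction of the next source is deferred until then.
    return factory(previous)


enumerator = FileEnumeratorModel(now_ms=1_700_000_000_000)
print(switch(enumerator, kafka_factory).start_timestamp_ms)  # 1699913600001
```

Passing a factory rather than a ready-made source is what allows the start position to depend on information that only exists once the previous source has run.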
