Apache Seatunnel-Source plugin : Kafka [Flink]

作者: Apache Seatunnel

从Kafka消费数据,支持的Kafka版本 >= 0.10.0.

Options

nametyperequireddefault value
[topics] topics-string   
[consumer.group.id] consumergroupid-string   
[consumer.bootstrap.servers] consumerbootstrapservers-string   
[schema] schema-string   
[format.type] format-string   
[format.] format.-string   
[consumer.*] consumer-string   
[rowtime.field] rowtime.field-string   
[watermark] watermark-string   
[offset.reset] offset.reset-string   
[common-options] common-options-string   
topics [string]

Kafka topic名称。如果有多个topic,用”,“分割,例如: “tpc1,tpc2”。

consumer.group.id [string]

Kafka consumer group id,用于区分不同的消费组。

consumer.bootstrap.servers [string]

Kafka集群地址,多个用”,“隔开

format.type [string]

目前支持三种种格式

  • json
  • csv
  • avro
format.* [string]

csv格式通过这个参数来设置分隔符等。例如设置列分隔符为\t,format.field-delimiter=\\t

schema [string]
  • csv
  • csv的schema是一个jsonArray的字符串,如"[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
  • json
  • json的schema参数是提供一个原数据的json字符串,可以自动生成schema,但是需要提供内容最全的原数据,否则会有字段丢失。
  • avro
  • avro的schema参数是提供一个标准的avro的schema JSON字符串,如{\"name\":\"test\",\"type\":\"record\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"long\"},{\"name\":\"addrs\",\"type\":{\"name\":\"addrs\",\"type\":\"record\",\"fields\":[{\"name\":\"province\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}]}}]}
  • 如需详细了解Avro Schema JSON字符串应该如何定义,请参见:https://avro.apache.org/docs/current/spec.html
consumer.* [string]

除了以上必备的kafka consumer客户端必须指定的参数外,用户还可以指定多个consumer客户端非必须参数,覆盖了kafka官方文档指定的所有consumer参数.

指定参数的方式是在原参数名称上加上前缀"consumer.“,如指定ssl.key.password的方式是: consumer.ssl.key.password= xxxx。如果不指定这些非必须参数,它们将使用Kafka官方文档给出的默认值。

rowtime.field [string]

设置生成watermark的字段

watermark [long]

设置生成watermark的允许延迟

offset.reset [string]

消费者的起始offset,只对新消费者有效。有以下三种模式

  • latest
  • 从最新的offset开始消费
  • earliest
  • 从最早的offset开始消费
  • specific
  • 从指定的offset开始消费,此时要指定各个分区的起始offset。设置方式通过offset.reset.specific="{0:111,1:123}"
common options [string]

Source 插件通用参数,详情参照 [Source Plugin]

Examples

  KafkaTableStream {
    consumer.bootstrap.servers = "127.0.0.1:9092"
    consumer.group.id = "seatunnel5"
    topics = test
    result_table_name = test
    format.type = csv
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ";"
    format.allow-comments = "true"
    format.ignore-parse-errors = "true"
  }

文章列表

更多推荐

更多
这里什么都没有

近期文章

更多
文章目录

    推荐作者

    更多