As a quick demonstration of how to use seatunnel, we walk through an application that receives data over a socket, splits the data into multiple fields, and outputs the processed result. Because the project was renamed from waterdrop to seatunnel when it entered the incubator, the folders in historical releases are still named waterdrop.
Step 1: Prepare the Flink runtime environment
If you are familiar with Flink or already have a Flink runtime environment prepared, you can skip this step; Flink does not need any special configuration.
Please download Flink first and choose version 1.9.0; compatibility with higher Flink versions is still incubating. Install Flink after the download completes.
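For reference, a minimal sketch of the download step (the archive URL and the Scala 2.11 build are assumptions; check the Apache download archive for the exact artifact name):
# Download and extract Flink 1.9.0 (Scala 2.11 build assumed)
wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
tar -xzf flink-1.9.0-bin-scala_2.11.tgz
# The extracted flink-1.9.0 directory becomes FLINK_HOME in Step 3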
Step 2: Download seatunnel
Go to the seatunnel release download page and download the latest seatunnel-<version>.zip.
Or download a specific version directly (using v2.0.4 as an example):
wget https://github.com/apache/incubator-seatunnel/releases/download/v2.0.4/waterdrop-dist-2.0.4-2.11.8-release.zip -O seatunnel-2.0.4.zip
After the download completes, extract it:
unzip seatunnel-<version>.zip
ln -s waterdrop-<version> seatunnel
Step 3: Configure seatunnel
Edit config/waterdrop-env.sh and set the required environment variables, such as FLINK_HOME (the directory where Flink was downloaded and extracted in Step 1).
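For example, a minimal sketch of the relevant line (the path is an assumption; point it at your own Flink directory from Step 1):
# config/waterdrop-env.sh
FLINK_HOME=/path/to/flink-1.9.0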
Create and edit config/application.conf (you can copy the content below directly); it defines how data is read, transformed, and written once seatunnel starts. In this example, SocketStream registers each raw input line as the field info of a table named fake, Split declares how that field is cut apart, and the sql plugin applies the split and selects the result.
env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  SocketStream {
    result_table_name = "fake"
    field_name = "info"
  }
}
transform {
  Split {
    separator = "#"
    fields = ["name","age"]
  }
  sql {
    sql = "select * from (select info, split(info) as info_row from fake) t1"
  }
}
sink {
  ConsoleSink {}
}
Step 4: Start a netcat server for sending data
nc -l -p 9999
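The flags above match the traditional/GNU netcat. If your build (e.g., the BSD/macOS variant) rejects -l together with -p, pass the port positionally instead:
nc -l 9999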
Step 5: Start seatunnel
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/application.conf
Step 6: Type the following on the nc side
xg#1995
The TaskManager Stdout log in the Flink Web UI (http://localhost:8081/#/task-manager) should print:
xg#1995,xg,1995
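Each printed row has the shape info,name,age: the raw line followed by the fields produced by the Split transform and selected by the SQL above. For example, a hypothetical input of:
foo#42
would print:
foo#42,foo,42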
Summary
For more seatunnel configuration examples, see: Configuration example 1: Streaming computation
The configuration above is the default [streaming configuration template] and can be run directly with the following commands:
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/flink.streaming.conf.template
There is also a default [offline batch configuration template], which can be run directly with the following commands:
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/flink.batch.conf.template
Article List
- Apache Seatunnel-Roadmap
- Apache Seatunnel-Sink plugin : Clickhouse [Spark]
- Apache Seatunnel-Sink plugin : Console [Spark]
- Apache Seatunnel-Sink plugin : Elasticsearch [Spark]
- Apache Seatunnel-Sink plugin : Email [Spark]
- Apache Seatunnel-Sink plugin : File [Spark]
- Apache Seatunnel-Sink plugin : Hbase [Spark]
- Apache Seatunnel-Sink plugin : Hdfs [Spark]
- Apache Seatunnel-Sink plugin : Mysql [Spark]
- Apache Seatunnel-Sink plugin : Phoenix [Spark]
- Apache Seatunnel-Source plugin : Fake [Spark]
- Apache Seatunnel-Source plugin : FakeStream [Spark]
- Apache Seatunnel-Source plugin : Hive [Spark]
- Apache Seatunnel-Source plugin : JDBC [Spark]
- Apache Seatunnel-Source plugin : Kafka [Flink]
- Apache Seatunnel-Source plugin : Kafka [Spark]
- Apache Seatunnel-Source plugin : Socket [Flink]
- Apache Seatunnel-Source plugin : SocketStream [Spark]
- Apache Seatunnel-Transform Plugin
- Apache Seatunnel-Transform plugin : Json [Spark]
- Apache Seatunnel-Transform plugin : SQL [Spark]
- Apache Seatunnel-Transform plugin : Split [Spark]
- Apache Seatunnel-What are the differences between seatunnel v2.x and v1.x?
- Apache Seatunnel-start-seatunnel-flink.sh usage
- Apache Seatunnel-start-seatunnel-spark.sh usage
- Apache Seatunnel-Download and installation
- Apache Seatunnel-Contributing code to seatunnel v2.x
- Apache Seatunnel-Complete configuration file example [Spark]
- Apache Seatunnel-Quick start
- Apache Seatunnel-Plugin development
- Apache Seatunnel-Deep dive into seatunnel
- Apache Seatunnel-Deployment and running