该连接器可以向 JDBC 数据库写入数据。
添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):
注意该连接器目前还 不是 二进制发行版的一部分,如何在集群中运行请参考 这里。
已创建的 JDBC Sink 能够保证至少一次的语义。
更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。
用法示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));
env.execute();
env = StreamExecutionEnvironment.get_execution_environment()
type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
env.from_collection(
[(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
(102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
(103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
(104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
], type_info=type_info) \
.add_sink(
JdbcSink.sink(
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
type_info,
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.with_url('jdbc:postgresql://dbhost:5432/postgresdb')
.with_driver_name('org.postgresql.Driver')
.with_user_name('someUser')
.with_password('somePassword')
.build()
))
env.execute()
更多细节请查看 API documentation 。
文章列表
- Flink DataStream-文件系统
- Flink DataStream-亚马逊Kinesis数据流SQL连接器
- Flink DataStream-RabbitMQ连接器
- Flink DataStream-JDBC Connector
- Flink DataStream-Hybrid Source
- Flink DataStream-Google Cloud PubSub
- Flink DataStream-DataStream Connectors
- Flink DataStream-Data Source和Sink的容错保证
- Flink DataStream-Apache Pulsar连接器
- Flink DataStream-Apache Kafka连接器
- Flink DataStream-Apache Cassandra Connector
- Flink DataStream-Amazon Kinesis Data Firehose Sink