DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能
为了更好的扩展Apache doris生态,为doris用户提供更方便的数据导入,社区开发扩展支持了Datax DorisWriter,使大家更方便Datax进行数据导入
1.场景
这里演示介绍的使用 Doris 的 Datax 扩展 DorisWriter实现从Mysql数据定时抽取数据导入到Doris数仓表里
2.编译 DorisWriter
这个的扩展的编译可以不在 doris 的 docker 编译环境下进行,本文是在 windows 下的 WLS 下进行编译的
首先从github上拉取源码
git clone https://github.com/apache/incubator-doris.git
进入到incubator-doris/extension/DataX/
执行编译
首先执行:
sh init_env.sh
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:
- 将 DataX 代码库 clone 到本地。
- 将
doriswriter/
目录软链到DataX/doriswriter
目录。 - 在
DataX/pom.xml
文件中添加<module>doriswriter</module>
模块。 - 将
DataX/core/pom.xml
文件中的 httpclient 版本从 4.5 改为 4.5.13.httpclient v4.5 在处理 307 转发时有bug。
这个脚本执行后,开发者就可以进入 DataX/
目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter
目录中文件的修改,都会反映到 doriswriter/
目录中,方便开发者提交代码
2.1 开始编译
这里我为了加快编译速度去掉了很多无用的插件:这里直接在Datax目录下的pom.xml里注释掉就行
hbase11xreader
hbase094xreader
tsdbreader
oceanbasev10reader
odpswriter
hdfswriter
adswriter
ocswriter
oscarwriter
oceanbasev10writer
然后进入到incubator-doris/extension/DataX/
目录下的 Datax 目录,执行编译
这里我是执行的将 Datax 编译成 tar 包,和官方的编译命令不太一样。
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
目录结构:
编译完成以后,tar 包在 Datax/target
目录下,你可以将这tar包拷贝到你需要的地方,这里我是直接在 datax 执行测试,这里因为的 python 版本是 3.x版本,需要将 bin 目录下的三个文件换成 python 3能之别的版本,这个你可以去下面的地址下载:
https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3
将下载的三个文件替换 bin 目录下的文件以后,整个编译,安装就完成了
如果你编译不成功也可以从我的百度网盘上下载编译好的包,注意我上边编译去掉的那些插件
链接:https://pan.baidu.com/s/1hXYkpkrUE2qW4j98k2Wu7A
提取码:3azi
- 下载 alibaba-datax-maven-m2-20210928.tar.gz
- 解压后,将得到的
alibaba/datax/
目录,拷贝到所使用的 maven 对应的.m2/repository/com/alibaba/
下。 - 再次尝试编译。
3.数据接入
这个时候我们就可以开始使用 Datax 的doriswriter扩展开始从 Mysql(或者其他数据源)直接将数据抽取出来导入到 Doris 表中了。
3.1 Mysql 数据库准备
下面是我数据库的建表脚本(mysql 8):
CREATE TABLE `order_analysis` (
`date` varchar(19) DEFAULT NULL,
`user_src` varchar(9) DEFAULT NULL,
`order_src` varchar(11) DEFAULT NULL,
`order_location` varchar(2) DEFAULT NULL,
`new_order` int DEFAULT NULL,
`payed_order` int DEFAULT NULL,
`pending_order` int DEFAULT NULL,
`cancel_order` int DEFAULT NULL,
`reject_order` int DEFAULT NULL,
`good_order` int DEFAULT NULL,
`report_order` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT
示例数据
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);
3.2 doris数据库准备
下面是我上面数据表在doris对应的建表脚本
CREATE TABLE `order_analysis` (
`date` datetime DEFAULT NULL,
`user_src` varchar(30) DEFAULT NULL,
`order_src` varchar(50) DEFAULT NULL,
`order_location` varchar(10) DEFAULT NULL,
`new_order` int DEFAULT NULL,
`payed_order` int DEFAULT NULL,
`pending_order` int DEFAULT NULL,
`cancel_order` int DEFAULT NULL,
`reject_order` int DEFAULT NULL,
`good_order` int DEFAULT NULL,
`report_order` int DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(`date`,user_src)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
3.3 Datax Job JSON文件
创建并编辑datax job任务json文件,并保存到指定目录
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "zhangfeng",
"column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],
"connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }
},
"writer": {
"name": "doriswriter",
"parameter": {
"feLoadUrl": ["fe:8030"],
"beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],
"jdbcUrl": "jdbc:mysql://fe:9030/",
"database": "test_2",
"table": "order_analysis",
"column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
"username": "root",
"password": "",
"postSql": [],
"preSql": [],
"loadProps": {
},
"maxBatchRows" : 10000,
"maxBatchByteSize" : 104857600,
"labelPrefix": "datax_doris_writer_demo_",
"lineDelimiter": "\n"
}
}
}
]
}
}
这块 Mysql reader 使用方式参照:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
doriswriter的使用及参数说明:
https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md
4.执行Datax数据导入任务
python bin/datax.py doris.json
然后就可以看到执行结果:
再去 Doris 数据库中查看你的表,数据就已经导入进去了,任务执行结束
因为 Datax 的任务是要靠外部触发才能执行,这里你可以使用Linux的crontab或者海豚调度之类的来控制任务运行
文章列表
- 获得Apache Doris社区之星
- 自己写的一个BI可视化系统(支持Apache Doris)
- 神仙打架海底捞火了
- 数据治理方案
- 怎么进行数据仓库分层设计及设计规范
- 实现通过Flink Mysql CDC结合Apache doris flink connector实现数据实时入库
- 如何构建公司的数据指标体系
- 基于Apache-doris怎么构建数据中台(四)-数据接入系统
- 基于Apache-doris怎么构建数据中台(六)-数据服务
- 基于Apache-doris怎么构建数据中台(八)-数仓管理
- 基于Apache-doris怎么构建数据中台(五)-数据质量
- 基于Apache doris怎么构建数据中台(二)-数据中台建设内容
- 基于Apache doris怎么构建数据中台(九)-数据安全
- 基于Apache doris怎么构建数据中台(三)-数据资产管理
- 基于Apache doris怎么构建数据中台(一)-什么是数据中台
- 基于Apache Doris怎么构建数据中台(七)-数据指标管理
- 参与开源两年来的感悟
- 元数据管理系统
- 使用supervisor实现Apache Doris进程自动拉起
- 使用Grafana监控Apache Doris
- link 使用 SQL 读取 Kafka 利用Doris Flink Connector写入到Doris表中
- [[Doris 社区的访谈]一个人可能走得更快,但一群人会走得更远](https://www.oomspot.com/post/dorisshequdefangtanyigerenkenengzoudegengkuaidanyi)
- Doris Grafana监控指标介绍
- Apache doris架构及组件介绍
- Apache doris 错误代码说明
- Apache doris 使用过程中常见问题汇总
- Apache doris ODBC外表使用方式
- Apache doris ODBC mysql外表注意事项
- Apache doris FE使用ProxySQL实现负载均衡
- Apache doris Datax DorisWriter扩展使用方法
- Apache doris BE配置参数说明
- Apache Doris数据模型
- Apache Doris数据备份及恢复
- Apache Doris常见问题答疑(二)
- Apache Doris常见问题答疑(一)
- Apache Doris安装部署
- Apache Doris在蜀海供应链数仓建设中的实践
- Apache Doris关系模型与数据划分
- Apache Doris 环境安装部署
- Apache Doris 物化视图介绍
- Apache Doris 数据更新操作
- Apache Doris 数据导出
- Apache Doris 数据导入总览
- Apache Doris 数据导入之INSERT
- Apache Doris 排序键及ShortKey Index
- Apache Doris 实战指南
- Apache Doris 升级手册
- Apache Doris 动态分区介绍及使用方法
- Apache Doris 删除数据恢复
- Apache Doris 元数据恢复
- Apache Doris sequence介绍及使用方法
- Apache Doris fe配置参数说明
- Apache Doris Windows下fe开发环境搭建
- Apache Doris Stream Load数据导入
- Apache Doris Stream Load使用示例
- Apache Doris Spark Connector设计方案
- Apache Doris SQL日志审计
- Apache Doris RuntimeFilter 原理及使用
- Apache Doris Routine Load数据导入使用方法
- Apache Doris On ElasticSearche
- Apache Doris Flink Connector设计方案
- Apache Doris FE 元数据故障运维
- Apache Doris Colocate Join 原理及使用
- Apache Doris Bucket Shuffle Join 原理及使用
- Apache Doris Broker数据导入
- Apache Doris Binlog load使用方法
- Apache Doris BE 开发环境搭建
- Apache DORIS安装使用测试报告