Apache doris Datax DorisWriter扩展使用方法

作者: 张家锋

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能

为了更好的扩展Apache doris生态,为doris用户提供更方便的数据导入,社区开发扩展支持了Datax DorisWriter,使大家更方便Datax进行数据导入

1.场景

这里演示介绍的使用 Doris 的 Datax 扩展 DorisWriter实现从Mysql数据定时抽取数据导入到Doris数仓表里

2.编译 DorisWriter

这个的扩展的编译可以不在 doris 的 docker 编译环境下进行,本文是在 windows 下的 WLS 下进行编译的

首先从github上拉取源码

 git clone https://github.com/apache/incubator-doris.git

进入到incubator-doris/extension/DataX/ 执行编译

首先执行:

 sh init_env.sh

这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:

  1. 将 DataX 代码库 clone 到本地。
  2. doriswriter/ 目录软链到 DataX/doriswriter 目录。
  3. DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。
  4. DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13.

    httpclient v4.5 在处理 307 转发时有bug。

这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码

2.1 开始编译

这里我为了加快编译速度去掉了很多无用的插件:这里直接在Datax目录下的pom.xml里注释掉就行

 hbase11xreader
 hbase094xreader
 tsdbreader
 oceanbasev10reader
 odpswriter
 hdfswriter
 adswriter
 ocswriter
 oscarwriter
 oceanbasev10writer

然后进入到incubator-doris/extension/DataX/ 目录下的 Datax 目录,执行编译

这里我是执行的将 Datax 编译成 tar 包,和官方的编译命令不太一样。

 mvn -U clean package assembly:assembly -Dmaven.test.skip=true

目录结构:

编译完成以后,tar 包在 Datax/target 目录下,你可以将这tar包拷贝到你需要的地方,这里我是直接在 datax 执行测试,这里因为的 python 版本是 3.x版本,需要将 bin 目录下的三个文件换成 python 3能之别的版本,这个你可以去下面的地址下载:

 https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3

将下载的三个文件替换 bin 目录下的文件以后,整个编译,安装就完成了

如果你编译不成功也可以从我的百度网盘上下载编译好的包,注意我上边编译去掉的那些插件

 链接:https://pan.baidu.com/s/1hXYkpkrUE2qW4j98k2Wu7A
 提取码:3azi
  1. 下载 alibaba-datax-maven-m2-20210928.tar.gz
  2. 解压后,将得到的 alibaba/datax/ 目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/ 下。
  3. 再次尝试编译。

3.数据接入

这个时候我们就可以开始使用 Datax 的doriswriter扩展开始从 Mysql(或者其他数据源)直接将数据抽取出来导入到 Doris 表中了。

3.1 Mysql 数据库准备

下面是我数据库的建表脚本(mysql 8):

 CREATE TABLE `order_analysis` (
   `date` varchar(19) DEFAULT NULL,
   `user_src` varchar(9) DEFAULT NULL,
   `order_src` varchar(11) DEFAULT NULL,
   `order_location` varchar(2) DEFAULT NULL,
   `new_order` int DEFAULT NULL,
   `payed_order` int DEFAULT NULL,
   `pending_order` int DEFAULT NULL,
   `cancel_order` int DEFAULT NULL,
   `reject_order` int DEFAULT NULL,
   `good_order` int DEFAULT NULL,
   `report_order` int DEFAULT NULL
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT

示例数据

 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);
 INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);

3.2 doris数据库准备

下面是我上面数据表在doris对应的建表脚本

 CREATE TABLE `order_analysis` (
   `date` datetime DEFAULT NULL,
   `user_src` varchar(30) DEFAULT NULL,
   `order_src` varchar(50) DEFAULT NULL,
   `order_location` varchar(10) DEFAULT NULL,
   `new_order` int DEFAULT NULL,
   `payed_order` int DEFAULT NULL,
   `pending_order` int DEFAULT NULL,
   `cancel_order` int DEFAULT NULL,
   `reject_order` int DEFAULT NULL,
   `good_order` int DEFAULT NULL,
   `report_order` int DEFAULT NULL
 ) ENGINE=OLAP
 DUPLICATE KEY(`date`,user_src)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

3.3 Datax Job JSON文件

创建并编辑datax job任务json文件,并保存到指定目录

 {
     "job": {
         "setting": {
             "speed": {
                 "channel": 1
            },
             "errorLimit": {
                 "record": 0,
                 "percentage": 0
            }
        },
         "content": [
            {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "username": "root",
                         "password": "zhangfeng",
                         "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],
                         "connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }
                },
                 "writer": {
                     "name": "doriswriter",
                     "parameter": {
                         "feLoadUrl": ["fe:8030"],
                         "beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],
                         "jdbcUrl": "jdbc:mysql://fe:9030/",
                         "database": "test_2",
                         "table": "order_analysis",
                         "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
                         "username": "root",
                         "password": "",
                         "postSql": [],
                         "preSql": [],
                         "loadProps": {
                        },
                         "maxBatchRows" : 10000,
                         "maxBatchByteSize" : 104857600,
                         "labelPrefix": "datax_doris_writer_demo_",
                         "lineDelimiter": "\n"
                    }
                }
            }
        ]
    }
 }

这块 Mysql reader 使用方式参照:

 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

doriswriter的使用及参数说明:

 https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

4.执行Datax数据导入任务

 python bin/datax.py doris.json

然后就可以看到执行结果:

再去 Doris 数据库中查看你的表,数据就已经导入进去了,任务执行结束

因为 Datax 的任务是要靠外部触发才能执行,这里你可以使用Linux的crontab或者海豚调度之类的来控制任务运行

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多