# How to Use Apache Doris Binlog Load

Author: 张家锋

## 1. Install and Configure MySQL

  1. Install MySQL. The quickest way is to install and configure MySQL with Docker; see https://segmentfault.com/a/1190000021523570 for details. If you are installing on a physical machine, refer to a tutorial such as "Detailed tutorial on installing MySQL 8 on CentOS 7".
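    As a minimal sketch (the image tag, container name, root password, and port mapping below are assumptions, adjust them to your environment), a Docker-based install can look like this:

    ```shell
    # Pull MySQL 8 and run it with a root password and the default port exposed
    docker pull mysql:8.0
    docker run -d --name mysql8 \
      -p 3306:3306 \
      -e MYSQL_ROOT_PASSWORD=your_password \
      mysql:8.0
    ```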
  2. Enable the MySQL binlog. Inside the Docker container (or on the physical machine), edit the /etc/my.cnf file and add the following under the [mysqld] section:
    log_bin=mysql_bin
    binlog-format=Row
    server-id=1
    
    Then restart MySQL:
    systemctl restart mysqld
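    After the restart, it is worth confirming that the binlog really is enabled; a quick check from the MySQL client (standard MySQL statements, nothing specific to this setup):

    ```sql
    -- log_bin should be ON and binlog_format should be ROW
    SHOW VARIABLES LIKE 'log_bin';
    SHOW VARIABLES LIKE 'binlog_format';
    -- shows the current binlog file and position that Canal will read from
    SHOW MASTER STATUS;
    ```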
    
  3. Create the MySQL table:
    create database demo;
    
    CREATE TABLE `test_cdc` (
     `id` int NOT NULL AUTO_INCREMENT,
     `sex` TINYINT(1) DEFAULT NULL,
     `name` varchar(20) DEFAULT NULL,
     `address` varchar(255) DEFAULT NULL,
     PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
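    To have something to sync (and to make the delete test in section 3.2.5 meaningful later), insert a few rows into the source table; the values below are purely illustrative:

    ```sql
    -- Hypothetical sample data; ids are generated by AUTO_INCREMENT
    USE demo;
    INSERT INTO test_cdc (sex, name, address) VALUES
      (1, 'zhangsan', 'beijing'),
      (0, 'lisi', 'shanghai');
    ```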
    

## 2. Install and Configure Canal

Download canal-1.1.5: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

  1. Extract Canal into a directory of your choice:
    mkdir -p canal && tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
    
  2. Under the conf folder, create a new directory to serve as the instance's root directory. Name it anything that is easy to recognize; here it is named after the source database: demo
    vi conf/demo/instance.properties
    
    Below is my example configuration. For an explanation of these parameters, see the Canal official documentation (QuickStart).

```properties
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12115

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=10.220.146.11:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

canal.instance.standby.address =
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=zhangfeng
canal.instance.dbPassword=zhangfeng800729)(*Q
canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password
canal.instance.enableDruid=false
canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=demo\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\..*
```

3. Start Canal:

sh bin/startup.sh

> Note: canal instance user/passwd
>
> In version 1.1.5, add these two settings to canal.properties:
>
> canal.user = canal
> canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
>
> The default credentials are canal/canal; the value of canal.passwd can be obtained with select password("xxx").
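A quick sketch of generating that value (this assumes a MySQL 5.x client, since the PASSWORD() function was removed in MySQL 8.0; drop the leading `*` of the result when filling in canal.passwd):

```sql
-- Produce the hash used as canal.passwd from a plaintext password
SELECT PASSWORD('canal');
-- returns *E3619321C1A937C46A0D8BD1DAC39F93B27D4458
```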

4. Verify that the startup succeeded:

tail -200f logs/demo/demo.log

   ![image-20211110145044815](https://static.oomspot.com/image/cnbo/2020/image-20211110145044815.png)
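If the instance log stays empty, the Canal server log (path relative to the canal directory, using the deployer's default file name) is also worth checking:

```shell
# Server-level log; instance-level problems surface in logs/demo/demo.log
tail -200f logs/canal/canal.log
```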

## 3. Start Syncing Data

### 3.1 Create the Doris Target Table

You first need to create, on the Doris side, the target table corresponding to the MySQL table.
Binlog Load only supports Unique-model target tables, and the target table's Batch Delete feature must be enabled.

For how to enable Batch Delete, see the batch delete section in `help alter table`.

```sql
CREATE TABLE doris_mysql_binlog_demo (
    id int NOT NULL,
    sex TINYINT(1),
    name varchar(20),
    address varchar(255)
) ENGINE=OLAP
UNIQUE KEY(id, sex)
COMMENT "OLAP"
DISTRIBUTED BY HASH(sex) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);

-- enable batch delete
ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";
```

### 3.2 Create the Sync Job

#### 3.2.1 CREATE SYNC JOB Syntax
Name: 'CREATE SYNC JOB'
Description:

The data synchronization (Sync Job) feature lets a user submit a resident synchronization job that reads binlog from a specified remote address and incrementally applies the user's data changes in a MySQL database to Doris, i.e. CDC (Change Data Capture).


Currently, sync jobs only support Canal as the source: the parsed binlog data is fetched from a Canal Server and loaded into Doris.


You can check the status of a sync job with `SHOW SYNC JOB`.


Syntax:

```
CREATE SYNC [db.]job_name
(
    channel_desc,
    channel_desc
    ...
)
binlog_desc
```

1. `job_name`
    The name of the synchronization job, which uniquely identifies it within the current database. Only one job with the same `job_name` can be running at a time.

2. `channel_desc`
    A data channel under the job, describing the mapping from a MySQL source table to a Doris target table.

    Syntax:

    ```
    FROM mysql_db.src_tbl INTO des_tbl
    [partitions]
    [columns_mapping]
    ```

    1. `mysql_db.src_tbl`
        Specifies the database and source table on the MySQL side.

    2. `des_tbl`
        Specifies the target table on the Doris side. Only Unique tables are supported, and the table's batch delete feature must be enabled (see the 'batch delete' section of help alter table for how to enable it).

    3. `partitions`
        Specifies which partitions of the target table the data is loaded into. If not specified, the data is automatically loaded into the corresponding partitions.

        Example:
        PARTITION(p1, p2, p3)
    4. `column_mapping`
        Specifies the mapping between the columns of the MySQL source table and the Doris target table. If not specified, the FE assumes the source and target columns correspond one to one, in order.

        Columns cannot be expressed in the form col_name = expr.

        Example:
        Suppose the target table columns are (k1, k2, v1).

        To swap the order of columns k1 and k2:
        COLUMNS(k2, k1, v1)

        To ignore the fourth column of the source data:
        COLUMNS(k2, k1, v1, dummy_column)
3. `binlog_desc`
    Describes the remote data source; currently only canal is supported.

    Syntax:
    ```
    FROM BINLOG
    (
        "key1" = "value1",
        "key2" = "value2"
    )
    ```
    1. Properties of the Canal data source, prefixed with `canal.`
        1. canal.server.ip: address of the canal server
        2. canal.server.port: port of the canal server
        3. canal.destination: identifier of the instance
        4. canal.batchSize: maximum batch size fetched per request, 8192 by default
        5. canal.username: username of the instance
        6. canal.password: password of the instance
        7. canal.debug: optional; when set to true, detailed information about each batch and every row is printed
Examples:
1. Create a simple sync job named `job1` for table `test_tbl` of `test_db`, connecting to the local Canal server and corresponding to the MySQL source table `mysql_db1.tbl1`.
        CREATE SYNC `test_db`.`job1`
        (
            FROM `mysql_db1`.`tbl1` INTO `test_tbl`
        )
        FROM BINLOG 
        (
            "type" = "canal",
            "canal.server.ip" = "127.0.0.1",
            "canal.server.port" = "11111",
            "canal.destination" = "example",
            "canal.username" = "",
            "canal.password" = ""
        );

2. Create a sync job named `job1` for multiple tables of `test_db`, mapping one to one to multiple MySQL source tables, with explicitly specified column mappings.
        CREATE SYNC `test_db`.`job1` 
        (
            FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
            FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
        ) 
        FROM BINLOG 
        (
            "type" = "canal", 
            "canal.server.ip" = "xx.xxx.xxx.xx", 
            "canal.server.port" = "12111", 
            "canal.destination" = "example",  
            "canal.username" = "username", 
            "canal.password" = "password"
        );

#### 3.2.2 Start Syncing MySQL Data into Doris
> Note:
>
> Before creating a sync job, you must set enable_create_sync_job=true in fe.conf. It defaults to false, and without it the sync job cannot be created.
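A minimal sketch of turning this switch on (the Doris install path below is a placeholder; the FE has to be restarted for the change to take effect):

```shell
# Enable sync jobs on the FE and restart it (adjust the path to your deployment)
echo "enable_create_sync_job=true" >> /opt/doris/fe/conf/fe.conf
sh /opt/doris/fe/bin/stop_fe.sh
sh /opt/doris/fe/bin/start_fe.sh --daemon
```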

```sql
CREATE SYNC test_2.doris_mysql_binlog_demo_job
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo
)
FROM BINLOG
(
    "type" = "canal",
    "canal.server.ip" = "10.220.146.10",
    "canal.server.port" = "11111",
    "canal.destination" = "demo",
    "canal.username" = "canal",
    "canal.password" = "canal"
);
```

#### 3.2.3 Check the Sync Job

SHOW SYNC JOB from test_2;

![image-20211110160106602](https://static.oomspot.com/image/cnbo/2020/image-20211110160106602.png)
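If you need to pause, resume, or stop the job later, Doris also has matching SYNC JOB control statements; a quick sketch using the job name created above:

```sql
-- Pause, resume, or permanently stop the sync job
PAUSE SYNC JOB test_2.doris_mysql_binlog_demo_job;
RESUME SYNC JOB test_2.doris_mysql_binlog_demo_job;
STOP SYNC JOB test_2.doris_mysql_binlog_demo_job;
```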

#### 3.2.4 Check the Data in the Table

select * from doris_mysql_binlog_demo;

![image-20211110160331479](https://static.oomspot.com/image/cnbo/2020/image-20211110160331479.png)

#### 3.2.5 Delete Data

Now delete some rows from the MySQL table and watch the change in the Doris table:

delete from test_cdc where id in (12,13);

Checking the Doris table again, the two rows with id 12 and 13 have been deleted.

![image-20211110160710709](https://static.oomspot.com/image/cnbo/2020/image-20211110160710709.png)
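You can double-check on the Doris side with a query like this (database and table names taken from the earlier steps):

```sql
-- Should return an empty result set once the delete has been synced
SELECT * FROM test_2.doris_mysql_binlog_demo WHERE id IN (12, 13);
```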

#### 3.2.6 Multi-Table Synchronization

To synchronize multiple tables, you only need to write the channel list like this:

```sql
CREATE SYNC test_2.doris_mysql_binlog_demo_job
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
)
```
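Note that the snippet above only shows the channel list; a complete, runnable statement also needs the FROM BINLOG clause. A sketch reusing the Canal settings from section 3.2.2 (the extra source tables test_cdc_1 to test_cdc_3 are assumed to already exist in MySQL):

```sql
CREATE SYNC test_2.doris_mysql_binlog_demo_job
(
    FROM demo.test_cdc INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
    FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
)
FROM BINLOG
(
    "type" = "canal",
    "canal.server.ip" = "10.220.146.10",
    "canal.server.port" = "11111",
    "canal.destination" = "demo",
    "canal.username" = "canal",
    "canal.password" = "canal"
);
```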
