Apache Doris Colocate Join 原理及使用

作者: 张家锋

1. 概述

Colocation Join 是在 Doris 0.9 版本中引入的新功能。旨在为某些 Join 查询提供本地性优化,来减少数据在节点间的传输耗时,加速查询。

我们都知道 Join 的常见连接类型分为以下几种:

  • INNER JOIN
  • OUTER JOIN
  • CROSS JOIN
  • SEMI JOIN
  • ANTI JOIN Join 的常见算法实现包含以下几种:
  • Nested Loop Join
  • Sort Merge Join
  • Hash Join

分布式系统实现 Join 数据分布的常见策略有:

  • Shuffle Join
  • Broadcast Join
  • Colocate/Local Join Colocate/Local Join 就是指多个节点 Join 时没有数据移动和网络传输,每个节点只在本地进行 Join,能够本地进行 Join 的前提是相同 Join Key 的数据分布在相同的节点。

2.名词解释

  • Colocation Group(CG):一个 CG 中会包含一张及以上的 Table。在同一个 Group 内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布。
  • Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及副本数等。

3.为什么要使用 Colocate Join

之前Doris 提供了 Shuffle Join 和 Broadcast Join,但是这两种Join 都是需要网络传输数据,都会有不小的网络开销。

举个例子,当前存在A表与B表的Join查询,它的Join方式为HashJoin,不同Join类型的开销如下:

  • Broadcast Join: 如果根据数据分布,查询规划出A表有3个执行的HashJoinNode,那么需要将B表全量的发送到3个HashJoinNode,那么它的网络开销是3B,它的内存开销也是3B
  • Shuffle Join: Shuffle Join会将A,B两张表的数据根据哈希计算分散到集群的节点之中,所以它的网络开销为 A + B,内存开销为B

相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查询时没有数据的网络传输,性能会更高。 在 Doris 的具体实现中,Colocate Join 相比 Shuffle Join 可以拥有更高的并发粒度,也可以显著提升 Join 的性能。

4.原理

对于 colocate tables,在任何情况下都要保证数据的本地性。 具体包括:

  • 数据导入时保证数据本地性
  • 查询调度时保证数据本地性
  • 数据 balance 后保证数据本地性 Colocation Join 功能,是将一组拥有相同 CGS 的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。

一个表的数据,最终会根据分桶列值 Hash、对桶数取模的后落在某一个分桶内。假设一个 Table 的分桶数为 8,则共有 [0, 1, 2, 3, 4, 5, 6, 7] 8 个分桶(Bucket),我们称这样一个序列为一个 BucketsSequence。每个 Bucket 内会有一个或多个数据分片(Tablet)。当表为单分区表时,一个 Bucket 内仅有一个 Tablet。如果是多分区表,则会有多个。

为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证以下属性相同:

  1. 分桶列和分桶数 分桶列,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。分桶列决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Tablet 中。同一 CG 内的 Table 必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
  2. 副本数 同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个 Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。

同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。

在固定了分桶列和分桶数后,同一个 CG 内的表会拥有相同的 BucketsSequence。而副本数决定了每个分桶内的 Tablet 的多个副本,存放在哪些 BE 上。假设 BucketsSequence 为 [0, 1, 2, 3, 4, 5, 6, 7],BE 节点有 [A, B, C, D] 4个。则一个可能的数据分布如下:

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

| A | | B | | C | | D | | A | | B | | C | | D | | | | | | | | | | | | | | | | | | B | | C | | D | | A | | B | | C | | D | | A | | | | | | | | | | | | | | | | |

| C | | D | | A | | B | | C | | D | | A | | B |

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

CG 内所有表的数据都会按照上面的规则进行统一分布,这样就保证了,分桶列值相同的数据都在同一个 BE 节点上,可以进行本地数据 Join。

5.使用方式

5.1 建表

建表时,可以在 PROPERTIES 中指定属性 "colocate_with" = "group_name",表示这个表是一个 Colocation Join 表,并且归属于一个指定的 Colocation Group。

示例:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);

如果指定的 Group 不存在,则 Doris 会自动创建一个只包含当前这张表的 Group。如果 Group 已存在,则 Doris 会检查当前表是否满足 Colocation Group Schema。如果满足,则会创建该表,并将该表加入 Group。同时,表会根据已存在的 Group 中的数据分布规则创建分片和副本。 Group 归属于一个 Database,Group 的名字在一个 Database 内唯一。在内部存储是 Group 的全名为 dbId_groupName,但用户只感知 groupName。

5.2 删表

当 Group 中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过 DROP TABLE 命令删除后,会在回收站默认停留一天的时间后,再删除),该 Group 也会被自动删除。

5.3 查看 Group

以下命令可以查看集群内已存在的 Group 信息。

SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+----------------+----------+----------+

| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |

+-------------+--------------+--------------+------------+----------------+----------+----------+

| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |

+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId: 一个 Group 的全集群唯一标识,前半部分为 db id,后半部分为 group id。
  • GroupName: Group 的全名。
  • TabletIds: 该 Group 包含的 Table 的 id 列表。
  • BucketsNum: 分桶数。
  • ReplicationNum: 副本数。
  • DistCols: Distribution columns,即分桶列类型。
  • IsStable: 该 Group 是否稳定(稳定的定义,见 Colocation 副本均衡和修复 一节)。

通过以下命令可以进一步查看一个 Group 的数据分布情况:

SHOW PROC '/colocation_group/10005.10008';
+-------------+---------------------+

| BucketIndex | BackendIds |

+-------------+---------------------+

| 0 | 10004, 10002, 10001 | | 1 | 10003, 10002, 10004 | | 2 | 10002, 10004, 10001 | | 3 | 10003, 10002, 10004 | | 4 | 10002, 10004, 10003 | | 5 | 10003, 10002, 10001 | | 6 | 10003, 10004, 10001 |

| 7 | 10003, 10004, 10002 |

+-------------+---------------------+
  • BucketIndex: 分桶序列的下标。
  • BackendIds: 分桶中数据分片所在的 BE 节点 id 列表。

    以上命令需要 ADMIN 权限。暂不支持普通用户查看。

5.4 修改表 Colocate Group 属性

可以对一个已经创建的表,修改其 Colocation Group 属性。示例:

ALTER TABLE tbl SET ("colocate_with" = "group2");
  • 如果该表之前没有指定过 Group,则该命令检查 Schema,并将该表加入到该 Group(Group 不存在则会创建)。
  • 如果该表之前有指定其他 Group,则该命令会先将该表从原有 Group 中移除,并加入新 Group(Group 不存在则会创建)。

也可以通过以下命令,删除一个表的 Colocation 属性:

ALTER TABLE tbl SET ("colocate_with" = "");

5.5 其他相关操作

当对一个具有 Colocation 属性的表进行增加分区(ADD PARTITION)、修改副本数时,Doris 会检查修改是否会违反 Colocation Group Schema,如果违反则会拒绝。

5.6 Colocation 副本均衡和修复

Colocation 表的副本分布需要遵循 Group 中指定的分布,所以在副本修复和均衡方面和普通分片有所区别。 Group 自身有一个 Stable 属性,当 Stable 为 true 时,表示当前 Group 内的表的所有分片没有正在进行变动,Colocation 特性可以正常使用。当 Stable 为 false 时(Unstable),表示当前 Group 内有部分表的分片正在做修复或迁移,此时,相关表的 Colocation Join 将退化为普通 Join。

5.7 副本修复

副本只能存储在指定的 BE 节点上。所以当某个 BE 不可用时(宕机、Decommission 等),需要寻找一个新的 BE 进行替换。Doris 会优先寻找负载最低的 BE 进行替换。替换后,该 Bucket 内的所有在旧 BE 上的数据分片都要做修复。迁移过程中,Group 被标记为 Unstable。

5.8 副本均衡

Doris 会尽力将 Colocation 表的分片均匀分布在所有 BE 节点上。对于普通表的副本均衡,是以单副本为粒度的,即单独为每一个副本寻找负载较低的 BE 节点即可。而 Colocation 表的均衡是 Bucket 级别的,即一个 Bucket 内的所有副本都会一起迁移。我们采用一个简单的均衡算法,即在不考虑副本实际大小,而只根据副本数量,将 BucketsSequence 均匀的分布在所有 BE 上。具体算法可以参阅 ColocateTableBalancer.java 中的代码注释。

注1:当前的 Colocation 副本均衡和修复算法,对于异构部署的 Doris 集群效果可能不佳。所谓异构部署,即 BE 节点的磁盘容量、数量、磁盘类型(SSD 和 HDD)不一致。在异构部署情况下,可能出现小容量的 BE 节点和大容量的 BE 节点存储了相同的副本数量。

注2:当一个 Group 处于 Unstable 状态时,其中的表的 Join 将退化为普通 Join。此时可能会极大降低集群的查询性能。如果不希望系统自动均衡,可以设置 FE 的配置项 disable_colocate_balance 来禁止自动均衡。然后在合适的时间打开即可。(具体参阅 FE高级配置 一节)

5.9 查询

对 Colocation 表的查询方式和普通表一样,用户无需感知 Colocation 属性。如果 Colocation 表所在的 Group 处于 Unstable 状态,将自动退化为普通 Join。

举例说明:

表1:

CREATE TABLE `tbl1` (
    `k1` date NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
    PARTITION p1 VALUES LESS THAN ('2019-05-31'),
    PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

表2:

CREATE TABLE `tbl2` (
    `k1` datetime NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

注意

  1. 这里的表1和表2的AGGREGATE KEY都是k1,k2,分桶列都是k2,分桶数都是8,如果分同列和分桶数不一致,那么Colocate Join将不起作用
  2. 同时也要保持两张表的副本数数一致的
  3. colocate_with的名称也要一致。

查看查询计划:

DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+

| Explain String |

+----------------------------------------------------+

| PLAN FRAGMENT 0 | | OUTPUT EXPRS:tbl1.k1 | | | PARTITION: RANDOM | | | | RESULT SINK | | | | 2:HASH JOIN | | | join op: INNER JOIN | | | hash predicates: | | | colocate: true | | | tbl1.k2 = tbl2.k2 | | | tuple ids: 0 1 |

| | | | |—-1:OlapScanNode | | | TABLE: tbl2 | | | PREAGGREGATION: OFF. Reason: null | | | partitions=0/1 | | | rollup: null | | | buckets=0/0 | | | cardinality=-1 | | | avgRowSize=0.0 | | | numNodes=0 | | | tuple ids: 1 | | | | | 0:OlapScanNode | | TABLE: tbl1 | | PREAGGREGATION: OFF. Reason: No AggregateInfo | | partitions=0/2 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 |

| tuple ids: 0 |

+----------------------------------------------------+

如果 Colocation Join 生效,则 Hash Join 节点会显示 colocate: true

如果没有生效,则查询计划如下:

+----------------------------------------------------+

| Explain String |

+----------------------------------------------------+

| PLAN FRAGMENT 0 | | OUTPUT EXPRS:tbl1.k1 | | | PARTITION: RANDOM | | | | RESULT SINK | | | | 2:HASH JOIN | | | join op: INNER JOIN (BROADCAST) | | | hash predicates: | | | colocate: false, reason: group is not stable | | | tbl1.k2 = tbl2.k2 | | | tuple ids: 0 1 |

| | | | |—-3:EXCHANGE | | | tuple ids: 1 | | | | | 0:OlapScanNode | | TABLE: tbl1 | | PREAGGREGATION: OFF. Reason: No AggregateInfo | | partitions=0/2 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 | | tuple ids: 0 | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | UNPARTITIONED | | | | 1:OlapScanNode | | TABLE: tbl2 | | PREAGGREGATION: OFF. Reason: null | | partitions=0/1 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 |

| tuple ids: 1 |

+----------------------------------------------------+

HASH JOIN 节点会显示对应原因:colocate: false, reason: group is not stable。同时会有一个 EXCHANGE 节点生成

6. 使用场景及限制

6.1 使用场景

Colocate Join 十分适合几张表按照相同字段分桶,并高频根据相同字段 Join 的场景,比如电商的不少应用都按照商家 Id 分桶,并高频按照商家 Id 进行 Join。

6.2 使用限制

  1. Colocate Table 必须是 OLAP 类型的表
  2. colocate_with 属性相同表的 BUCKET 数必须一样
  3. colocate_with 属性相同表的 副本数必须一样 (这个限制之后可能会去掉,但对用户应该没有实际影响)
  4. colocate_with 属性相同表的 DISTRIBUTED Columns 的数据类型必须一样

7.FE高级配置

  • disable_colocate_relocate 是否关闭 Doris 的自动 Colocation 副本修复。默认为 false,即不关闭。该参数只影响 Colocation 表的副本修复,不影响普通表。
  • disable_colocate_balance 是否关闭 Doris 的自动 Colocation 副本均衡。默认为 false,即不关闭。该参数只影响 Colocation 表的副本均衡,不影响普通表。

以上参数可以动态修改,设置方式请参阅 HELP ADMIN SHOW CONFIG;HELP ADMIN SET CONFIG;

  • disable_colocate_join 是否关闭 Colocation Join 功能。在 0.10 及之前的版本,默认为 true,即关闭。在之后的某个版本中将默认为 false,即开启。
  • use_new_tablet_scheduler 在 0.10 及之前的版本中,新的副本调度逻辑与 Colocation Join 功能不兼容,所以在 0.10 及之前版本,如果 disable_colocate_join = false,则需设置 use_new_tablet_scheduler = false,即关闭新的副本调度器。之后的版本中,use_new_tablet_scheduler 将衡为 true。

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多