Apache Doris RuntimeFilter 原理及使用

作者: 张家锋

1. 介绍

Runtime Filter 是在 Doris 0.15 版本中正式加入的新功能。旨在为某些 Join 查询在运行时动态生成过滤条件,来减少扫描的数据量,避免不必要的I/O和网络传输,从而加速查询。 Runtime Filter 目前不支持所有外表,只支持 Doris 内部的表 Runtime Filter主要用于优化针对大表的join,如果左表的数据量太小,或者右表的数据量太大,则Runtime Filter可能不会取得预期效果 Runtime Filte r和谓词下推、分区裁剪不同,Runtime Filter是在运行时动态生成的过滤条件,即在查询运行时解析join on clause 确定过滤表达式,并将表达式广播给正在读取左表的 ScanNode,从而减少扫描的数据量,进而减少 probe hash table 的次数,避免不必要的I/O和网络传输,从而达到提升性能的效果。

2.Runtime Filter相关解释

  • 左表:Join查询时,左边的表。进行Probe操作。可被Join Reorder调整顺序。
  • 右表:Join查询时,右边的表。进行Build操作。可被Join Reorder调整顺序。
  • Fragment:FE会将具体的SQL语句的执行转化为对应的Fragment并下发到BE进行执行。BE上执行对应Fragment,并将结果汇聚返回给FE。
  • Join on clause: A join B on A.a=B.b中的A.a=B.b,在查询规划时基于此生成join conjuncts,包含join Build和Probe使用的expr,其中Build expr在Runtime Filter中称为src expr,Probe expr在Runtime Filter中称为target expr

3.使用方式

3.1 Runtime Filter 查询选项

与Runtime Filter相关的查询选项信息,请参阅以下部分:

  • 第一个查询选项是调整使用的Runtime Filter类型,大多数情况下,您只需要调整这一个选项,其他选项保持默认即可。
  • runtime_filter_type: 包括Bloom Filter、MinMax Filter、IN predicate,默认会保守的只使用IN predicate,部分情况下同时使用Bloom Filter、MinMax Filter、IN predicate时性能更高。
  • 其他查询选项通常仅在某些特定场景下,才需进一步调整以达到最优效果。通常只在性能测试后,针对资源密集型、运行耗时足够长且频率足够高的查询进行优化。
  • runtime_filter_mode: 用于调整Runtime Filter的下推策略,包括OFF、LOCAL、GLOBAL三种策略,默认设置为GLOBAL策略
  • runtime_filter_wait_time_ms: 左表的ScanNode等待每个Runtime Filter的时间,默认1000ms
  • runtime_filters_max_num: 每个查询可应用的Runtime Filter中Bloom Filter的最大数量,默认10
  • runtime_bloom_filter_min_size: Runtime Filter中Bloom Filter的最小长度,默认1048576(1M)
  • runtime_bloom_filter_max_size: Runtime Filter中Bloom Filter的最大长度,默认16777216(16M)
  • runtime_bloom_filter_size: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M)
  • runtime_filter_max_in_num: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024

下面对查询选项做进一步说明

3.1.1 runtime_filter_type

使用的Runtime Filter类型。 类型: 数字(1, 2, 4)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX),默认1(IN predicate),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:

set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";

等价于:

set runtime_filter_type=7;

使用注意事项

  • Bloom Filter: 有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确,在大部分情况下Bloom Filter都可以提升性能或对性能没有显著影响,但在部分情况下会导致性能降低。
  • Bloom Filter构建和应用的开销较高,所以当过滤率较低时,或者左表数据量较少时,Bloom Filter可能会导致性能降低。
  • 目前只有左表的Key列应用Bloom Filter才能下推到存储引擎,而测试结果显示Bloom Filter不下推到存储引擎时往往会导致性能降低。
  • 目前Bloom Filter仅在ScanNode上使用表达式过滤时有短路(short-circuit)逻辑,即当假阳性率过高时,不继续使用Bloom Filter,但当Bloom Filter下推到存储引擎后没有短路逻辑,所以当过滤率较低时可能导致性能降低。
  • MinMax Filter: 包含最大值和最小值,从而过滤小于最小值和大于最大值的数据,MinMax Filter的过滤效果与join on clause中Key列的类型和左右表数据分布有关。
  • 当join on clause中Key列的类型为int/bigint/double等时,极端情况下,如果左右表的最大最小值相同则没有效果,反之右表最大值小于左表最小值,或右表最小值大于左表最大值,则效果最好。
  • 当join on clause中Key列的类型为varchar等时,应用MinMax Filter往往会导致性能降低。
  • IN predicate: 根据join on clause中Key列在右表上的所有值构建IN predicate,使用构建的IN predicate在左表上过滤,相比Bloom Filter构建和应用的开销更低,在右表数据量较少时往往性能更高。
  • 默认只有右表数据行数少于1024才会下推(可通过session变量中的runtime_filter_max_in_num调整)。
  • 目前IN predicate没有实现合并方法,即无法跨Fragment下推,所以目前当需要下推给shuffle join左表的ScanNode时,如果没有生成Bloom Filter,那么我们会将IN predicate转为Bloom Filter,用于处理跨Fragment下推,所以即使类型只选择了IN predicate,实际也可能应用了Bloom Filter;

3.1.2 runtime_filter_mode

用于控制Runtime Filter在instance之间传输的范围。 类型: 数字(0, 1, 2)或者相对应的助记符字符串(OFF, LOCAL, GLOBAL),默认2(GLOBAL)。 使用注意事项 LOCAL:相对保守,构建的Runtime Filter只能在同一个instance(查询执行的最小单元)上同一个Fragment中使用,即Runtime Filter生产者(构建Filter的HashJoinNode)和消费者(使用RuntimeFilter的ScanNode)在同一个Fragment,比如broadcast join的一般场景; GLOBAL:相对激进,除满足LOCAL策略的场景外,还可以将Runtime Filter合并后通过网络传输到不同instance上的不同Fragment中使用,比如Runtime Filter生产者和消费者在不同Fragment,比如shuffle join。

大多数情况下GLOBAL策略可以在更广泛的场景对查询进行优化,但在有些shuffle join中生成和合并Runtime Filter的开销超过给查询带来的性能优势,可以考虑更改为LOCAL策略。

如果集群中涉及的join查询不会因为Runtime Filter而提高性能,您可以将设置更改为OFF,从而完全关闭该功能。

3.1.3 runtime_filter_wait_time_ms

Runtime Filter的等待耗时。 类型: 整数,默认1000,单位ms 使用注意事项

在开启Runtime Filter后,左表的ScanNode会为每一个分配给自己的Runtime Filter等待一段时间再扫描数据,即如果ScanNode被分配了3个Runtime Filter,那么它最多会等待3000ms。

因为Runtime Filter的构建和合并均需要时间,ScanNode会尝试将等待时间内到达的Runtime Filter下推到存储引擎,如果超过等待时间后,ScanNode会使用已经到达的Runtime Filter直接开始扫描数据。

如果Runtime Filter在ScanNode开始扫描之后到达,则ScanNode不会将该Runtime Filter下推到存储引擎,而是对已经从存储引擎扫描上来的数据,在ScanNode上基于该Runtime Filter使用表达式过滤,之前已经扫描的数据则不会应用该Runtime Filter,这样得到的中间数据规模会大于最优解,但可以避免严重的裂化。

如果集群比较繁忙,并且集群上有许多资源密集型或长耗时的查询,可以考虑增加等待时间,以避免复杂查询错过优化机会。如果集群负载较轻,并且集群上有许多只需要几秒的小查询,可以考虑减少等待时间,以避免每个查询增加1s的延迟

3.1.4 runtime_filters_max_num

每个查询生成的Runtime Filter中Bloom Filter数量的上限。 类型: 整数,默认10 使用注意事项 目前仅对Bloom Filter的数量进行限制,因为相比MinMax Filter和IN predicate,Bloom Filter构建和应用的代价更高。

如果生成的Bloom Filter超过允许的最大数量,则保留选择性大的Bloom Filter,选择性大意味着预期可以过滤更多的行。这个设置可以防止Bloom Filter耗费过多的内存开销而导致潜在的问题。

选择性=(HashJoinNode Cardinality / HashJoinNode left child Cardinality)
-- 因为目前FE拿到Cardinality不准,所以这里Bloom Filter计算的选择性与实际不准,因此最终可能只是随机保留了部分Bloom Filter。

仅在对涉及大表间join的某些长耗时查询进行调优时,才需要调整此查询选项

3.1.5 Bloom Filter长度相关参数

包括runtime_bloom_filter_min_sizeruntime_bloom_filter_max_sizeruntime_bloom_filter_size,用于确定Runtime Filter使用的Bloom Filter数据结构的大小(以字节为单位)。 类型: 整数 使用注意事项 因为需要保证每个HashJoinNode构建的Bloom Filter长度相同才能合并,所以目前在FE查询规划时计算Bloom Filter的长度。

如果能拿到join右表统计信息中的数据行数(Cardinality),会尝试根据Cardinality估计Bloom Filter的最佳大小,并四舍五入到最接近的2的幂(以2为底的log值)。如果无法拿到右表的Cardinality,则会使用默认的Bloom Filter长度runtime_bloom_filter_sizeruntime_bloom_filter_min_sizeruntime_bloom_filter_max_size用于限制最终使用的Bloom Filter长度最小和最大值。

更大的Bloom Filter在处理高基数的输入集时更有效,但需要消耗更多的内存。假如查询中需要过滤高基数列(比如含有数百万个不同的取值),可以考虑增加runtime_bloom_filter_size的值进行一些基准测试,这有助于使Bloom Filter过滤的更加精准,从而获得预期的性能提升。 Bloom Filter的有效性取决于查询的数据分布,因此通常仅对一些特定查询额外调整其Bloom Filter长度,而不是全局修改,一般仅在对涉及大表间join的某些长耗时查询进行调优时,才需要调整此查询选项

3.2 查看 Query 生成的 Runtime Filter

explain命令可以显示的查询计划中包括每个Fragment使用的join on clause信息,以及Fragment生成和使用Runtime Filter的注释,从而确认是否将Runtime Filter应用到了期望的join on clause上。

  • 生成Runtime Filter的Fragment包含的注释例如runtime filters: filter_id[type] <- table.column
  • 使用Runtime Filter的Fragment包含的注释例如runtime filters: filter_id[type] -> table.column

下面例子中的查询使用了一个ID为RF000的Runtime Filter。

CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test VALUES (1), (2), (3), (4);
CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test2 VALUES (3), (4), (5);

EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+-------------------------------------------------------------------+

| Explain String |

+-------------------------------------------------------------------+

| PLAN FRAGMENT 0 | | OUTPUT EXPRS:t1 | | | | 4:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: HASH_PARTITIONED: default_cluster:ssb.test.t1 | | | | 2:HASH JOIN | | | join op: INNER JOIN (BUCKET_SHUFFLE) | | | equal join conjunct: test.t1 = test2.t2 | | | runtime filters: RF000[in] <- test2.t2 |

| | | | |—-3:EXCHANGE | | | | | 0:OlapScanNode | | TABLE: test | | runtime filters: RF000[in] -> test.t1 | | | | PLAN FRAGMENT 2 | | OUTPUT EXPRS: | | PARTITION: HASH_PARTITIONED: default_cluster:ssb.test2.t2 | | | | 1:OlapScanNode |

| TABLE: test2 |

+-------------------------------------------------------------------+
-- 上面`runtime filters`的行显示了`PLAN FRAGMENT 1`的`2:HASH JOIN`生成了ID为RF000的IN predicate,
-- 其中`test2`.`t2`的key values仅在运行时可知,
-- 在`0:OlapScanNode`使用了该IN predicate用于在读取`test`.`t1`时过滤不必要的数据。
SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2; 
-- 返回2行结果[3, 4];
-- 通过query的profile(set is_report_success=true;)可以查看查询内部工作的详细信息,
-- 包括每个Runtime Filter是否下推、等待耗时、以及OLAP_SCAN_NODE从prepare到接收到Runtime Filter的总时长。
RuntimeFilter:in:
    -  HasPushDownToEngine:  true
    -  AWaitTimeCost:  0ns
    -  EffectTimeCost:  2.76ms
-- 此外,在profile的OLAP_SCAN_NODE中还可以查看Runtime Filter下推后的过滤效果和耗时。
    -  RowsVectorPredFiltered:  9.320008M  (9320008)
    -  VectorPredEvalTime:  364.39ms

4. Runtime Filter 的规划规则

  1. 只支持对join on clause中的等值条件生成Runtime Filter,不包括Null-safe条件,因为其可能会过滤掉join左表的null值。
  2. 不支持将Runtime Filter下推到left outer、full outer、anti join的左表;
  3. 不支持src expr或target expr是常量;
  4. 不支持src expr和target expr相等;
  5. 不支持src expr的类型等于HLL或者BITMAP
  6. 目前仅支持将Runtime Filter下推给OlapScanNode;
  7. 不支持target expr包含NULL-checking表达式,比如COALESCE/IFNULL/CASE,因为当outer join上层其他join的join on clause包含NULL-checking表达式并生成Runtime Filter时,将这个Runtime Filter下推到outer join的左表时可能导致结果不正确;
  8. 不支持target expr中的列(slot)无法在原始表中找到某个等价列;
  9. 不支持列传导,这包含两种情况:
  10. 一是例如join on clause包含A.k = B.k and B.k = C.k时,目前C.k只可以下推给B.k,而不可以下推给A.k;
  11. 二是例如join on clause包含A.a + B.b = C.c,如果A.a可以列传导到B.a,即A.a和B.a是等价的列,那么可以用B.a替换A.a,然后可以尝试将Runtime Filter下推给B(如果A.a和B.a不是等价列,则不能下推给B,因为target expr必须与唯一一个join左表绑定);
  12. Target expr和src expr的类型必须相等,因为Bloom Filter基于hash,若类型不等则会尝试将target expr的类型转换为src expr的类型;
  13. 不支持PlanNode.Conjuncts生成的Runtime Filter下推,与HashJoinNode的eqJoinConjunctsotherJoinConjuncts不同,PlanNode.Conjuncts生成的Runtime Filter在测试中发现可能会导致错误的结果,例如IN子查询转换为join时,自动生成的join on clause将保存在PlanNode.Conjuncts中,此时应用Runtime Filter可能会导致结果缺少一些行。

文章列表

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多