年轻有为的猴子 · 青年艺术家访谈 | ...· 2 周前 · |
含蓄的钢笔 · 《国际经济评论》:(董秀成)新一轮巴以冲突对 ...· 3 月前 · |
爱热闹的打火机 · 苏州要有机场了?这次是通用机场_腾讯新闻· 4 月前 · |
不爱学习的芒果 · 原味复刻做工欠奉,手套魂NIKE ZOOM ...· 7 月前 · |
爱吹牛的人字拖 · 这就告诉你如何看po文,不用再问啦· 11 月前 · |
更新时间: 2023.06.07 10:05:57
唯一键引擎(HaUniqueMergeTree) 是 ByteHouse 自研的一款既保留了 ClickHouse 高效的查询性能、又支持主键更新的表引擎。它解决了社区版 ClickHouse 不能支持高效更新操作的痛点,帮助业务更简单地开发实时分析应用。
用户通过 UNIQUE KEY 配置唯一键,提供 upsert 更新写语义,查询自动返回每个唯一键的最新值。(和社区的 ReplacingMergeTree 相比,ReplacingMergeTree 在数据导入后需要等待 Merge 完成,才可以查到去重后的数据,而 HaUniqueMergeTree 则是即导入即生效的)
性能:单shard写入吞吐一般可以达到10k+ rows/s;查询性能与普通 MergeTree 表几乎相同
唯一键支持多字段和表达式(目前支持最多三个字段)
支持分区级别唯一和表级别唯一两种模式
支持自定义版本字段,写入低版本数据时自动忽略
多副本部署,通过主备异步复制保障数据可靠性
支持通过删除字段,对行进行删除
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], `_delete_flag_` UInt8 ) ENGINE = HaUniqueMergeTree(shard, replica, version_column) -- 默认为 '/clickhouse/bytehouse/库名.表名/{shard}','{replica}' PARTITION BY toYYYYMM(EventDate) ORDER BY expr [PARTITION BY expr] [PRIMARY KEY expr] UNIQUE KEY expr [SAMPLE BY expr] [TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ] [SETTINGS name=value, ...]
Unique Key
设置:支持字段(但不支持 Nullable,也不支持 Map,Array 等复合类型),也支持表达式,最多三个值:
例:
UNIQUE KEY product_id, sipHash64(city)
若超过三个值,可以对多个字段使用 hash,例:
UNIQUE KEY sipHash64(val1,val2,val3....)
version_column
(版本字段): 选择一个字段作为版本控制的依据,用于根据版本更新,使用示例可查看例2。在设计表结构时,建议优先考虑分区值作为版本,减少内存占用。
`_delete_flag_` UInt8
(删除字段):作为删除标记,若该字段传 1 则代表删除该唯一键指代的行,0 则代表不删除。为表内部的保留字段,因此其他字段不可占用
_delete_flag_
列。使用示例可查看例4。
其他的字段设置,如
Order By
,
Partition By
等,和 MergeTree 家族的其他引擎的设置规则一致。
参数名 | 常用字段 | 默认值 | 说明 |
---|---|---|---|
partition_level_unique_keys |
是 |
1 |
0:UNIQUE KEY 表粒度唯一
|
enable_unique_partial_update | 是 | 0 | 允许部分列更新写入(需要新引擎版本支持,相关文档暂未更新) |
enable_disk_based_unique_key_index |
是 |
0 |
0:in-memory mode。使用方式会在每张unique表会维护一个in-memory key index,因此能支撑的数据量受限于内存 ;
|
假设表Schema如下:
CREATE TABLE t2 `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ENGINE = HaUniqueMergeTree('xxxxxxx') PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id --唯一键 SETTINGS partition_level_unique_keys = 0; --设置表级别唯一
顺序插入以下测试数据:
INSERT INTO t2 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); INSERT INTO t2 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10001, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10002, 'Beijing', '男装', 2, 200);
可以看到,10001,10002,10003 这三个产品都更新到了最新数据,且10001,10002 都从 2020-10-29 分区更新到了 2020-10-30 分区。
select * from t2 order by toDate(event_time), product_id; ┌──────event_time─┬product_id─┬─city──┬category─┬amount─┬revenue─┐ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10001 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
默认情况下,相同 unique key 后写入的数据会覆盖已有的数据。这可能会带来以下问题
回溯上游数据时,老数据可能覆盖新数据,导致查询到的数据结果出现回退
Lambda 架构下,如果离线和实时任务同时写一个分区,最终保留哪条数据取决于任务的执行顺序
为了解决上面的问题,HaUniqueMergeTree 支持将表中的某个字段指定为版本字段。引擎保证写入相同 key 的数据时,只有数据版本 >= 已有版本时,才会进行覆盖。版本字段支持所有UInt类型和Data/DateTime,且不能为 Nullable。
假设schema如下:
CREATE TABLE t3 `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ENGINE = HaUniqueMergeTree('/clickhouse/default/t3/{shard}', '{replica}', event_time) --event_time为版本字段 PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id; --唯一键
顺序插入以下数据:
INSERT INTO t3 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 5, 500);
结果保留后两条。
select * from t3 order by toDate(event_time), product_id; ┌──────event_time─┬product_id─┬─city──┬category─┬amount─┬revenue─┐ │ 2020-10-29 23:50:00 │ 10001 │ Beijing │ 男装 │ 8 │ 800 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 5 │ 500 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
若在此时重新导入回溯前两条数据。
INSERT INTO t3 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200);
由于版本 < 已有版本,写入时自动跳过。10001 和 10002 的版本没有回退。
select * from t3 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:50:00 │ 10001 │ Beijing │ 男装 │ 8 │ 800 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 5 │ 500 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
在某些应用场景下,用户希望在INSERT时加上一个字段来标识是否删除来扩展INSERT语义。
在HaUniqueMergeTree引擎中,为每张表都添加了一个保留字段
_delete_flag_
,类型为
UInt8
, 0 表示数据写入,非 0 表示数据删除。
假设schema如下:
CREATE TABLE t1 `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64, `_delete_flag_` As `delete` UInt8 --delete为删除字段 ENGINE = HaUniqueMergeTree(xxxx) PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id; --唯一键
导入以下数据:
注:需要选择单个节点,并插入本地表来使用 delete flag 做数据删除
INSERT INTO t1_local (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500, 0), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 0), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 0), ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 4, 400, 5), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 2, 200, 1), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100, 0);
查询结果中包含了新加入的一行数据,并删除了两行旧数据:
select * from t1 order by toDate(event_time), product_id; ┌──────event_time─┬─product_id┬─city──┬─category┬amount─┬revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
针对某个 where 条件对多行数据进行批量删除的方式如下:
insert into `t1_local` (*, _delete_flag_) select *, 1 as _delete_flag_ from `t1_local` where product_id=10001;
使用条件:
仅支持partition-level的部分列更新
Unique key必须是Order-by key的 前缀
行更新模式:缺省列采用默认值填充
部分列更新模式:缺省列如果有原值则保留,否则填充默认值
unique表指定变量 enable_unique_partial_update = 1 后允许写入以部分列更新模式进行,默认关闭。
部分列写入模式当且仅当unique表开启了部分列更新功能才有效,否则等效为行更新模式:
对于交互式写入,Unique表默认为部分列更新模式,指定会话变量 enable_unique_partial_update = 0后切换到行更新模式。即参数 enable_unique_partial_update 默认值为1。
对于Kafka实时写入,kafka表新增 enable_unique_partial_update 参数(默认值为1),1表示kafka消费使用部分列更新模式,0表示使用行更新模式。
类型的默认值
数值类型:0
字符串类型:''
Nullable类型:null
Map类型(更新规则比较特殊,详见下方 5.列更新规则-Map类型 ):{}
Array类型:[]
部分列更新写入模式时如果表包含版本字段
写入版本<当前版本,忽略写入行
写入版本>=当前版本,应用列更新规则
写入未指定版本时,版本字段取默认值
列更新规则
方式一:按功能列_update_columns_(String类型)区分更新列方案的列更新规则
_update_columns_中的内容是需要更新的列,以 逗号分隔各列名 ,引擎在解析时 不会处理列名前后的特殊字符 ,如空格、Tab、换行符等,且 不支持正则表达式 。
当_update_columns_为空时表示更新所有列
非Map类型:写入非默认值表示更新,不允许更新为默认值
Map类型: partial_update_enable_merge_map = true 时对有旧值的key进行更新,对无旧值的key进行写入;为false时直接替换value
CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 1, partial_update_enable_merge_map = 0; SET enable_unique_partial_update = 1; INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); -- 此时解析时会填充_update_columns_为'k,c1,c2,m1,a1' ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c2, m1, a1, _update_columns_) VALUES (1, 20, 31.4, {'k2':2}, ['world'], 'k,c1,m1,a1'); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k2':2} │ ['world'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k3':3}); -- 此时解析是会填充_update_columns_为'k,c1,c3,m1' -- 等价于INSERT INTO t1 (k, c1, c3, m1, _update_columns_) VALUES (1, 0, 'foo', {'k2':2}, 'k, c1, c3, m1'); -- 此时c1会强制更新为0,c3强制更新为'foo',m1强制更新为'k3':3 ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 0 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k3':3} │ ['world'] │ └───┴────┴──────┴─────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1, _update_columns_) VALUES (1, 10, 31.4, 'goo', 15, {'k4': 4}, ['hello', 'world'], ''); -- _update_columns_为空时表示更新所有列 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1───────┬─a1────────────────┐ │ 1 │ 10 │ 31.4 │ goo │ 15 │ {'k4':4} │ ['hello','world'] │ └───┴────┴──────┴─────┴────┴──────────┴───────────────────┘
方式二:按类型默认值区分更新列方案的列更新规则
非Map类型:写入非默认值表示更新,不允许更新为默认值
Map类型: partial_update_enable_merge_map = true 时对有旧值的key进行更新,对无旧值的key进行写入;为false时直接替换value
CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0, partial_update_enable_merge_map = 1; SET enable_unique_partial_update = 1; INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k2':2}); -- 对于未指定字段 -- - c1为非组合类型,但写入的是默认值,因此保留旧值 -- - c2/c4为Nullable类型,保留旧值 -- - a1为Array空值,保留旧值 -- 对于m1,更新m1{'k2'},保留m1{'k1'} ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1──────────────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k1':1,'k2':2} │ ['hello'] │ └───┴────┴──────┴─────┴──────┴─────────────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1) VALUES (1, 20, null, 'bar', 30, {'k2':0, 'k3':3}, ['world']); -- c1不是默认值,更新 -- c2为null,保留旧值;c3/c4不为null,更新 -- 更新m1{'k2'}, m1{'k3'},保留m1{'k1'} -- a1不为空值,更新 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1─────────────────────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ bar │ 30 │ {'k1':1,'k2':0,'k3':3} │ ['world'] │ └ ───┴────┴──────┴─────┴────┴────────────────────────┴───────────┘
【注意】使用这种方式在建表时
慎用默认值
!!!
在建表时可以在字段后指定默认值,默认值可以为表达式。如果用户指定了默认值,那么写入时对于缺省的列会按照建表时指定的方式进行填充,而部分列更新判断是否为默认值时是按照引擎内数据类型的默认值进行判断,因此可能会产生不符合预期的行为。下面将举个例子进行说明
CREATE TABLE t1 ( k Int32, c1 Int32 DEFAULT k + 1, c2 Nullable(Float64)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0; insert into t1 values (1, 10, 3.14); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 10 │ 3.14 │ └───┴────┴──────┘ insert into t1 (k, c2) values (1, 31.4); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 2 │ 31.4 │ └───┴────┴──────┘ -- 如果在建表时没有指定从c1的默认值,那么这条写入的语义是将k=1的那条数据的c2被更新为31.4,但是由于建表指定了默认值,等价于insert into t1 values (1, 2,31.4),因此c1也被更新为了2。这种情况下为了达到仅更新c2的目的,可以有以下两种方式:1. 建表时不要使用默认值;2. 显示指定默认值,即insert into t1 values (1, 0,31.4) -- ┌─k─┬─c1─┬───c2─┐ -- │ 1 │ 10 │ 31.4 │ -- └───┴────┴──────┘
在使用 in-memory 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的一半,为 10k rows/s/shard,或 20 MB/s。性能相比社区的 ReplacingMergeTree 也接近一致。
但查询性能上,HaUniqueMergeTree 的性能和 HaMergeTree 或 MergeTree 引擎一致。
在使用 disk-based 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的 30-40%,为 6-8k rows/s/shard,或 12-16 MB/s。查询性能依旧不变。
在 Kafka 导入时,用户需要保证相同唯一键的数据写入同一个的 Topic Partition,并禁用 Topic 扩容;
唯一键所在的集群暂不支持扩容;
内存索引模式下,内存使用与唯一键大小及基数成正比,不适合单节点数据量超过 1 亿的表,如果使用不当会导致节点OOM;需要启用磁盘索引模式。
目前unique 表有两种key index使用方式:in-memory 和 disk-based。由表级参数
enable_disk_based_unique_key_index
控制。
如果包含很多字段,考虑使用哈希值。例:
UNIQUE KEY sipHash64(val1,val2,val3....)
优先使用默认的分区级别唯一。
如果业务场景必须使用表级别唯一,考虑设置 TTL、采用更粗粒度的分区(例如按月分区)等方式减少分区数量。
如果需要指定版本字段,优先考虑分区值作为版本,减少内存占用。
Detach partition 和 Attach partiton 操作只能在 leader 节点执行;
实时删除功能与唯一键级别、版本的作用规则:
删除字段功能仅能应用于 本地表(即HaUniqueMergeTree表) ,不支持对底表为HaUniqueMergeTree 的分布式表(Distributed 表)进行此操作。
Consumer 会直接写本地 shard。因此