From a4642d4d06eef76f63734610e7cdb48aa3bb54ef Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 01:02:13 +0200 Subject: [PATCH] feat(ds): Add a new storage layout engine: "skipstream" This layout is based on LTS as well, but it uses separate index streams for constrained replay of streams with learned wildcards --- .../src/emqx_ds_storage_bitfield_lts.erl | 2 +- .../src/emqx_ds_storage_skipstream_lts.erl | 674 ++++++++++++++++++ 2 files changed, 675 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 182fe8cf0..fb831318e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -198,7 +198,7 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) -> case SPrev of #s{trie = TriePrev} -> ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev), - ?tp(bitfield_lts_inherited_trie, #{}), + ?tp(layout_inherited_lts_trie, #{}), ok; undefined -> ok diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl new file mode 100644 index 000000000..7b0c49c6f --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -0,0 +1,674 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_skipstream_lts). + +-behaviour(emqx_ds_storage_layer). + +%% API: +-export([]). + +%% behavior callbacks: +-export([ + create/5, + open/5, + drop/5, + prepare_batch/4, + commit_batch/4, + get_streams/4, + get_delete_streams/4, + make_iterator/5, + make_delete_iterator/5, + update_iterator/4, + next/6, + delete_next/7 +]). + +%% internal exports: +-export([]). + +-export_type([schema/0, s/0]). + +-include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-elvis([{elvis_style, nesting_level, disable}]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% keys: +-define(cooked_payloads, 6). +-define(cooked_lts_ops, 7). + +-define(lts_persist_ops, emqx_ds_storage_skipstream_lts_ops). + +%% Width of the wildcard layer, in bits: +-define(wcb, 16). +-type wildcard_idx() :: 0..16#ffff. + +%% Width of the timestamp, in bits: +-define(tsb, 64). +-define(max_ts, 16#ffffffffffffffff). +-type ts() :: 0..?max_ts. + +-type wildcard_hash() :: binary(). + +%% Permanent state: +-type schema() :: + #{ + wildcard_hash_bytes := pos_integer(), + topic_index_bytes := pos_integer(), + keep_message_id := boolean(), + serialization_schema := emqx_ds_msg_serializer:schema(), + with_guid := boolean() + }. + +%% Runtime state: +-record(s, { + db :: rocksdb:db_handle(), + data_cf :: rocksdb:cf_handle(), + trie :: emqx_ds_lts:trie(), + trie_cf :: rocksdb:cf_handle(), + serialization_schema :: emqx_ds_msg_serializer:schema(), + hash_bytes :: pos_integer(), + with_guid :: boolean() +}). + +-type s() :: #s{}. + +-record(stream, { + static_index :: emqx_ds_lts:static_key() +}). + +-record(it, { + static_index :: emqx_ds_lts:static_key(), + ts :: ts(), + compressed_tf :: binary() +}). + +%% Level iterator: +-record(l, { + n :: non_neg_integer(), + handle :: rocksdb:itr_handle(), + hash :: binary() +}). + +%%================================================================================ +%% API functions +%%================================================================================ + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +create(_ShardId, DBHandle, GenId, Schema0, SPrev) -> + Defaults = #{ + wildcard_hash_bytes => 8, + topic_index_bytes => 8, + serialization_schema => asn1, + with_guid => false + }, + Schema = maps:merge(Defaults, Schema0), + ok = emqx_ds_msg_serializer:check_schema(maps:get(serialization_schema, Schema)), + DataCFName = data_cf(GenId), + TrieCFName = trie_cf(GenId), + {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []), + {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []), + case SPrev of + #s{trie = TriePrev} -> + ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev), + ?tp(layout_inherited_lts_trie, #{}), + ok; + undefined -> + ok + end, + {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}. + +open(_Shard, DBHandle, GenId, CFRefs, #{ + topic_index_bytes := TIBytes, + wildcard_hash_bytes := WCBytes, + serialization_schema := SSchema, + with_guid := WithGuid +}) -> + {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), + {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), + Trie = restore_trie(TIBytes, DBHandle, TrieCF), + #s{ + db = DBHandle, + data_cf = DataCF, + trie_cf = TrieCF, + trie = Trie, + hash_bytes = WCBytes, + serialization_schema = SSchema, + with_guid = WithGuid + }. + +drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF, trie = Trie}) -> + emqx_ds_lts:destroy(Trie), + ok = rocksdb:drop_column_family(DBHandle, DataCF), + ok = rocksdb:drop_column_family(DBHandle, TrieCF), + ok. + +prepare_batch( + _ShardId, + S = #s{trie = Trie, hash_bytes = HashBytes}, + Messages, + _Options +) -> + _ = erase(?lts_persist_ops), + Payloads = + lists:flatmap( + fun({Timestamp, Msg = #message{topic = Topic}}) -> + Tokens = words(Topic), + {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), + %% TODO: is it possible to create index during the + %% commit phase to avoid transferring indexes through + %% the translog? + [ + {mk_key(Static, 0, <<>>, Timestamp), serialize(S, Varying, Msg)} + | mk_index(HashBytes, Static, Timestamp, Varying) + ] + end, + Messages + ), + {ok, #{ + ?cooked_payloads => Payloads, + ?cooked_lts_ops => pop_lts_persist_ops() + }}. + +commit_batch( + _ShardId, + #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie}, + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads}, + Options +) -> + {ok, Batch} = rocksdb:batch(), + try + %% Commit LTS trie to the storage: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) + end, + LtsOps + ), + %% Apply LTS ops to the memory cache: + _ = emqx_ds_lts:trie_update(Trie, LtsOps), + %% Commit payloads: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, Val) + end, + Payloads + ), + Result = rocksdb:write_batch(DB, Batch, [ + {disable_wal, not maps:get(durable, Options, true)} + ]), + %% NOTE + %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to + %% observe until there's `{no_slowdown, true}` in write options. + case Result of + ok -> + ok; + {error, {error, Reason}} -> + {error, unrecoverable, {rocksdb, Reason}} + end + after + rocksdb:release_batch(Batch) + end. + +get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> + get_streams(Trie, TopicFilter). + +get_delete_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> + get_streams(Trie, TopicFilter). + +make_iterator(_Shard, #s{trie = Trie}, #stream{static_index = StaticIdx}, TopicFilter, StartTime) -> + {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx), + CompressedTF = emqx_ds_lts:compress_topic(StaticIdx, TopicStructure, TopicFilter), + {ok, #it{ + static_index = StaticIdx, + ts = StartTime, + compressed_tf = emqx_topic:join(CompressedTF) + }}. + +make_delete_iterator(Shard, Data, Stream, TopicFilter, StartTime) -> + make_iterator(Shard, Data, Stream, TopicFilter, StartTime). + +update_iterator(_Shard, _Data, OldIter, DSKey) -> + case match_ds_key(OldIter#it.static_index, DSKey) of + false -> + {error, unrecoverable, "Invalid datastream key"}; + TS -> + {ok, OldIter#it{ts = TS}} + end. + +next({_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) -> + Iterators = init_iterators(S, It), + %% ?tp(notice, skipstream_init_iters, #{it => It, its => Iterators}), + try + case next_loop(Shard, S, It, Iterators, BatchSize, TMax) of + {ok, _, []} when not IsCurrent -> + {ok, end_of_stream}; + Result -> + Result + end + after + free_iterators(Iterators) + end. + +delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) -> + case next(Shard, S, It0, BatchSize, Now, IsCurrent) of + {ok, It, KVs} -> + batch_delete(S, It, Selector, KVs); + Ret -> + Ret + end. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ + +get_streams(Trie, TopicFilter) -> + lists:map( + fun({Static, _Varying}) -> + #stream{static_index = Static} + end, + emqx_ds_lts:match_topics(Trie, TopicFilter) + ). + +%%%%%%%% Value (de)serialization %%%%%%%%%% + +serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg0) -> + %% Replace original topic with the varying parts: + Msg = Msg0#message{ + id = + case WithGuid of + true -> Msg0#message.id; + false -> <<>> + end, + topic = emqx_topic:join(Varying) + }, + emqx_ds_msg_serializer:serialize(SSchema, Msg). + +enrich( + Shard, + #s{trie = Trie, with_guid = WithGuid}, + DSKey, + StaticKey, + Msg0 +) -> + case emqx_ds_lts:reverse_lookup(Trie, StaticKey) of + {ok, Structure} -> + %% Reconstruct the original topic from the static topic + %% index and varying parts: + Topic = emqx_topic:join( + emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic)) + ), + Msg0#message{ + topic = Topic, + id = + case WithGuid of + true -> Msg0#message.id; + false -> fake_guid(Shard, DSKey) + end + }; + undefined -> + Err = #{ + msg => "LTS trie missing key", + key => StaticKey + }, + throw({unrecoverable, Err}) + end. + +deserialize( + #s{serialization_schema = SSchema}, + Blob +) -> + emqx_ds_msg_serializer:deserialize(SSchema, Blob). + +fake_guid(_Shard, DSKey) -> + %% Both guid and MD5 are 16 bytes: + crypto:hash(md5, DSKey). + +%%%%%%%% Deletion %%%%%%%%%% + +batch_delete(#s{hash_bytes = HashBytes, db = DB, data_cf = CF}, It, Selector, KVs) -> + #it{static_index = Static, compressed_tf = CompressedTF} = It, + {Indices, _} = lists:foldl( + fun + ('+', {Acc, WildcardIdx}) -> + {Acc, WildcardIdx + 1}; + (LevelFilter, {Acc0, WildcardIdx}) -> + Acc = [{WildcardIdx, hash(HashBytes, LevelFilter)} | Acc0], + {Acc, WildcardIdx + 1} + end, + {[], 1}, + words(CompressedTF) + ), + KeyFamily = [{0, <<>>} | Indices], + {ok, Batch} = rocksdb:batch(), + try + Ndeleted = lists:foldl( + fun({MsgKey, Val}, Acc) -> + case Selector(Val) of + true -> + do_delete(CF, Batch, Static, KeyFamily, MsgKey), + Acc + 1; + false -> + Acc + end + end, + 0, + KVs + ), + case rocksdb:write_batch(DB, Batch, []) of + ok -> + {ok, It, Ndeleted, length(KVs)}; + {error, {error, Reason}} -> + {error, unrecoverable, {rocksdb, Reason}} + end + after + rocksdb:release_batch(Batch) + end. + +do_delete(CF, Batch, Static, KeyFamily, MsgKey) -> + TS = match_ds_key(Static, MsgKey), + lists:foreach( + fun({WildcardIdx, Hash}) -> + ok = rocksdb:batch_delete(Batch, CF, mk_key(Static, WildcardIdx, Hash, TS)) + end, + KeyFamily + ). + +%%%%%%%% Iteration %%%%%%%%%% + +init_iterators(S, #it{static_index = Static, compressed_tf = CompressedTF}) -> + do_init_iterators(S, Static, words(CompressedTF), 1). + +do_init_iterators(S, Static, ['+' | TopicFilter], WildcardLevel) -> + %% Ignore wildcard levels in the topic filter: + do_init_iterators(S, Static, TopicFilter, WildcardLevel + 1); +do_init_iterators(S, Static, [Constraint | TopicFilter], WildcardLevel) -> + %% Create iterator for the index stream: + #s{hash_bytes = HashBytes, db = DB, data_cf = DataCF} = S, + Hash = hash(HashBytes, Constraint), + {ok, ItHandle} = rocksdb:iterator(DB, DataCF, get_key_range(Static, WildcardLevel, Hash)), + It = #l{ + n = WildcardLevel, + handle = ItHandle, + hash = Hash + }, + [It | do_init_iterators(S, Static, TopicFilter, WildcardLevel + 1)]; +do_init_iterators(S, Static, [], _WildcardLevel) -> + %% Create an iterator for the data stream: + #s{db = DB, data_cf = DataCF} = S, + Hash = <<>>, + {ok, ItHandle} = rocksdb:iterator(DB, DataCF, get_key_range(Static, 0, Hash)), + [ + #l{ + n = 0, + handle = ItHandle, + hash = Hash + } + ]. + +next_loop(Shard, S, It = #it{ts = TS}, Iterators, BatchSize, TMax) -> + next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, []). + +next_loop(_Shard, _S, It, _Iterators, 0, _TMax, Op, Acc) -> + finalize_loop(It, Op, Acc); +next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) -> + %% ?tp(notice, skipstream_loop, #{ + %% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op + %% }), + #it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0, + case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of + none -> + %% ?tp(notice, skipstream_loop_result, #{r => none}), + finalize_loop(It0, Op, Acc); + {seek, TS} when TS > TMax -> + %% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}), + finalize_loop(It0, {seek, TS}, Acc); + {ok, TS, _Key, _Msg0} when TS > TMax -> + %% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}), + finalize_loop(It0, {seek, TS}, Acc); + {seek, TS} -> + %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}), + It = It0#it{ts = TS}, + next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, Acc); + {ok, TS, DSKey, Msg0} -> + %% ?tp(notice, skipstream_loop_result, #{r => ok, ts => TS, key => Key}), + Message = enrich(Shard, S, DSKey, StaticIdx, Msg0), + It = It0#it{ts = TS}, + next_loop(Shard, S, It, Iterators, BatchSize - 1, TMax, next, [{DSKey, Message} | Acc]) + end. + +finalize_loop(It0, Op, Acc) -> + case Op of + next -> NextTS = It0#it.ts + 1; + {seek, NextTS} -> ok + end, + It = It0#it{ts = NextTS}, + {ok, It, lists:reverse(Acc)}. + +next_step( + S, StaticIdx, CompressedTF, [#l{hash = Hash, handle = IH, n = N} | Iterators], ExpectedTS, Op +) -> + Result = + case Op of + next -> + rocksdb:iterator_move(IH, next); + {seek, TS} -> + rocksdb:iterator_move(IH, {seek, mk_key(StaticIdx, N, Hash, TS)}) + end, + case Result of + {error, invalid_iterator} -> + none; + {ok, Key, Blob} -> + case match_key(StaticIdx, N, Hash, Key) of + false -> + %% This should not happen, since we set boundaries + %% to the iterators, and overflow to a different + %% key prefix should be caught by the previous + %% clause: + none; + NextTS when ExpectedTS =:= undefined; NextTS =:= ExpectedTS -> + %% We found a key that corresponds to the + %% timestamp we expect. + %% ?tp(notice, ?MODULE_STRING "_step_hit", #{ + %% next_ts => NextTS, expected => ExpectedTS, n => N + %% }), + case Iterators of + [] -> + %% This is data stream as well. Check + %% message for hash collisions and return + %% value: + Msg0 = deserialize(S, Blob), + case emqx_topic:match(Msg0#message.topic, CompressedTF) of + true -> + {ok, NextTS, Key, Msg0}; + false -> + %% Hash collision. Advance to the + %% next timestamp: + {seek, NextTS + 1} + end; + _ -> + %% This is index stream. Keep going: + next_step(S, StaticIdx, CompressedTF, Iterators, NextTS, {seek, NextTS}) + end; + NextTS when NextTS > ExpectedTS, N > 0 -> + %% Next index level is not what we expect. + {seek, NextTS} + end + end. + +free_iterators(Its) -> + lists:foreach( + fun(#l{handle = IH}) -> + ok = rocksdb:iterator_close(IH) + end, + Its + ). + +%%%%%%%% Indexes %%%%%%%%%% + +mk_index(HashBytes, Static, Timestamp, Varying) -> + mk_index(HashBytes, Static, Timestamp, 1, Varying, []). + +mk_index(HashBytes, Static, Timestamp, N, [TopicLevel | Varying], Acc) -> + Op = {mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), <<>>}, + mk_index(HashBytes, Static, Timestamp, N + 1, Varying, [Op | Acc]); +mk_index(_HashBytes, _Static, _Timestamp, _N, [], Acc) -> + Acc. + +%%%%%%%% Keys %%%%%%%%%% + +get_key_range(StaticIdx, WildcardIdx, Hash) -> + [ + {iterate_lower_bound, mk_key(StaticIdx, WildcardIdx, Hash, 0)}, + {iterate_upper_bound, mk_key(StaticIdx, WildcardIdx, Hash, ?max_ts)} + ]. + +-spec match_ds_key(emqx_ds_lts:static_key(), binary()) -> ts() | false. +match_ds_key(StaticIdx, Key) -> + match_key(StaticIdx, 0, <<>>, Key). + +-spec match_key(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), binary()) -> + ts() | false. +match_key(StaticIdx, 0, <<>>, Key) -> + TSz = size(StaticIdx), + case Key of + <> -> + Timestamp; + _ -> + false + end; +match_key(StaticIdx, Idx, Hash, Key) when Idx > 0 -> + Tsz = size(StaticIdx), + Hsz = size(Hash), + case Key of + <> -> + Timestamp; + _ -> + false + end. + +-spec mk_key(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), ts()) -> binary(). +mk_key(StaticIdx, 0, <<>>, Timestamp) -> + %% Data stream is identified by wildcard level = 0 + <>; +mk_key(StaticIdx, N, Hash, Timestamp) when N > 0 -> + %% Index stream: + <>. + +hash(HashBytes, '') -> + hash(HashBytes, <<>>); +hash(HashBytes, TopicLevel) -> + {Hash, _} = split_binary(erlang:md5(TopicLevel), HashBytes), + Hash. + +%%%%%%%% LTS %%%%%%%%%% + +%% TODO: don't hardcode the thresholds +threshold_fun(0) -> + 100; +threshold_fun(_) -> + 10. + +-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). +restore_trie(StaticIdxBytes, DB, CF) -> + PersistCallback = fun(Key, Val) -> + push_lts_persist_op(Key, Val), + ok + end, + {ok, IT} = rocksdb:iterator(DB, CF, []), + try + Dump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)), + TrieOpts = #{ + persist_callback => PersistCallback, + static_key_bytes => StaticIdxBytes, + reverse_lookups => true + }, + emqx_ds_lts:trie_restore(TrieOpts, Dump) + after + rocksdb:iterator_close(IT) + end. + +-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) -> + ok. +copy_previous_trie(DB, TrieCF, TriePrev) -> + {ok, Batch} = rocksdb:batch(), + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) + end, + emqx_ds_lts:trie_dump(TriePrev, wildcard) + ), + Result = rocksdb:write_batch(DB, Batch, []), + rocksdb:release_batch(Batch), + Result. + +push_lts_persist_op(Key, Val) -> + case erlang:get(?lts_persist_ops) of + undefined -> + erlang:put(?lts_persist_ops, [{Key, Val}]); + L when is_list(L) -> + erlang:put(?lts_persist_ops, [{Key, Val} | L]) + end. + +pop_lts_persist_ops() -> + case erlang:erase(?lts_persist_ops) of + undefined -> + []; + L when is_list(L) -> + L + end. + +read_persisted_trie(IT, {ok, KeyB, ValB}) -> + [ + {binary_to_term(KeyB), binary_to_term(ValB)} + | read_persisted_trie(IT, rocksdb:iterator_move(IT, next)) + ]; +read_persisted_trie(_IT, {error, invalid_iterator}) -> + []. + +%%%%%%%% Column families %%%%%%%%%% + +%% @doc Generate a column family ID for the MQTT messages +-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. +data_cf(GenId) -> + "emqx_ds_storage_skipstream_lts_data" ++ integer_to_list(GenId). + +%% @doc Generate a column family ID for the trie +-spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. +trie_cf(GenId) -> + "emqx_ds_storage_skipstream_lts_trie" ++ integer_to_list(GenId). + +%%%%%%%% Topic encoding %%%%%%%%%% + +words(<<>>) -> + []; +words(Bin) -> + emqx_topic:words(Bin).