From ac91dbc58fc6bc29cbdca54a11d1efe4880a3e17 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 11 Oct 2023 16:49:25 +0200 Subject: [PATCH] feat(ds): Restore LTS trie from a dump --- .../src/emqx_ds_bitmask_keymapper.erl | 1 - apps/emqx_durable_storage/src/emqx_ds_lts.erl | 38 ++++++++++++++----- .../src/emqx_ds_replication_layer.erl | 6 ++- .../src/proto/emqx_ds_proto_v1.erl | 8 +++- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index fd2d41946..2f28de293 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -348,7 +348,6 @@ ones(Bits) -> %% NBits = ceil(math:log2(N + 1)), %% ones(NBits). - %% bitmask_of_test() -> %% ?assertEqual(2#0, bitmask_of(0)), %% ?assertEqual(2#1, bitmask_of(1)), diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 5422979b7..e9d3124f9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -17,7 +17,9 @@ -module(emqx_ds_lts). %% API: --export([trie_create/1, trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2]). +-export([ + trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2 +]). %% Debug: -export([trie_next/3, trie_insert/3, dump_to_dot/2]). @@ -85,8 +87,19 @@ trie_create(Persist) -> -spec trie_create() -> trie(). trie_create() -> trie_create(fun(_, _) -> - ok - end). + ok + end). + +%% @doc Restore trie from a dump +-spec trie_restore(persist_callback(), [{_Key, _Val}]) -> trie(). +trie_restore(Persist, Dump) -> + Trie = trie_create(Persist), + lists:foreach( + fun({{StateFrom, Token}, StateTo}) -> + trie_insert(Trie, StateFrom, Token, StateTo) + end, + Dump + ). %% @doc Lookup the topic key. Create a new one, if not found. -spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). @@ -173,11 +186,20 @@ trie_next(#trie{trie = Trie}, State, Token) -> end. -spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when - NChildren :: non_neg_integer(), + NChildren :: non_neg_integer(), Updated :: false | NChildren. -trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token) -> +trie_insert(Trie, State, Token) -> + trie_insert(Trie, State, Token, get_id_for_key(State, Token)). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec trie_insert(trie(), state(), edge(), state()) -> {Updated, state()} when + NChildren :: non_neg_integer(), + Updated :: false | NChildren. +trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) -> Key = {State, Token}, - NewState = get_id_for_key(State, Token), Rec = #trans{ key = Key, next = NewState @@ -198,10 +220,6 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token) {false, NextState} end. -%%================================================================================ -%% Internal functions -%%================================================================================ - -spec get_id_for_key(state(), edge()) -> static_key(). get_id_for_key(_State, _Token) -> %% Requirements for the return value: diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index aeb2ce646..5b4ad8666 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -132,7 +132,8 @@ get_streams(DB, TopicFilter, StartTime) -> Shards ). --spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). +-spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> + emqx_ds:make_iterator_result(iterator()). make_iterator(Stream, TopicFilter, StartTime) -> #stream{shard = Shard, enc = StorageStream} = Stream, Node = node_of_shard(Shard), @@ -184,7 +185,8 @@ do_drop_shard_v1(Shard) -> do_get_streams_v1(Shard, TopicFilter, StartTime) -> emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). --spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> {ok, iterator()} | {error, _}. +-spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> + {ok, iterator()} | {error, _}. do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index d4d7b3631..df9115a78 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -45,10 +45,14 @@ drop_shard(Node, Shard) -> get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). --spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> +-spec make_iterator( + node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() +) -> {ok, emqx_ds_replication_layer:iterator()} | {error, _}. make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> - erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, TopicFilter, StartTime]). + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ + Shard, Stream, TopicFilter, StartTime + ]). -spec next( node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()