feat(ds): Restore LTS trie from a dump

This commit is contained in:
ieQu1 2023-10-11 16:49:25 +02:00
parent f1ab7c8a7c
commit ac91dbc58f
4 changed files with 38 additions and 15 deletions

View File

@ -348,7 +348,6 @@ ones(Bits) ->
%% NBits = ceil(math:log2(N + 1)), %% NBits = ceil(math:log2(N + 1)),
%% ones(NBits). %% ones(NBits).
%% bitmask_of_test() -> %% bitmask_of_test() ->
%% ?assertEqual(2#0, bitmask_of(0)), %% ?assertEqual(2#0, bitmask_of(0)),
%% ?assertEqual(2#1, bitmask_of(1)), %% ?assertEqual(2#1, bitmask_of(1)),

View File

@ -17,7 +17,9 @@
-module(emqx_ds_lts). -module(emqx_ds_lts).
%% API: %% API:
-export([trie_create/1, trie_create/0, topic_key/3, match_topics/2, lookup_topic_key/2]). -export([
trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2
]).
%% Debug: %% Debug:
-export([trie_next/3, trie_insert/3, dump_to_dot/2]). -export([trie_next/3, trie_insert/3, dump_to_dot/2]).
@ -85,8 +87,19 @@ trie_create(Persist) ->
-spec trie_create() -> trie(). -spec trie_create() -> trie().
trie_create() -> trie_create() ->
trie_create(fun(_, _) -> trie_create(fun(_, _) ->
ok ok
end). end).
%% @doc Restore trie from a dump
-spec trie_restore(persist_callback(), [{_Key, _Val}]) -> trie().
trie_restore(Persist, Dump) ->
Trie = trie_create(Persist),
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo)
end,
Dump
).
%% @doc Lookup the topic key. Create a new one, if not found. %% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). -spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key().
@ -173,11 +186,20 @@ trie_next(#trie{trie = Trie}, State, Token) ->
end. end.
-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when -spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when
NChildren :: non_neg_integer(), NChildren :: non_neg_integer(),
Updated :: false | NChildren. Updated :: false | NChildren.
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token) -> trie_insert(Trie, State, Token) ->
trie_insert(Trie, State, Token, get_id_for_key(State, Token)).
%%================================================================================
%% Internal functions
%%================================================================================
-spec trie_insert(trie(), state(), edge(), state()) -> {Updated, state()} when
NChildren :: non_neg_integer(),
Updated :: false | NChildren.
trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, NewState) ->
Key = {State, Token}, Key = {State, Token},
NewState = get_id_for_key(State, Token),
Rec = #trans{ Rec = #trans{
key = Key, key = Key,
next = NewState next = NewState
@ -198,10 +220,6 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token)
{false, NextState} {false, NextState}
end. end.
%%================================================================================
%% Internal functions
%%================================================================================
-spec get_id_for_key(state(), edge()) -> static_key(). -spec get_id_for_key(state(), edge()) -> static_key().
get_id_for_key(_State, _Token) -> get_id_for_key(_State, _Token) ->
%% Requirements for the return value: %% Requirements for the return value:

View File

@ -132,7 +132,8 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards Shards
). ).
-spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()).
make_iterator(Stream, TopicFilter, StartTime) -> make_iterator(Stream, TopicFilter, StartTime) ->
#stream{shard = Shard, enc = StorageStream} = Stream, #stream{shard = Shard, enc = StorageStream} = Stream,
Node = node_of_shard(Shard), Node = node_of_shard(Shard),
@ -184,7 +185,8 @@ do_drop_shard_v1(Shard) ->
do_get_streams_v1(Shard, TopicFilter, StartTime) -> do_get_streams_v1(Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
-spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> {ok, iterator()} | {error, _}. -spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
{ok, iterator()} | {error, _}.
do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).

View File

@ -45,10 +45,14 @@ drop_shard(Node, Shard) ->
get_streams(Node, Shard, TopicFilter, Time) -> get_streams(Node, Shard, TopicFilter, Time) ->
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
-spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> -spec make_iterator(
node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
) ->
{ok, emqx_ds_replication_layer:iterator()} | {error, _}. {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, TopicFilter, StartTime]). erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
Shard, Stream, TopicFilter, StartTime
]).
-spec next( -spec next(
node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer() node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()