feat(lts): inherit previous generation's lts when possible

This commit is contained in:
Thales Macedo Garitezi 2024-01-18 14:28:13 -03:00
parent 75b08b525b
commit db710c4be5
3 changed files with 72 additions and 2 deletions

View File

@ -18,7 +18,12 @@
%% API:
-export([
trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2
trie_create/1, trie_create/0,
trie_restore/2,
trie_restore_existing/2,
topic_key/3,
match_topics/2,
lookup_topic_key/2
]).
%% Debug:
@ -115,6 +120,10 @@ trie_create() ->
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
trie_restore(Options, Dump) ->
Trie = trie_create(Options),
trie_restore_existing(Trie, Dump).
-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
trie_restore_existing(Trie, Dump) ->
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo)

View File

@ -32,7 +32,8 @@
get_streams/4,
make_iterator/5,
update_iterator/4,
next/4
next/4,
post_creation_actions/1
]).
%% internal exports:
@ -200,6 +201,22 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
ts_offset = TSOffsetBits
}.
-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
s().
post_creation_actions(
#{
db := DBHandle,
old_gen_id := OldGenId,
old_cf_refs := OldCFRefs,
new_gen_runtime_data := NewGenData0
}
) ->
{_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
#s{trie = NewTrie0} = NewGenData0,
NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
?tp(bitfield_lts_inherited_trie, #{}),
NewGenData0#s{trie = NewTrie}.
-spec drop(
emqx_ds_storage_layer:shard_id(),
rocksdb:db_handle(),
@ -516,6 +533,17 @@ restore_trie(TopicIndexBytes, DB, CF) ->
rocksdb:iterator_close(IT)
end.
-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
emqx_ds_lts:trie().
copy_previous_trie(DBHandle, NewTrie, OldCF) ->
{ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
try
OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
after
rocksdb:iterator_close(IT)
end.
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
[
{binary_to_term(KeyB), binary_to_term(ValB)}

View File

@ -131,6 +131,39 @@ t_get_streams(_Config) ->
?assert(lists:member(A, AllStreams)),
ok.
t_new_generation_inherit_trie(_Config) ->
%% This test checks that we inherit the previous generation's LTS when creating a new
%% generation.
?check_trace(
begin
%% Create a bunch of topics to be learned in the first generation
Timestamps = lists:seq(1, 10_000, 100),
Batch = [
begin
B = integer_to_binary(I),
make_message(
TS,
<<"wildcard/", B/binary, "/suffix/", Suffix/binary>>,
integer_to_binary(TS)
)
end
|| I <- lists:seq(1, 200),
TS <- Timestamps,
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
%% Now we create a new generation with the same LTS module. It should inherit the
%% learned trie.
ok = emqx_ds_storage_layer:add_generation(?SHARD),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)),
ok
end
),
ok.
t_replay(_Config) ->
%% Create concrete topics:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],