From 7c0d37fdb978486b3bcfd07856011a26682a94a1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 18 Jan 2024 14:28:13 -0300 Subject: [PATCH] feat(lts): inherit previous generation's lts when possible --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 11 ++++++- .../src/emqx_ds_storage_bitfield_lts.erl | 30 ++++++++++++++++- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 33 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index bcf95852d..9d87cf571 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -18,7 +18,12 @@ %% API: -export([ - trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2 + trie_create/1, trie_create/0, + trie_restore/2, + trie_restore_existing/2, + topic_key/3, + match_topics/2, + lookup_topic_key/2 ]). %% Debug: @@ -115,6 +120,10 @@ trie_create() -> -spec trie_restore(options(), [{_Key, _Val}]) -> trie(). trie_restore(Options, Dump) -> Trie = trie_create(Options), + trie_restore_existing(Trie, Dump). + +-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie(). +trie_restore_existing(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(Trie, StateFrom, Token, StateTo) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 27d41e6c6..2a3086a57 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -32,7 +32,8 @@ get_streams/4, make_iterator/5, update_iterator/4, - next/4 + next/4, + post_creation_actions/1 ]). %% internal exports: @@ -200,6 +201,22 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> ts_offset = TSOffsetBits }. +-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> + s(). +post_creation_actions( + #{ + db := DBHandle, + old_gen_id := OldGenId, + old_cf_refs := OldCFRefs, + new_gen_runtime_data := NewGenData0 + } +) -> + {_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs), + #s{trie = NewTrie0} = NewGenData0, + NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF), + ?tp(bitfield_lts_inherited_trie, #{}), + NewGenData0#s{trie = NewTrie}. + -spec drop( emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), @@ -516,6 +533,17 @@ restore_trie(TopicIndexBytes, DB, CF) -> rocksdb:iterator_close(IT) end. +-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) -> + emqx_ds_lts:trie(). +copy_previous_trie(DBHandle, NewTrie, OldCF) -> + {ok, IT} = rocksdb:iterator(DBHandle, OldCF, []), + try + OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)), + emqx_ds_lts:trie_restore_existing(NewTrie, OldDump) + after + rocksdb:iterator_close(IT) + end. + read_persisted_trie(IT, {ok, KeyB, ValB}) -> [ {binary_to_term(KeyB), binary_to_term(ValB)} diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 03ff1a6cb..5d32143a7 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -131,6 +131,39 @@ t_get_streams(_Config) -> ?assert(lists:member(A, AllStreams)), ok. +t_new_generation_inherit_trie(_Config) -> + %% This test checks that we inherit the previous generation's LTS when creating a new + %% generation. + ?check_trace( + begin + %% Create a bunch of topics to be learned in the first generation + Timestamps = lists:seq(1, 10_000, 100), + Batch = [ + begin + B = integer_to_binary(I), + make_message( + TS, + <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, + integer_to_binary(TS) + ) + end + || I <- lists:seq(1, 200), + TS <- Timestamps, + Suffix <- [<<"foo">>, <<"bar">>] + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + %% Now we create a new generation with the same LTS module. It should inherit the + %% learned trie. + ok = emqx_ds_storage_layer:add_generation(?SHARD), + ok + end, + fun(Trace) -> + ?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)), + ok + end + ), + ok. + t_replay(_Config) -> %% Create concrete topics: Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],