fix(dsstore): persist inherited LTS trie

Before this commit, the inherited trie was in fact only kept in the
in-memory cache, so it did not survive a restart.

Also simplify the storage backend behaviour around inheriting data
from the previous generation.
Andrew Mayorov 2024-06-13 17:57:45 +02:00
parent 3143475769
commit 68f6556856
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 74 additions and 93 deletions
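
For context, the gist of the change as a rough sketch (illustrative code, not part of this commit; Options stands for whatever trie options the new generation is created with):

    %% Illustrative sketch only, not code from this commit.
    inherit_learned_paths(PrevTrie, Options) ->
        %% Serialize the wildcard paths learned by the previous generation's
        %% trie using the new trie_dump/2 ...
        Dump = emqx_ds_lts:trie_dump(PrevTrie, wildcard),
        %% ... and replay them into a fresh trie for the new generation.
        %% The commit additionally writes Dump into the new generation's trie
        %% column family, so the inherited paths now survive a restart instead
        %% of living only in the in-memory trie.
        emqx_ds_lts:trie_restore(Options, Dump).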

View File

@@ -20,6 +20,7 @@
-export([
trie_create/1, trie_create/0,
destroy/1,
trie_dump/2,
trie_restore/2,
trie_update/2,
trie_copy_learned_paths/2,
@@ -76,6 +77,8 @@
static_key_size => pos_integer()
}.
-type dump() :: [{_Key, _Val}].
-record(trie, {
persist :: persist_callback(),
static_key_size :: pos_integer(),
@@ -125,12 +128,12 @@ destroy(#trie{trie = Trie, stats = Stats}) ->
ok.
%% @doc Restore trie from a dump
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
-spec trie_restore(options(), dump()) -> trie().
trie_restore(Options, Dump) ->
trie_update(trie_create(Options), Dump).
%% @doc Update a trie with a dump of operations (used for replication)
-spec trie_update(trie(), [{_Key, _Val}]) -> trie().
-spec trie_update(trie(), dump()) -> trie().
trie_update(Trie, Dump) ->
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
@@ -140,14 +143,23 @@ trie_update(Trie, Dump) ->
),
Trie.
-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump().
trie_dump(Trie, Filter) ->
case Filter of
all ->
Fun = fun(_) -> true end;
wildcard ->
Fun = fun contains_wildcard/1
end,
lists:append([P || P <- paths(Trie), Fun(P)]).
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)],
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(NewTrie, StateFrom, Token, StateTo)
end,
lists:flatten(WildcardPaths)
trie_dump(OldTrie, wildcard)
),
NewTrie.
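
The dump format returned by trie_dump/2 is the same list of {{StateFrom, Token}, StateTo} transitions that trie_update/2 and trie_restore/2 consume, so a dump can be stored as opaque {Key, Value} pairs and replayed later. A minimal round trip, assuming an empty options map is accepted by trie_create/1:

    %% Round-trip sketch (illustrative; the trie here is empty, which is
    %% enough to show the shape of the API).
    round_trip() ->
        Trie0 = emqx_ds_lts:trie_create(),
        Dump = emqx_ds_lts:trie_dump(Trie0, all),
        Restored = emqx_ds_lts:trie_restore(#{}, Dump),
        ok = emqx_ds_lts:destroy(Trie0),
        Restored.

The wildcard filter keeps only paths that contain a wildcard transition, which is all the inheritance path needs; the all filter dumps every learned path.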

View File

@@ -25,7 +25,7 @@
%% behavior callbacks:
-export([
create/4,
create/5,
open/5,
drop/5,
prepare_batch/4,
@@ -37,7 +37,6 @@
update_iterator/4,
next/6,
delete_next/6,
post_creation_actions/1,
handle_event/4
]).
@@ -179,10 +178,11 @@
emqx_ds_storage_layer:shard_id(),
rocksdb:db_handle(),
emqx_ds_storage_layer:gen_id(),
options()
options(),
_PrevGeneration :: s() | undefined
) ->
{schema(), emqx_ds_storage_layer:cf_refs()}.
create(_ShardId, DBHandle, GenId, Options) ->
create(_ShardId, DBHandle, GenId, Options, SPrev) ->
%% Get options:
BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
@@ -193,6 +193,14 @@ create(_ShardId, DBHandle, GenId, Options) ->
TrieCFName = trie_cf(GenId),
{ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
{ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
case SPrev of
#s{trie = TriePrev} ->
ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev),
?tp(bitfield_lts_inherited_trie, #{}),
ok;
undefined ->
ok
end,
%% Create schema:
Schema = #{
bits_per_wildcard_level => BitsPerTopicLevel,
@@ -241,20 +249,6 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
}.
-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
s().
post_creation_actions(
#{
new_gen_runtime_data := NewGenData,
old_gen_runtime_data := OldGenData
}
) ->
#s{trie = OldTrie} = OldGenData,
#s{trie = NewTrie0} = NewGenData,
NewTrie = copy_previous_trie(OldTrie, NewTrie0),
?tp(bitfield_lts_inherited_trie, #{}),
NewGenData#s{trie = NewTrie}.
-spec drop(
emqx_ds_storage_layer:shard_id(),
rocksdb:db_handle(),
@@ -905,9 +899,19 @@ restore_trie(TopicIndexBytes, DB, CF) ->
rocksdb:iterator_close(IT)
end.
-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
copy_previous_trie(OldTrie, NewTrie) ->
emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) ->
ok.
copy_previous_trie(DB, TrieCF, TriePrev) ->
{ok, Batch} = rocksdb:batch(),
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
end,
emqx_ds_lts:trie_dump(TriePrev, wildcard)
),
Result = rocksdb:write_batch(DB, Batch, []),
rocksdb:release_batch(Batch),
Result.
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
[
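
The write side above stores the inherited paths as term_to_binary-encoded {Key, Value} rows in the new generation's trie column family; on open, restore_trie/3 iterates that column family and feeds the decoded pairs back into the trie (only partially visible in this hunk). A hedged sketch of that read-back side:

    %% Sketch of the read-back side (illustrative; it mirrors what
    %% restore_trie/read_persisted_trie presumably do, which this diff
    %% shows only partially).
    read_back(DB, TrieCF, Options) ->
        {ok, IT} = rocksdb:iterator(DB, TrieCF, []),
        try
            Dump = collect(IT, rocksdb:iterator_move(IT, first), []),
            emqx_ds_lts:trie_restore(Options, Dump)
        after
            rocksdb:iterator_close(IT)
        end.

    collect(IT, {ok, KeyB, ValB}, Acc) ->
        Pair = {binary_to_term(KeyB), binary_to_term(ValB)},
        collect(IT, rocksdb:iterator_move(IT, next), [Pair | Acc]);
    collect(_IT, {error, _}, Acc) ->
        lists:reverse(Acc).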

View File

@@ -69,7 +69,6 @@
shard_id/0,
options/0,
prototype/0,
post_creation_context/0,
cooked_batch/0
]).
@@ -194,25 +193,19 @@
-type options() :: map().
-type post_creation_context() ::
#{
shard_id := emqx_ds_storage_layer:shard_id(),
db := rocksdb:db_handle(),
new_gen_id := emqx_ds_storage_layer:gen_id(),
old_gen_id := emqx_ds_storage_layer:gen_id(),
new_cf_refs := cf_refs(),
old_cf_refs := cf_refs(),
new_gen_runtime_data := _NewData,
old_gen_runtime_data := _OldData
}.
%%================================================================================
%% Generation callbacks
%%================================================================================
%% Create the new schema given generation id and the options.
%% Create rocksdb column families.
-callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) ->
-callback create(
shard_id(),
rocksdb:db_handle(),
gen_id(),
Options :: map(),
PrevRuntimeData :: term()
) ->
{_Schema, cf_refs()}.
%% Open the existing schema
@@ -258,9 +251,7 @@
-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
-callback post_creation_actions(post_creation_context()) -> _Data.
-optional_callbacks([post_creation_actions/1, handle_event/4]).
-optional_callbacks([handle_event/4]).
%%================================================================================
%% API for the replication layer
@@ -686,42 +677,14 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
server_state() | {error, overlaps_existing_generations}.
handle_add_generation(S0, Since) ->
#s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
#{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
OldKey = ?GEN_KEY(OldGenId),
#{OldKey := OldGenSchema} = Schema0,
#{cf_refs := OldCFRefs} = OldGenSchema,
#{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
Schema1 = update_last_until(Schema0, Since),
Shard1 = update_last_until(Shard0, Since),
case Schema1 of
_Updated = #{} ->
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
{GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Shard0, Since),
CFRefs = NewCFRefs ++ CFRefs0,
Key = ?GEN_KEY(GenId),
Generation0 =
#{data := NewGenData0} =
open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
%% When the new generation's module is the same as the last one, we might want to
%% perform actions like inheriting some of the previous (meta)data.
NewGenData =
run_post_creation_actions(
#{
shard_id => ShardId,
db => DB,
new_gen_id => GenId,
old_gen_id => OldGenId,
new_cf_refs => NewCFRefs,
old_cf_refs => OldCFRefs,
new_gen_runtime_data => NewGenData0,
old_gen_runtime_data => OldGenData,
new_module => CurrentMod,
old_module => OldMod
}
),
Generation = Generation0#{data := NewGenData},
Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
Shard = Shard1#{current_generation := GenId, Key => Generation},
S0#s{
cf_refs = CFRefs,
@@ -834,9 +797,28 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
-spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) ->
{gen_id(), shard_schema(), cf_refs()}.
new_generation(ShardId, DB, Schema0, Since) ->
new_generation(ShardId, DB, Schema0, undefined, Since).
-spec new_generation(
shard_id(),
rocksdb:db_handle(),
shard_schema(),
shard() | undefined,
emqx_ds:time()
) ->
{gen_id(), shard_schema(), cf_refs()}.
new_generation(ShardId, DB, Schema0, Shard0, Since) ->
#{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
case Shard0 of
#{?GEN_KEY(PrevGenId) := #{module := Mod} = PrevGen} ->
%% When the new generation's module is the same as the last one, we might want
%% to perform actions like inheriting some of the previous (meta)data.
PrevRuntimeData = maps:get(data, PrevGen);
_ ->
PrevRuntimeData = undefined
end,
GenId = next_generation_id(PrevGenId),
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
{GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf, PrevRuntimeData),
GenSchema = #{
module => Mod,
data => GenData,
@@ -918,23 +900,6 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
{error, overlaps_existing_generations}
end.
run_post_creation_actions(
#{
new_module := Mod,
old_module := Mod,
new_gen_runtime_data := NewGenData
} = Context
) ->
case erlang:function_exported(Mod, post_creation_actions, 1) of
true ->
Mod:post_creation_actions(Context);
false ->
NewGenData
end;
run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
%% Different implementation modules
NewGenData.
handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
Name = integer_to_list(erlang:system_time(millisecond)),
Dir = checkpoint_dir(ShardId, Name),
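
With new_generation/5 in place, inheritance is decided entirely by the storage layer: the previous generation's runtime data is handed to Mod:create/5 only when the previous generation was created by the same layout module; otherwise the callback receives undefined and starts from scratch. A hedged skeleton of what a layout module can do with the new argument (make_column_families/2, inherit_from/3 and make_schema/1 are hypothetical helpers, not part of this commit):

    %% Skeleton only; the helpers are hypothetical.
    create(_ShardId, DB, GenId, Options, SPrev) ->
        CFRefs = make_column_families(DB, GenId),
        case SPrev of
            undefined ->
                %% First generation, or the previous one used a different
                %% layout module: nothing to inherit.
                ok;
            _ ->
                ok = inherit_from(DB, CFRefs, SPrev)
        end,
        {make_schema(Options), CFRefs}.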

View File

@@ -28,7 +28,7 @@
%% behavior callbacks:
-export([
create/4,
create/5,
open/5,
drop/5,
prepare_batch/4,
@@ -88,7 +88,7 @@
%% behavior callbacks
%%================================================================================
create(_ShardId, DBHandle, GenId, _Options) ->
create(_ShardId, DBHandle, GenId, _Options, _SPrev) ->
CFName = data_cf(GenId),
{ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
Schema = #schema{},

View File

@@ -2,7 +2,7 @@
{application, emqx_durable_storage, [
{description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually!
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},