diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 184f709e9..dd06d884a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -20,6 +20,7 @@ -export([ trie_create/1, trie_create/0, destroy/1, + trie_dump/2, trie_restore/2, trie_update/2, trie_copy_learned_paths/2, @@ -76,6 +77,8 @@ static_key_size => pos_integer() }. +-type dump() :: [{_Key, _Val}]. + -record(trie, { persist :: persist_callback(), static_key_size :: pos_integer(), @@ -125,12 +128,12 @@ destroy(#trie{trie = Trie, stats = Stats}) -> ok. %% @doc Restore trie from a dump --spec trie_restore(options(), [{_Key, _Val}]) -> trie(). +-spec trie_restore(options(), dump()) -> trie(). trie_restore(Options, Dump) -> trie_update(trie_create(Options), Dump). %% @doc Update a trie with a dump of operations (used for replication) --spec trie_update(trie(), [{_Key, _Val}]) -> trie(). +-spec trie_update(trie(), dump()) -> trie(). trie_update(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> @@ -140,14 +143,23 @@ trie_update(Trie, Dump) -> ), Trie. +-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump(). +trie_dump(Trie, Filter) -> + case Filter of + all -> + Fun = fun(_) -> true end; + wildcard -> + Fun = fun contains_wildcard/1 + end, + lists:append([P || P <- paths(Trie), Fun(P)]). + -spec trie_copy_learned_paths(trie(), trie()) -> trie(). trie_copy_learned_paths(OldTrie, NewTrie) -> - WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)], lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(NewTrie, StateFrom, Token, StateTo) end, - lists:flatten(WildcardPaths) + trie_dump(OldTrie, wildcard) ), NewTrie. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 3b62fbfdf..91f52abc6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -25,7 +25,7 @@ %% behavior callbacks: -export([ - create/4, + create/5, open/5, drop/5, prepare_batch/4, @@ -37,7 +37,6 @@ update_iterator/4, next/6, delete_next/6, - post_creation_actions/1, handle_event/4 ]). @@ -179,10 +178,11 @@ emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), - options() + options(), + _PrevGeneration :: s() | undefined ) -> {schema(), emqx_ds_storage_layer:cf_refs()}. -create(_ShardId, DBHandle, GenId, Options) -> +create(_ShardId, DBHandle, GenId, Options, SPrev) -> %% Get options: BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), @@ -193,6 +193,14 @@ create(_ShardId, DBHandle, GenId, Options) -> TrieCFName = trie_cf(GenId), {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []), {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []), + case SPrev of + #s{trie = TriePrev} -> + ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev), + ?tp(bitfield_lts_inherited_trie, #{}), + ok; + undefined -> + ok + end, %% Create schema: Schema = #{ bits_per_wildcard_level => BitsPerTopicLevel, @@ -241,20 +249,6 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. --spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> - s(). 
-post_creation_actions( - #{ - new_gen_runtime_data := NewGenData, - old_gen_runtime_data := OldGenData - } -) -> - #s{trie = OldTrie} = OldGenData, - #s{trie = NewTrie0} = NewGenData, - NewTrie = copy_previous_trie(OldTrie, NewTrie0), - ?tp(bitfield_lts_inherited_trie, #{}), - NewGenData#s{trie = NewTrie}. - -spec drop( emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), @@ -905,9 +899,19 @@ restore_trie(TopicIndexBytes, DB, CF) -> rocksdb:iterator_close(IT) end. --spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie(). -copy_previous_trie(OldTrie, NewTrie) -> - emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie). +-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) -> + ok. +copy_previous_trie(DB, TrieCF, TriePrev) -> + {ok, Batch} = rocksdb:batch(), + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) + end, + emqx_ds_lts:trie_dump(TriePrev, wildcard) + ), + Result = rocksdb:write_batch(DB, Batch, []), + rocksdb:release_batch(Batch), + Result. read_persisted_trie(IT, {ok, KeyB, ValB}) -> [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f35792c17..47fe047fc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -69,7 +69,6 @@ shard_id/0, options/0, prototype/0, - post_creation_context/0, cooked_batch/0 ]). @@ -169,11 +168,14 @@ until := emqx_ds:time() | undefined }. +%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function. +-type generation_data() :: term(). + %% Schema for a generation. Persistent term. -type generation_schema() :: generation(term()). %% Runtime view of generation: --type generation() :: generation(term()). +-type generation() :: generation(generation_data()). %%%% Shard: @@ -194,38 +196,32 @@ -type options() :: map(). 
--type post_creation_context() :: - #{ - shard_id := emqx_ds_storage_layer:shard_id(), - db := rocksdb:db_handle(), - new_gen_id := emqx_ds_storage_layer:gen_id(), - old_gen_id := emqx_ds_storage_layer:gen_id(), - new_cf_refs := cf_refs(), - old_cf_refs := cf_refs(), - new_gen_runtime_data := _NewData, - old_gen_runtime_data := _OldData - }. - %%================================================================================ %% Generation callbacks %%================================================================================ %% Create the new schema given generation id and the options. %% Create rocksdb column families. --callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) -> +-callback create( + shard_id(), + rocksdb:db_handle(), + gen_id(), + Options :: map(), + generation_data() | undefined +) -> {_Schema, cf_refs()}. %% Open the existing schema -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> - _Data. + generation_data(). %% Delete the schema and data --callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> +-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) -> ok | {error, _Reason}. -callback prepare_batch( shard_id(), - _Data, + generation_data(), [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> @@ -233,34 +229,44 @@ -callback commit_batch( shard_id(), - _Data, + generation_data(), _CookedBatch ) -> ok | emqx_ds:error(_). --callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback get_streams( + shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time() +) -> [_Stream]. --callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback make_iterator( + shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() +) -> emqx_ds:make_iterator_result(_Iterator). 
-callback make_delete_iterator( - shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() + shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> +-callback next( + shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean() +) -> {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. -callback delete_next( - shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() + shard_id(), + generation_data(), + DeleteIterator, + emqx_ds:delete_selector(), + pos_integer(), + emqx_ds:time() ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. --callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> + [CustomEvent]. --callback post_creation_actions(post_creation_context()) -> _Data. - --optional_callbacks([post_creation_actions/1, handle_event/4]). +-optional_callbacks([handle_event/4]). %%================================================================================ %% API for the replication layer @@ -686,42 +692,14 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> server_state() | {error, overlaps_existing_generations}. 
handle_add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, - - #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0, - OldKey = ?GEN_KEY(OldGenId), - #{OldKey := OldGenSchema} = Schema0, - #{cf_refs := OldCFRefs} = OldGenSchema, - #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0, - Schema1 = update_last_until(Schema0, Since), Shard1 = update_last_until(Shard0, Since), - case Schema1 of _Updated = #{} -> - {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Shard0, Since), CFRefs = NewCFRefs ++ CFRefs0, Key = ?GEN_KEY(GenId), - Generation0 = - #{data := NewGenData0} = - open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), - %% When the new generation's module is the same as the last one, we might want to - %% perform actions like inheriting some of the previous (meta)data. - NewGenData = - run_post_creation_actions( - #{ - shard_id => ShardId, - db => DB, - new_gen_id => GenId, - old_gen_id => OldGenId, - new_cf_refs => NewCFRefs, - old_cf_refs => OldCFRefs, - new_gen_runtime_data => NewGenData0, - old_gen_runtime_data => OldGenData, - new_module => CurrentMod, - old_module => OldMod - } - ), - Generation = Generation0#{data := NewGenData}, + Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), Shard = Shard1#{current_generation := GenId, Key => Generation}, S0#s{ cf_refs = CFRefs, @@ -834,9 +812,28 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) -> -spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) -> {gen_id(), shard_schema(), cf_refs()}. new_generation(ShardId, DB, Schema0, Since) -> + new_generation(ShardId, DB, Schema0, undefined, Since). 
+ +-spec new_generation( + shard_id(), + rocksdb:db_handle(), + shard_schema(), + shard() | undefined, + emqx_ds:time() +) -> + {gen_id(), shard_schema(), cf_refs()}. +new_generation(ShardId, DB, Schema0, Shard0, Since) -> #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0, + case Shard0 of + #{?GEN_KEY(PrevGenId) := #{module := Mod} = PrevGen} -> + %% When the new generation's module is the same as the last one, we might want + %% to perform actions like inheriting some of the previous (meta)data. + PrevRuntimeData = maps:get(data, PrevGen); + _ -> + PrevRuntimeData = undefined + end, GenId = next_generation_id(PrevGenId), - {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf), + {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf, PrevRuntimeData), GenSchema = #{ module => Mod, data => GenData, @@ -918,23 +915,6 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. -run_post_creation_actions( - #{ - new_module := Mod, - old_module := Mod, - new_gen_runtime_data := NewGenData - } = Context -) -> - case erlang:function_exported(Mod, post_creation_actions, 1) of - true -> - Mod:post_creation_actions(Context); - false -> - NewGenData - end; -run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) -> - %% Different implementation modules - NewGenData. - handle_take_snapshot(#s{db = DB, shard_id = ShardId}) -> Name = integer_to_list(erlang:system_time(millisecond)), Dir = checkpoint_dir(ShardId, Name), @@ -1007,17 +987,17 @@ generation_get(Shard, GenId) -> -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()]. generations_since(Shard, Since) -> - Schema = get_schema_runtime(Shard), - maps:fold( - fun - (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since -> - [GenId | Acc]; - (_K, _V, Acc) -> - Acc - end, - [], - Schema - ). 
+ Schema = #{current_generation := Current} = get_schema_runtime(Shard), + list_generations_since(Schema, Current, Since). + +list_generations_since(Schema, GenId, Since) -> + case Schema of + #{?GEN_KEY(GenId) := #{until := Until}} when Until > Since -> + [GenId | list_generations_since(Schema, GenId - 1, Since)]; + #{} -> + %% No more live generations. + [] + end. format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 1c506390e..2d5803f31 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -28,7 +28,7 @@ %% behavior callbacks: -export([ - create/4, + create/5, open/5, drop/5, prepare_batch/4, @@ -88,7 +88,7 @@ %% behavior callbacks %%================================================================================ -create(_ShardId, DBHandle, GenId, _Options) -> +create(_ShardId, DBHandle, GenId, _Options, _SPrev) -> CFName = data_cf(GenId), {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []), Schema = #schema{}, diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index 3551a40df..7a20577d4 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! 
- {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]}, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bb6d0f917..004096431 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) -> ?check_trace( begin %% Create a bunch of topics to be learned in the first generation - Timestamps = lists:seq(1, 10_000, 100), - Batch = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + TS1 = 500, + Batch1 = [ + {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))} || I <- lists:seq(1, 200), - TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. - ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), + ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), + %% Restart the shard, to verify that LTS is persisted. + ok = application:stop(emqx_durable_storage), + ok = application:start(emqx_durable_storage), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + %% Store a batch of messages with the same set of topics. + TS2 = 1_500, + Batch2 = [ + {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))} + || I <- lists:seq(1, 200), + Suffix <- [<<"foo">>, <<"bar">>] + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + %% We should get only two streams for wildcard query, for "foo" and for "bar". 
+ ?assertMatch( + [_Foo, _Bar], + emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000) + ), ok end, fun(Trace) -> @@ -211,10 +224,7 @@ t_replay(_Config) -> ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), @@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> payload = Payload }. +make_topic(Tokens = [_ | _]) -> + emqx_topic:join([bin(T) || T <- Tokens]). + payloads(Messages) -> lists:map( fun(#message{payload = P}) -> @@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic) -> emqx_topic:words(iolist_to_binary(Topic)). +bin(X) -> + emqx_utils_conv:bin(X). + %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/changes/ce/fix-13276.en.md b/changes/ce/fix-13276.en.md new file mode 100644 index 000000000..66b52e45a --- /dev/null +++ b/changes/ce/fix-13276.en.md @@ -0,0 +1 @@ +Fix an issue with durable message storage where parts of the internal storage state were not persisted during setup of a new storage generation, a concept used internally for managing message expiration and cleanup. This could have manifested as messages being lost after a restart of the broker.