fix(ds): Include generation ID in the storage events

Make sure storage events originating from generation X are handled in
the context of the same generation.
This commit is contained in:
ieQu1 2024-05-19 21:19:57 +02:00
parent acdae4fba3
commit 0ff307e789
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 17 additions and 39 deletions

View File

@ -80,6 +80,10 @@
-define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
-define(delete_stream(GENERATION, INNER), [GENERATION | INNER]).
%% Wrappers for the storage events:
-define(storage_event(GEN_ID, PAYLOAD), #{0 := 3333, 1 := GEN_ID, 2 := PAYLOAD}).
-define(mk_storage_event(GEN_ID, PAYLOAD), #{0 => 3333, 1 => GEN_ID, 2 => PAYLOAD}).
%%================================================================================
%% Type declarations
%%================================================================================
@ -848,10 +852,6 @@ new_generation(ShardId, DB, Schema0, Since) ->
next_generation_id(GenId) ->
GenId + 1.
-spec prev_generation_id(gen_id()) -> gen_id().
prev_generation_id(GenId) when GenId > 0 ->
GenId - 1.
%% @doc Commit current state of the server to both rocksdb and the persistent term
-spec commit_metadata(server_state()) -> ok.
commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
@ -947,27 +947,23 @@ handle_accept_snapshot(ShardId) ->
Dir = db_dir(ShardId),
emqx_ds_storage_snapshot:new_writer(Dir).
%% FIXME: currently this interface is a hack to handle safe cutoff
%% timestamp in LTS. It has many shortcomings (can lead to infinite
%% loops if the CBM is not careful; events from one generation may be
%% sent to the next one, etc.) and the API is not well thought out in
%% general.
%%
%% The mechanism of storage layer events should be refined later.
-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}.
handle_event(Shard, Time, Event) ->
case generation_at(Shard, Time) of
{_GenId, #{module := Mod, data := GenData}} ->
?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
-spec handle_event(shard_id(), emqx_ds:time(), Event) -> [Event].
handle_event(Shard, Time, ?storage_event(GenId, Event)) ->
case generation_get(Shard, GenId) of
not_found ->
[];
#{module := Mod, data := GenData} ->
case erlang:function_exported(Mod, handle_event, 4) of
true ->
Mod:handle_event(Shard, GenData, Time, Event);
NewEvents = Mod:handle_event(Shard, GenData, Time, Event),
[?mk_storage_event(GenId, E) || E <- NewEvents];
false ->
[]
end;
_ ->
[]
end.
end
end;
handle_event(Shard, Time, Event) ->
GenId = generation_current(Shard),
handle_event(Shard, Time, ?mk_storage_event(GenId, Event)).
%%--------------------------------------------------------------------------------
%% Schema access
@ -1001,24 +997,6 @@ generations_since(Shard, Since) ->
Schema
).
-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}.
generation_at(Shard, Time) ->
Schema = #{current_generation := Current} = get_schema_runtime(Shard),
generation_at(Time, Current, Schema).
generation_at(Time, GenId, Schema) ->
case Schema of
#{?GEN_KEY(GenId) := Gen} ->
case Gen of
#{since := Since} when Time < Since andalso GenId > 0 ->
generation_at(Time, prev_generation_id(GenId), Schema);
_ ->
{GenId, Gen}
end;
_ ->
not_found
end.
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
-spec get_schema_runtime(shard_id()) -> shard().