diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 68e2f4597..e93780ba2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -80,6 +80,10 @@ -define(stream_v2(GENERATION, INNER), [GENERATION | INNER]). -define(delete_stream(GENERATION, INNER), [GENERATION | INNER]). +%% Wrappers for the storage events: +-define(storage_event(GEN_ID, PAYLOAD), #{0 := 3333, 1 := GEN_ID, 2 := PAYLOAD}). +-define(mk_storage_event(GEN_ID, PAYLOAD), #{0 => 3333, 1 => GEN_ID, 2 => PAYLOAD}). + %%================================================================================ %% Type declarations %%================================================================================ @@ -848,10 +852,6 @@ new_generation(ShardId, DB, Schema0, Since) -> next_generation_id(GenId) -> GenId + 1. --spec prev_generation_id(gen_id()) -> gen_id(). -prev_generation_id(GenId) when GenId > 0 -> - GenId - 1. - %% @doc Commit current state of the server to both rocksdb and the persistent term -spec commit_metadata(server_state()) -> ok. commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) -> @@ -947,27 +947,23 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). -%% FIXME: currently this interface is a hack to handle safe cutoff -%% timestamp in LTS. It has many shortcomings (can lead to infinite -%% loops if the CBM is not careful; events from one generation may be -%% sent to the next one, etc.) and the API is not well thought out in -%% general. -%% -%% The mechanism of storage layer events should be refined later. --spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}. -handle_event(Shard, Time, Event) -> - case generation_at(Shard, Time) of - {_GenId, #{module := Mod, data := GenData}} -> - ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), +-spec handle_event(shard_id(), emqx_ds:time(), Event) -> [Event]. +handle_event(Shard, Time, ?storage_event(GenId, Event)) -> + case generation_get(Shard, GenId) of + not_found -> + []; + #{module := Mod, data := GenData} -> case erlang:function_exported(Mod, handle_event, 4) of true -> - Mod:handle_event(Shard, GenData, Time, Event); + NewEvents = Mod:handle_event(Shard, GenData, Time, Event), + [?mk_storage_event(GenId, E) || E <- NewEvents]; false -> [] - end; - _ -> - [] - end. + end + end; +handle_event(Shard, Time, Event) -> + GenId = generation_current(Shard), + handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). %%-------------------------------------------------------------------------------- %% Schema access @@ -1001,24 +997,6 @@ generations_since(Shard, Since) -> Schema ). --spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}. -generation_at(Shard, Time) -> - Schema = #{current_generation := Current} = get_schema_runtime(Shard), - generation_at(Time, Current, Schema). - -generation_at(Time, GenId, Schema) -> - case Schema of - #{?GEN_KEY(GenId) := Gen} -> - case Gen of - #{since := Since} when Time < Since andalso GenId > 0 -> - generation_at(Time, prev_generation_id(GenId), Schema); - _ -> - {GenId, Gen} - end; - _ -> - not_found - end. - -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -spec get_schema_runtime(shard_id()) -> shard().