From b8b9b7739b8306ed4cb2ced6bf51b01eed8eed32 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Mar 2024 20:03:08 +0100 Subject: [PATCH] chore(ds): slightly simplify working with storage generations --- .../src/emqx_ds_storage_layer.erl | 53 +++++++++---------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 5319458e2..fee4c4457 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -259,14 +259,14 @@ get_streams(Shard, TopicFilter, StartTime) -> lists:flatmap( fun(GenId) -> ?tp(get_streams_get_gen, #{gen_id => GenId}), - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams ]; - {error, not_found} -> + not_found -> %% race condition: generation was dropped before getting its streams? [] end @@ -282,14 +282,14 @@ get_delete_streams(Shard, TopicFilter, StartTime) -> lists:flatmap( fun(GenId) -> ?tp(get_streams_get_gen, #{gen_id => GenId}), - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime), [ ?delete_stream(GenId, InnerStream) || InnerStream <- Streams ]; - {error, not_found} -> + not_found -> %% race condition: generation was dropped before getting its streams? [] end @@ -302,8 +302,8 @@ get_delete_streams(Shard, TopicFilter, StartTime) -> make_iterator( Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{ @@ -314,7 +314,7 @@ make_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, unrecoverable, generation_not_found} end. @@ -323,8 +323,8 @@ make_iterator( make_delete_iterator( Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{ @@ -335,7 +335,7 @@ make_delete_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, end_of_stream} end. @@ -346,8 +346,8 @@ update_iterator( #{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, DSKey ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of {ok, Iter} -> {ok, #{ @@ -358,15 +358,15 @@ update_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, unrecoverable, generation_not_found} end. -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Current = generation_current(Shard), case Mod:next(Shard, GenData, GenIter0, BatchSize) of {ok, _GenIter, []} when GenId < Current -> @@ -379,7 +379,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch Error = {error, _, _} -> Error end; - {error, not_found} -> + not_found -> %% generation was possibly dropped by GC {error, unrecoverable, generation_not_found} end. @@ -392,8 +392,8 @@ delete_next( Selector, BatchSize ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Current = generation_current(Shard), case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> @@ -406,7 +406,7 @@ delete_next( Error = {error, _} -> Error end; - {error, not_found} -> + not_found -> %% generation was possibly dropped by GC {ok, end_of_stream} end. @@ -777,18 +777,13 @@ generation_current(Shard) -> #{current_generation := Current} = get_schema_runtime(Shard), Current. --spec generation_get(shard_id(), gen_id()) -> generation(). +-spec generation_get(shard_id(), gen_id()) -> generation() | not_found. generation_get(Shard, GenId) -> - {ok, GenData} = generation_get_safe(Shard, GenId), - GenData. - --spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}. -generation_get_safe(Shard, GenId) -> case get_schema_runtime(Shard) of #{?GEN_KEY(GenId) := GenData} -> - {ok, GenData}; + GenData; #{} -> - {error, not_found} + not_found end. -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].