diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 69f5b8231..521467d8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -258,14 +258,14 @@ get_streams(Shard, TopicFilter, StartTime) -> lists:flatmap( fun(GenId) -> ?tp(get_streams_get_gen, #{gen_id => GenId}), - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams ]; - {error, not_found} -> + not_found -> %% race condition: generation was dropped before getting its streams? [] end @@ -281,14 +281,14 @@ get_delete_streams(Shard, TopicFilter, StartTime) -> lists:flatmap( fun(GenId) -> ?tp(get_streams_get_gen, #{gen_id => GenId}), - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime), [ ?delete_stream(GenId, InnerStream) || InnerStream <- Streams ]; - {error, not_found} -> + not_found -> %% race condition: generation was dropped before getting its streams? [] end @@ -301,8 +301,8 @@ get_delete_streams(Shard, TopicFilter, StartTime) -> make_iterator( Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{ @@ -313,7 +313,7 @@ make_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, unrecoverable, generation_not_found} end. @@ -322,8 +322,8 @@ make_iterator( make_delete_iterator( Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{ @@ -334,7 +334,7 @@ make_delete_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, end_of_stream} end. @@ -345,8 +345,8 @@ update_iterator( #{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, DSKey ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of {ok, Iter} -> {ok, #{ @@ -357,15 +357,15 @@ update_iterator( {error, _} = Err -> Err end; - {error, not_found} -> + not_found -> {error, unrecoverable, generation_not_found} end. -spec next(shard_id(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Current = generation_current(Shard), case Mod:next(Shard, GenData, GenIter0, BatchSize) of {ok, _GenIter, []} when GenId < Current -> @@ -378,7 +378,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch Error = {error, _, _} -> Error end; - {error, not_found} -> + not_found -> %% generation was possibly dropped by GC {error, unrecoverable, generation_not_found} end. @@ -391,8 +391,8 @@ delete_next( Selector, BatchSize ) -> - case generation_get_safe(Shard, GenId) of - {ok, #{module := Mod, data := GenData}} -> + case generation_get(Shard, GenId) of + #{module := Mod, data := GenData} -> Current = generation_current(Shard), case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> @@ -405,7 +405,7 @@ delete_next( Error = {error, _} -> Error end; - {error, not_found} -> + not_found -> %% generation was possibly dropped by GC {ok, end_of_stream} end. @@ -768,18 +768,13 @@ generation_current(Shard) -> #{current_generation := Current} = get_schema_runtime(Shard), Current. --spec generation_get(shard_id(), gen_id()) -> generation(). +-spec generation_get(shard_id(), gen_id()) -> generation() | not_found. generation_get(Shard, GenId) -> - {ok, GenData} = generation_get_safe(Shard, GenId), - GenData. - --spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}. -generation_get_safe(Shard, GenId) -> case get_schema_runtime(Shard) of #{?GEN_KEY(GenId) := GenData} -> - {ok, GenData}; + GenData; #{} -> - {error, not_found} + not_found end. -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].