chore(ds): slightly simplify working with storage generations

This commit is contained in:
Andrew Mayorov 2024-03-19 20:03:08 +01:00
parent 2d074df209
commit b8b9b7739b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 24 additions and 29 deletions

View File

@ -259,14 +259,14 @@ get_streams(Shard, TopicFilter, StartTime) ->
lists:flatmap( lists:flatmap(
fun(GenId) -> fun(GenId) ->
?tp(get_streams_get_gen, #{gen_id => GenId}), ?tp(get_streams_get_gen, #{gen_id => GenId}),
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
[ [
{GenId, ?stream_v2(GenId, InnerStream)} {GenId, ?stream_v2(GenId, InnerStream)}
|| InnerStream <- Streams || InnerStream <- Streams
]; ];
{error, not_found} -> not_found ->
%% race condition: generation was dropped before getting its streams? %% race condition: generation was dropped before getting its streams?
[] []
end end
@ -282,14 +282,14 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
lists:flatmap( lists:flatmap(
fun(GenId) -> fun(GenId) ->
?tp(get_streams_get_gen, #{gen_id => GenId}), ?tp(get_streams_get_gen, #{gen_id => GenId}),
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime), Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime),
[ [
?delete_stream(GenId, InnerStream) ?delete_stream(GenId, InnerStream)
|| InnerStream <- Streams || InnerStream <- Streams
]; ];
{error, not_found} -> not_found ->
%% race condition: generation was dropped before getting its streams? %% race condition: generation was dropped before getting its streams?
[] []
end end
@ -302,8 +302,8 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
make_iterator( make_iterator(
Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
) -> ) ->
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{ {ok, #{
@ -314,7 +314,7 @@ make_iterator(
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
{error, not_found} -> not_found ->
{error, unrecoverable, generation_not_found} {error, unrecoverable, generation_not_found}
end. end.
@ -323,8 +323,8 @@ make_iterator(
make_delete_iterator( make_delete_iterator(
Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime
) -> ) ->
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{ {ok, #{
@ -335,7 +335,7 @@ make_delete_iterator(
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
{error, not_found} -> not_found ->
{error, end_of_stream} {error, end_of_stream}
end. end.
@ -346,8 +346,8 @@ update_iterator(
#{?tag := ?IT, ?generation := GenId, ?enc := OldIter}, #{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
DSKey DSKey
) -> ) ->
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{ {ok, #{
@ -358,15 +358,15 @@ update_iterator(
{error, _} = Err -> {error, _} = Err ->
Err Err
end; end;
{error, not_found} -> not_found ->
{error, unrecoverable, generation_not_found} {error, unrecoverable, generation_not_found}
end. end.
-spec next(shard_id(), iterator(), pos_integer()) -> -spec next(shard_id(), iterator(), pos_integer()) ->
emqx_ds:next_result(iterator()). emqx_ds:next_result(iterator()).
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
Current = generation_current(Shard), Current = generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize) of case Mod:next(Shard, GenData, GenIter0, BatchSize) of
{ok, _GenIter, []} when GenId < Current -> {ok, _GenIter, []} when GenId < Current ->
@ -379,7 +379,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
Error = {error, _, _} -> Error = {error, _, _} ->
Error Error
end; end;
{error, not_found} -> not_found ->
%% generation was possibly dropped by GC %% generation was possibly dropped by GC
{error, unrecoverable, generation_not_found} {error, unrecoverable, generation_not_found}
end. end.
@ -392,8 +392,8 @@ delete_next(
Selector, Selector,
BatchSize BatchSize
) -> ) ->
case generation_get_safe(Shard, GenId) of case generation_get(Shard, GenId) of
{ok, #{module := Mod, data := GenData}} -> #{module := Mod, data := GenData} ->
Current = generation_current(Shard), Current = generation_current(Shard),
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
@ -406,7 +406,7 @@ delete_next(
Error = {error, _} -> Error = {error, _} ->
Error Error
end; end;
{error, not_found} -> not_found ->
%% generation was possibly dropped by GC %% generation was possibly dropped by GC
{ok, end_of_stream} {ok, end_of_stream}
end. end.
@ -777,18 +777,13 @@ generation_current(Shard) ->
#{current_generation := Current} = get_schema_runtime(Shard), #{current_generation := Current} = get_schema_runtime(Shard),
Current. Current.
-spec generation_get(shard_id(), gen_id()) -> generation(). -spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
generation_get(Shard, GenId) -> generation_get(Shard, GenId) ->
{ok, GenData} = generation_get_safe(Shard, GenId),
GenData.
-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
generation_get_safe(Shard, GenId) ->
case get_schema_runtime(Shard) of case get_schema_runtime(Shard) of
#{?GEN_KEY(GenId) := GenData} -> #{?GEN_KEY(GenId) := GenData} ->
{ok, GenData}; GenData;
#{} -> #{} ->
{error, not_found} not_found
end. end.
-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()]. -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].