refactor: move logic to `ensure_iterator`
This commit is contained in:
parent
f15f59650d
commit
33a0048155
|
@ -123,18 +123,12 @@ open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) ->
|
||||||
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
|
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% RPC target.
|
||||||
-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
|
-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
|
||||||
do_open_iterator(TopicFilter, StartMS, IteratorID) ->
|
do_open_iterator(TopicFilter, StartMS, IteratorID) ->
|
||||||
Replay = {TopicFilter, StartMS},
|
Replay = {TopicFilter, StartMS},
|
||||||
case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of
|
{ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
|
||||||
true ->
|
ok.
|
||||||
{ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID),
|
|
||||||
ok;
|
|
||||||
false ->
|
|
||||||
{ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay),
|
|
||||||
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
preserve_iterator/2,
|
preserve_iterator/2,
|
||||||
restore_iterator/2,
|
restore_iterator/2,
|
||||||
discard_iterator/2,
|
discard_iterator/2,
|
||||||
is_iterator_present/2,
|
ensure_iterator/3,
|
||||||
discard_iterator_prefix/2,
|
discard_iterator_prefix/2,
|
||||||
list_iterator_prefix/2,
|
list_iterator_prefix/2,
|
||||||
foldl_iterator_prefix/4
|
foldl_iterator_prefix/4
|
||||||
|
@ -185,15 +185,18 @@ restore_iterator(Shard, ReplayID) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) ->
|
||||||
boolean().
|
{ok, iterator()} | {error, _TODO}.
|
||||||
is_iterator_present(Shard, ReplayID) ->
|
ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) ->
|
||||||
%% TODO: use keyMayExist after added to wrapper?
|
case restore_iterator(Shard, IteratorID) of
|
||||||
case iterator_get_state(Shard, ReplayID) of
|
{ok, It} ->
|
||||||
{ok, _} ->
|
{ok, It};
|
||||||
true;
|
{error, not_found} ->
|
||||||
_ ->
|
{ok, It} = make_iterator(Shard, Replay),
|
||||||
false
|
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
|
||||||
|
{ok, It};
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
|
|
Loading…
Reference in New Issue