From 33a0048155b66bc78bbdfac7502553caa701b279 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Aug 2023 15:24:53 -0300 Subject: [PATCH] refactor: move logic to `ensure_iterator` --- apps/emqx/src/emqx_persistent_session_ds.erl | 12 +++------- .../src/emqx_ds_storage_layer.erl | 23 +++++++++++-------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b862ef0d9..dc615fd5b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -123,18 +123,12 @@ open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) -> true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), ok. +%% RPC target. -spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. do_open_iterator(TopicFilter, StartMS, IteratorID) -> Replay = {TopicFilter, StartMS}, - case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of - true -> - {ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID), - ok; - false -> - {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - ok - end. + {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay), + ok. %% diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 69c0e008c..47c29e170 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -17,7 +17,7 @@ preserve_iterator/2, restore_iterator/2, discard_iterator/2, - is_iterator_present/2, + ensure_iterator/3, discard_iterator_prefix/2, list_iterator_prefix/2, foldl_iterator_prefix/4 @@ -185,15 +185,18 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) -> - boolean(). -is_iterator_present(Shard, ReplayID) -> - %% TODO: use keyMayExist after added to wrapper? - case iterator_get_state(Shard, ReplayID) of - {ok, _} -> - true; - _ -> - false +-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> + {ok, iterator()} | {error, _TODO}. +ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> + case restore_iterator(Shard, IteratorID) of + {ok, It} -> + {ok, It}; + {error, not_found} -> + {ok, It} = make_iterator(Shard, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + {ok, It}; + Error -> + Error end. -spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->