diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7d118ccb4..564c99c00 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -93,12 +93,7 @@ add_subscription(TopicFilterBin, DSSessionID) -> {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter ), - case IsNew of - true -> - ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID); - false -> - ok - end, + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID), {ok, IteratorID, IsNew} end ). @@ -106,7 +101,9 @@ add_subscription(TopicFilterBin, DSSessionID) -> -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> Nodes = emqx:running_nodes(), - Results = emqx_persistent_session_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), + Results = emqx_persistent_session_ds_proto_v1:open_iterator( + Nodes, TopicFilter, StartMS, IteratorID + ), %% TODO: handle errors true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), ok. @@ -114,10 +111,15 @@ open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> -spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. do_open_iterator(TopicFilter, StartMS, IteratorID) -> Replay = {TopicFilter, StartMS}, - %% FIXME: choose DS shard based on ...? - {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - ok. + case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of + true -> + {ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID), + ok; + false -> + {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + ok + end. %%