fix: ensure iterator is opened

This commit is contained in:
Thales Macedo Garitezi 2023-08-11 10:06:39 -03:00
parent 33ddbe80ad
commit c28c6d1b7e
1 changed files with 13 additions and 11 deletions

View File

@ -93,12 +93,7 @@ add_subscription(TopicFilterBin, DSSessionID) ->
{ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
DSSessionID, TopicFilter
),
case IsNew of
true ->
ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID);
false ->
ok
end,
ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID),
{ok, IteratorID, IsNew}
end
).
@ -106,7 +101,9 @@ add_subscription(TopicFilterBin, DSSessionID) ->
-spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) ->
Nodes = emqx:running_nodes(),
Results = emqx_persistent_session_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID),
Results = emqx_persistent_session_ds_proto_v1:open_iterator(
Nodes, TopicFilter, StartMS, IteratorID
),
%% TODO: handle errors
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
ok.
@ -114,10 +111,15 @@ open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) ->
-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
do_open_iterator(TopicFilter, StartMS, IteratorID) ->
Replay = {TopicFilter, StartMS},
%% FIXME: choose DS shard based on ...?
case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of
true ->
{ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID),
ok;
false ->
{ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay),
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
ok.
ok
end.
%%