diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl index fba4cf911..3fea157ed 100644 --- a/apps/emqx/include/emqx_session.hrl +++ b/apps/emqx/include/emqx_session.hrl @@ -49,9 +49,7 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), - %% Durable storage iterators for existing subscriptions - iterators = [] :: [emqx_ds_replay:replay_id()] + created_at :: pos_integer() }). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0120f09d4..19e11b1a3 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -85,7 +85,7 @@ open_session(ClientID) -> ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). -spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) -> - {ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}. + {ok, emqx_ds:iterator_id(), IsNew :: boolean()} | {skipped, disabled}. add_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 9b877ae44..32c98290a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -169,14 +169,8 @@ -spec init_and_open(emqx_types:clientid(), options()) -> session(). init_and_open(ClientID, Options) -> Session0 = emqx_session:init(Options), - IteratorIDs = - case emqx_persistent_session_ds:open_session(ClientID) of - {skipped, disabled} -> - []; - {_IsNew, _DSSessionID, Iterators0} -> - Iterators0 - end, - Session0#session{iterators = IteratorIDs}. + _ = emqx_persistent_session_ds:open_session(ClientID), + Session0. -spec init(options()) -> session(). init(Opts) -> @@ -274,9 +268,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(iterators, #session{iterators = IteratorIds}) -> - IteratorIds. + CreatedAt. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -325,15 +317,8 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). add_persistent_subscription(TopicFilterBin, ClientId, Session) -> - case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of - {skipped, disabled} -> - Session; - {ok, IteratorID, _IsNew = true} -> - Iterators = Session#session.iterators, - Session#session{iterators = [IteratorID | Iterators]}; - {ok, _IteratorID, _IsNew = false} -> - Session - end. + _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), + Session. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b669be889..7ca1f3f15 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -148,18 +148,15 @@ t_session_subscription_iterators(Config) -> ct:pal("publishing 4"), Message4 = emqx_message:make(AnotherTopic, Payload4), publish(Node1, Message4), - IteratorIds = get_iterator_ids(Node1, ClientId), emqtt:stop(Client), #{ - messages => [Message1, Message2, Message3, Message4], - iterator_ids => IteratorIds + messages => [Message1, Message2, Message3, Message4] } end, fun(Results, Trace) -> ct:pal("trace:\n ~p", [Trace]), #{ - messages := [_Message1, Message2, Message3 | _], - iterator_ids := IteratorIds + messages := [_Message1, Message2, Message3 | _] } = Results, case ?of_kind(ds_session_subscription_added, Trace) of [] -> @@ -182,10 +179,9 @@ t_session_subscription_iterators(Config) -> ), ok end, - ?assertMatch([_], IteratorIds), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), - ?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)), - [IteratorId] = IteratorIds, + {ok, [IteratorId]} = get_all_iterator_ids(Node1), + ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)), ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), ExpectedMessages = [Message2, Message3], ?assertEqual(ExpectedMessages, ReplayMessages1), @@ -284,7 +280,4 @@ get_mqtt_port(Node, Type) -> Port. get_all_iterator_ids(Node) -> - Fn = fun(K, _V, Acc) -> [K | Acc] end, - erpc:call(Node, fun() -> - emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) - end). + erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]). diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 56e0b23b8..ab1720754 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -147,8 +147,7 @@ sessioninfo() -> awaiting_rel = awaiting_rel(), max_awaiting_rel = non_neg_integer(), await_rel_timeout = safty_timeout(), - created_at = timestamp(), - iterators = [] + created_at = timestamp() }, emqx_session:info(Session) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 297c7d857..3cc7ca886 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_ds). +-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API: @@ -124,17 +125,15 @@ message_stats() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. +-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> case mnesia:dirty_read(?SESSION_TAB, ClientID) of - [#session{iterators = Iterators}] -> - IteratorIDs = maps:values(Iterators), - {false, ClientID, IteratorIDs}; + [#session{}] -> + {false, ClientID}; [] -> - Iterators = #{}, - Session = #session{id = ClientID, iterators = Iterators}, + Session = #session{id = ClientID}, mria:dirty_write(?SESSION_TAB, Session), - {true, ClientID, _IteratorIDs = []} + {true, ClientID} end. %% @doc Called when a client reconnects with `clean session=true' or diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 55223068f..47493bd0b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -21,8 +21,10 @@ -define(DS_SHARD, emqx_ds_shard). -record(session, { + %% same as clientid id :: emqx_ds:session_id(), - iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} + %% for future usage + props = #{} :: map() }). -record(iterator_ref, { diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 9bc7924e8..adede5322 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -19,6 +19,7 @@ discard_iterator/2, is_iterator_present/2, discard_iterator_prefix/2, + list_iterator_prefix/2, foldl_iterator_prefix/4 ]). @@ -203,7 +204,17 @@ discard_iterator(Shard, ReplayID) -> -spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> ok | {error, _TODO}. discard_iterator_prefix(Shard, KeyPrefix) -> - do_discard_iterator_prefix(Shard, KeyPrefix). + case do_discard_iterator_prefix(Shard, KeyPrefix) of + {ok, _} -> ok; + Error -> Error + end. + +-spec list_iterator_prefix( + emqx_ds:shard(), + binary() +) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. +list_iterator_prefix(Shard, KeyPrefix) -> + do_list_iterator_prefix(Shard, KeyPrefix). -spec foldl_iterator_prefix( emqx_ds:shard(), @@ -377,7 +388,11 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, %% --define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>). +-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). +-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin + <> = (KeyReplayState), + IteratorId +end). -define(ITERATION_WRITE_OPTS, []). -define(ITERATION_READ_OPTS, []). @@ -424,6 +439,13 @@ restore_iterator_state( It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, open_restore_iterator(meta_get_gen(Shard, Gen), It, State). +do_list_iterator_prefix(Shard, KeyPrefix) -> + Fn = fun(K0, _V, Acc) -> + K = ?KEY_REPLAY_STATE_PAT(K0), + [K | Acc] + end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). + do_discard_iterator_prefix(Shard, KeyPrefix) -> #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 8a2d18c0d..73eb28d85 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -169,7 +169,7 @@ t_session_subscription_idempotency(Config) -> %% Exactly one iterator should have been opened. ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( - {_IsNew = false, ClientId, _}, + {_IsNew = false, ClientId}, erpc:call(Node1, emqx_ds, session_open, [ClientId]) ), ok