From 9c6dd30f44a0d36ce12d4471bc3e6fb8e8cc4d70 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 30 Aug 2023 11:04:05 -0300 Subject: [PATCH] feat(session): store iterator ids in session record --- apps/emqx/include/emqx_session.hrl | 6 +++++- apps/emqx/integration_test/emqx_ds_SUITE.erl | 14 ++++++++++++-- apps/emqx/src/emqx_session.erl | 13 ++++++++++--- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl index 3fea157ed..304f92d58 100644 --- a/apps/emqx/include/emqx_session.hrl +++ b/apps/emqx/include/emqx_session.hrl @@ -49,7 +49,11 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer() + created_at :: pos_integer(), + %% Topic filter to iterator ID mapping. + %% Note: we shouldn't serialize this when persisting sessions, as this information + %% also exists in the `?ITERATOR_REF_TAB' table. + iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()} }). -endif. diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index 3f0d3f3e4..8fdbf7fb4 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -97,6 +97,12 @@ get_all_iterator_ids(Node) -> emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) end). +get_session_iterators(Node, ClientId) -> + erpc:call(Node, fun() -> + [ConnPid] = emqx_cm:lookup_channels(ClientId), + emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid)) + end). + wait_nodeup(Node) -> ?retry( _Sleep0 = 500, @@ -191,14 +197,18 @@ t_session_subscription_idempotency(Config) -> {ok, _} = emqtt:connect(Client1), ct:pal("subscribing 2"), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), + SessionIterators = get_session_iterators(Node1, ClientId), ok = emqtt:stop(Client1), - ok + #{session_iterators => SessionIterators} end, - fun(Trace) -> + fun(Res, Trace) -> ct:pal("trace:\n ~p", [Trace]), + #{session_iterators := SessionIterators} = Res, %% Exactly one iterator should have been opened. + ?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}), + ?assertMatch(#{SubTopicFilter := _}, SessionIterators), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( {_IsNew = false, ClientId}, diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 0c051f002..8c1df9d06 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -269,7 +269,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +info(iterators, #session{iterators = Iterators}) -> + Iterators. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -318,8 +320,13 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). add_persistent_subscription(TopicFilterBin, ClientId, Session) -> - _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), - Session. + case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of + {ok, IteratorId, _IsNew} -> + Iterators = Session#session.iterators, + Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}}; + _ -> + Session + end. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE