From 063913f2451c024dbcd45710e83d64771198b460 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 11 Sep 2023 14:43:04 -0300 Subject: [PATCH] fix(session): remove recently added `iterators` field from `#session{}` record Fixes https://emqx.atlassian.net/browse/EMQX-10945 This could lead to `badrecord` errors being raised if a takeover were to happen during a rolling cluster upgrade, as the old nodes could receive a record with more fields than expected. --- apps/emqx/include/emqx_session.hrl | 6 +---- apps/emqx/integration_test/emqx_ds_SUITE.erl | 21 ++++------------ apps/emqx/src/emqx_persistent_session_ds.erl | 25 ++++++++++++-------- apps/emqx/src/emqx_session.erl | 24 ++++--------------- 4 files changed, 25 insertions(+), 51 deletions(-) diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl index 304f92d58..3fea157ed 100644 --- a/apps/emqx/include/emqx_session.hrl +++ b/apps/emqx/include/emqx_session.hrl @@ -49,11 +49,7 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), - %% Topic filter to iterator ID mapping. - %% Note: we shouldn't serialize this when persisting sessions, as this information - %% also exists in the `?ITERATOR_REF_TAB' table. - iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()} + created_at :: pos_integer() }). -endif. diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index cbfa5c185..d721df5ed 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -108,12 +108,6 @@ get_all_iterator_ids(Node) -> emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) end). -get_session_iterators(Node, ClientId) -> - erpc:call(Node, fun() -> - [ConnPid] = emqx_cm:lookup_channels(ClientId), - emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid)) - end). - wait_nodeup(Node) -> ?retry( _Sleep0 = 500, @@ -209,18 +203,14 @@ t_session_subscription_idempotency(Config) -> {ok, _} = emqtt:connect(Client1), ct:pal("subscribing 2"), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), - SessionIterators = get_session_iterators(Node1, ClientId), ok = emqtt:stop(Client1), - #{session_iterators => SessionIterators} + ok end, - fun(Res, Trace) -> + fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - #{session_iterators := SessionIterators} = Res, %% Exactly one iterator should have been opened. - ?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}), - ?assertMatch(#{SubTopicFilter := _}, SessionIterators), SubTopicFilterWords = emqx_topic:words(SubTopicFilter), ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), @@ -321,17 +311,14 @@ t_session_unsubscription_idempotency(Config) -> }, 15_000 ), - SessionIterators = get_session_iterators(Node1, ClientId), ok = emqtt:stop(Client1), - #{session_iterators => SessionIterators} + ok end, - fun(Res, Trace) -> + fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - #{session_iterators := SessionIterators} = Res, %% No iterators remaining - ?assertEqual(#{}, SessionIterators), ?assertEqual([], get_all_iterator_refs(Node1)), ?assertEqual({ok, []}, get_all_iterator_ids(Node1)), ok diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 83c2375f2..4aa9175cd 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -24,7 +24,7 @@ persist_message/1, open_session/1, add_subscription/2, - del_subscription/3 + del_subscription/2 ]). -export([ @@ -139,21 +139,26 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) -> {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay), ok. --spec del_subscription(emqx_ds:iterator_id() | undefined, emqx_types:topic(), emqx_ds:session_id()) -> +-spec del_subscription(emqx_types:topic(), emqx_ds:session_id()) -> ok | {skipped, disabled}. -del_subscription(IteratorID, TopicFilterBin, DSSessionID) -> +del_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin TopicFilter = emqx_topic:words(TopicFilterBin), - Ctx = #{iterator_id => IteratorID}, - ?tp_span( - persistent_session_ds_close_iterators, - Ctx, - ok = ensure_iterator_closed_on_all_shards(IteratorID) - ), + case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of + {error, not_found} -> + %% already gone + ok; + {ok, IteratorID} -> + ?tp_span( + persistent_session_ds_close_iterators, + #{iterator_id => IteratorID}, + ok = ensure_iterator_closed_on_all_shards(IteratorID) + ) + end, ?tp_span( persistent_session_ds_iterator_delete, - Ctx, + #{}, emqx_ds:session_del_iterator(DSSessionID, TopicFilter) ) end diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 859cee76b..ebc1a00a3 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -269,9 +269,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(iterators, #session{iterators = Iterators}) -> - Iterators. + CreatedAt. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -320,13 +318,8 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). add_persistent_subscription(TopicFilterBin, ClientId, Session) -> - case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of - {ok, IteratorId, _IsNew} -> - Iterators = Session#session.iterators, - Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}}; - _ -> - Session - end. + _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), + Session. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE @@ -356,15 +349,8 @@ unsubscribe( -spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) -> session(). remove_persistent_subscription(Session, TopicFilterBin, ClientId) -> - Iterators = Session#session.iterators, - case maps:get(TopicFilterBin, Iterators, undefined) of - undefined -> - ok; - IteratorId -> - _ = emqx_persistent_session_ds:del_subscription(IteratorId, TopicFilterBin, ClientId), - ok - end, - Session#session{iterators = maps:remove(TopicFilterBin, Iterators)}. + _ = emqx_persistent_session_ds:del_subscription(TopicFilterBin, ClientId), + Session. %%-------------------------------------------------------------------- %% Client -> Broker: PUBLISH