Merge pull request #11586 from thalesmg/fix-session-record-m-20230911

fix(session): remove recently added `iterators` field from `#session{}` record
This commit is contained in:
Thales Macedo Garitezi 2023-09-11 16:03:26 -03:00 committed by GitHub
commit a054049d12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 51 deletions

View File

@ -49,11 +49,7 @@
%% Awaiting PUBREL Timeout (Unit: millisecond) %% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout :: timeout(), await_rel_timeout :: timeout(),
%% Created at %% Created at
created_at :: pos_integer(), created_at :: pos_integer()
%% Topic filter to iterator ID mapping.
%% Note: we shouldn't serialize this when persisting sessions, as this information
%% also exists in the `?ITERATOR_REF_TAB' table.
iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()}
}). }).
-endif. -endif.

View File

@ -108,12 +108,6 @@ get_all_iterator_ids(Node) ->
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
end). end).
get_session_iterators(Node, ClientId) ->
erpc:call(Node, fun() ->
[ConnPid] = emqx_cm:lookup_channels(ClientId),
emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid))
end).
wait_nodeup(Node) -> wait_nodeup(Node) ->
?retry( ?retry(
_Sleep0 = 500, _Sleep0 = 500,
@ -209,18 +203,14 @@ t_session_subscription_idempotency(Config) ->
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
ct:pal("subscribing 2"), ct:pal("subscribing 2"),
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
SessionIterators = get_session_iterators(Node1, ClientId),
ok = emqtt:stop(Client1), ok = emqtt:stop(Client1),
#{session_iterators => SessionIterators} ok
end, end,
fun(Res, Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% Exactly one iterator should have been opened. %% Exactly one iterator should have been opened.
?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}),
?assertMatch(#{SubTopicFilter := _}, SessionIterators),
SubTopicFilterWords = emqx_topic:words(SubTopicFilter), SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)), ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
@ -321,17 +311,14 @@ t_session_unsubscription_idempotency(Config) ->
}, },
15_000 15_000
), ),
SessionIterators = get_session_iterators(Node1, ClientId),
ok = emqtt:stop(Client1), ok = emqtt:stop(Client1),
#{session_iterators => SessionIterators} ok
end, end,
fun(Res, Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% No iterators remaining %% No iterators remaining
?assertEqual(#{}, SessionIterators),
?assertEqual([], get_all_iterator_refs(Node1)), ?assertEqual([], get_all_iterator_refs(Node1)),
?assertEqual({ok, []}, get_all_iterator_ids(Node1)), ?assertEqual({ok, []}, get_all_iterator_ids(Node1)),
ok ok

View File

@ -24,7 +24,7 @@
persist_message/1, persist_message/1,
open_session/1, open_session/1,
add_subscription/2, add_subscription/2,
del_subscription/3 del_subscription/2
]). ]).
-export([ -export([
@ -139,21 +139,26 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
{ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay), {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
ok. ok.
-spec del_subscription(emqx_ds:iterator_id() | undefined, emqx_types:topic(), emqx_ds:session_id()) -> -spec del_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
ok | {skipped, disabled}. ok | {skipped, disabled}.
del_subscription(IteratorID, TopicFilterBin, DSSessionID) -> del_subscription(TopicFilterBin, DSSessionID) ->
?WHEN_ENABLED( ?WHEN_ENABLED(
begin begin
TopicFilter = emqx_topic:words(TopicFilterBin), TopicFilter = emqx_topic:words(TopicFilterBin),
Ctx = #{iterator_id => IteratorID}, case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
?tp_span( {error, not_found} ->
persistent_session_ds_close_iterators, %% already gone
Ctx, ok;
ok = ensure_iterator_closed_on_all_shards(IteratorID) {ok, IteratorID} ->
), ?tp_span(
persistent_session_ds_close_iterators,
#{iterator_id => IteratorID},
ok = ensure_iterator_closed_on_all_shards(IteratorID)
)
end,
?tp_span( ?tp_span(
persistent_session_ds_iterator_delete, persistent_session_ds_iterator_delete,
Ctx, #{},
emqx_ds:session_del_iterator(DSSessionID, TopicFilter) emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
) )
end end

View File

@ -269,9 +269,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout; Timeout;
info(created_at, #session{created_at = CreatedAt}) -> info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt; CreatedAt.
info(iterators, #session{iterators = Iterators}) ->
Iterators.
%% @doc Get stats of the session. %% @doc Get stats of the session.
-spec stats(session()) -> emqx_types:stats(). -spec stats(session()) -> emqx_types:stats().
@ -320,13 +318,8 @@ is_subscriptions_full(#session{
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
session(). session().
add_persistent_subscription(TopicFilterBin, ClientId, Session) -> add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
{ok, IteratorId, _IsNew} -> Session.
Iterators = Session#session.iterators,
Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}};
_ ->
Session
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: UNSUBSCRIBE %% Client -> Broker: UNSUBSCRIBE
@ -356,15 +349,8 @@ unsubscribe(
-spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) -> -spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) ->
session(). session().
remove_persistent_subscription(Session, TopicFilterBin, ClientId) -> remove_persistent_subscription(Session, TopicFilterBin, ClientId) ->
Iterators = Session#session.iterators, _ = emqx_persistent_session_ds:del_subscription(TopicFilterBin, ClientId),
case maps:get(TopicFilterBin, Iterators, undefined) of Session.
undefined ->
ok;
IteratorId ->
_ = emqx_persistent_session_ds:del_subscription(IteratorId, TopicFilterBin, ClientId),
ok
end,
Session#session{iterators = maps:remove(TopicFilterBin, Iterators)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: PUBLISH %% Client -> Broker: PUBLISH