diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index f806a57fc..96fd523e6 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -85,6 +85,7 @@ end_per_testcase(TestCase, Config) when Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), + snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> emqx_common_test_helpers:call_janitor(60_000), @@ -164,10 +165,19 @@ is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) EI > 0. list_all_sessions(Node) -> - erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []). + erpc:call(Node, emqx_persistent_session_ds_state, list_sessions, []). list_all_subscriptions(Node) -> - erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []). + Sessions = list_all_sessions(Node), + lists:flatmap( + fun(ClientId) -> + #{s := #{subscriptions := Subs}} = erpc:call( + Node, emqx_persistent_session_ds, print_session, [ClientId] + ), + maps:to_list(Subs) + end, + Sessions + ). list_all_pubranges(Node) -> erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). @@ -485,7 +495,7 @@ do_t_session_expiration(_Config, Opts) -> Client0 = start_client(Params0), {ok, _} = emqtt:connect(Client0), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2), - #{subscriptions := Subs0} = emqx_persistent_session_ds:print_session(ClientId), + #{s := #{subscriptions := Subs0}} = emqx_persistent_session_ds:print_session(ClientId), ?assertEqual(1, map_size(Subs0), #{subs => Subs0}), Info0 = maps:from_list(emqtt:info(Client0)), ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}), @@ -512,7 +522,8 @@ do_t_session_expiration(_Config, Opts) -> emqtt:publish(Client2, Topic, <<"payload">>), ?assertNotReceive({publish, #{topic := Topic}}), %% ensure subscriptions are absent from table. - ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()), + #{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId), + ?assertEqual([], maps:to_list(Subs3)), emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), ok @@ -580,10 +591,8 @@ t_session_gc(Config) -> ), ?assertMatch({ok, _}, Res0), {ok, #{?snk_meta := #{time := T0}}} = Res0, - Sessions0 = list_all_sessions(Node1), - Subs0 = list_all_subscriptions(Node1), - ?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}), - ?assertEqual(3, map_size(Subs0), #{subs => Subs0}), + ?assertMatch([_, _, _], list_all_sessions(Node1), sessions), + ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions), %% Now we disconnect 2 of them; only those should be GC'ed. ?assertMatch( @@ -628,11 +637,8 @@ t_session_gc(Config) -> 4 * GCInterval + 1_000 ) ), - Sessions1 = list_all_sessions(Node1), - Subs1 = list_all_subscriptions(Node1), - ?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}), - ?assertEqual(1, map_size(Subs1), #{subs => Subs1}), - + ?assertMatch([_], list_all_sessions(Node1), sessions), + ?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ok end, [ diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b26a4e983..e5f08a6bb 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -20,7 +20,7 @@ -include("emqx.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). -include("emqx_mqtt.hrl"). @@ -188,7 +188,7 @@ destroy(#{clientid := ClientID}) -> destroy_session(ClientID). destroy_session(ClientID) -> - session_drop(ClientID). + session_drop(ClientID, destroy). %%-------------------------------------------------------------------- %% Info, Stats @@ -321,19 +321,28 @@ unsubscribe( Session = #{id := ID, s := S0} ) -> case subs_lookup(TopicFilter, S0) of - #{props := SubOpts, id := SubId} -> - S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), - S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), - ?tp_span( - persistent_session_ds_subscription_route_delete, - #{session_id => ID}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, ID) - ), - {ok, Session#{s => S}, SubOpts}; undefined -> - {error, ?RC_NO_SUBSCRIPTION_EXISTED} + {error, ?RC_NO_SUBSCRIPTION_EXISTED}; + Subscription = #{props := SubOpts} -> + S = do_unsubscribe(ID, TopicFilter, Subscription, S0), + {ok, Session#{s => S}, SubOpts} end. +-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> + ?tp(persistent_session_ds_subscription_delete, #{ + session_id => SessionId, topic_filter => TopicFilter + }), + S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), + S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => SessionId, topic_filter => TopicFilter}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) + ), + S. + -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(TopicFilter, #{s := S}) -> @@ -534,12 +543,6 @@ sync(ClientId) -> {error, noproc} end. --define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI), - (is_number(LAST_ALIVE_AT) andalso - is_number(EI) andalso - (NOW_MS >= LAST_ALIVE_AT + EI)) -). - %% @doc Called when a client connects. This function looks up a %% session or returns `false` if previous one couldn't be found. %% @@ -553,11 +556,12 @@ session_open(SessionId, NewConnInfo) -> {ok, S0} -> EI = expiry_interval(emqx_persistent_session_ds_state:get_conninfo(S0)), LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0), - case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of + case NowMS >= LastAliveAt + EI of true -> - emqx_persistent_session_ds_state:delete(SessionId), + session_drop(SessionId, expired), false; false -> + ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}), %% New connection being established S1 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S0), S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1), @@ -608,9 +612,22 @@ session_ensure_new(Id, ConnInfo, Conf) -> %% @doc Called when a client reconnects with `clean session=true' or %% during session GC --spec session_drop(id()) -> ok. -session_drop(ID) -> - emqx_persistent_session_ds_state:delete(ID). +-spec session_drop(id(), _Reason) -> ok. +session_drop(ID, Reason) -> + case emqx_persistent_session_ds_state:open(ID) of + {ok, S0} -> + ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), + _S = subs_fold( + fun(TopicFilter, Subscription, S) -> + do_unsubscribe(ID, TopicFilter, Subscription, S) + end, + S0, + S0 + ), + emqx_persistent_session_ds_state:delete(ID); + undefined -> + ok + end. now_ms() -> erlang:system_time(millisecond). @@ -1083,7 +1100,7 @@ seqno_proper_test_() -> end, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}}. -apply_n_times(0, Fun, A) -> +apply_n_times(0, _Fun, A) -> A; apply_n_times(N, Fun, A) when N > 0 -> apply_n_times(N - 1, Fun, Fun(A)). diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 504e9649c..0c2bc450b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -159,7 +159,7 @@ open(SessionId) -> streams => pmap_open(?stream_tab, SessionId), seqnos => pmap_open(?seqno_tab, SessionId), ranks => pmap_open(?rank_tab, SessionId), - dirty => false + ?unset_dirty }, {ok, Rec}; [] -> @@ -222,17 +222,16 @@ commit( ranks := Ranks } ) -> - check_sequence( - transaction(fun() -> - kv_persist(?session_tab, SessionId, Metadata), - Rec#{ - streams => pmap_commit(SessionId, Streams), - seqnos => pmap_commit(SessionId, SeqNos), - ranks => pmap_commit(SessionId, Ranks), - ?unset_dirty - } - end) - ). + check_sequence(Rec), + transaction(fun() -> + kv_persist(?session_tab, SessionId, Metadata), + Rec#{ + streams => pmap_commit(SessionId, Streams), + seqnos => pmap_commit(SessionId, SeqNos), + ranks => pmap_commit(SessionId, Ranks), + ?unset_dirty + } + end). -spec create_new(emqx_persistent_session_ds:id()) -> t(). create_new(SessionId) -> diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 7acfb6214..6da60b809 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -216,31 +216,7 @@ t_session_subscription_iterators(Config) -> messages => [Message1, Message2, Message3, Message4] } end, - fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), - case ?of_kind(ds_session_subscription_added, Trace) of - [] -> - %% Since `emqx_durable_storage' is a dependency of `emqx', it gets - %% compiled in "prod" mode when running emqx standalone tests. - ok; - [_ | _] -> - ?assertMatch( - [ - #{?snk_kind := ds_session_subscription_added}, - #{?snk_kind := ds_session_subscription_present} - ], - ?of_kind( - [ - ds_session_subscription_added, - ds_session_subscription_present - ], - Trace - ) - ), - ok - end, - ok - end + [] ), ok. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 23f69a81b..6f21fc216 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -594,7 +594,7 @@ fields("node") -> sc( hoconsc:enum([gen_rpc, distr]), #{ - mapping => "mria.shardp_transport", + mapping => "mria.shard_transport", importance => ?IMPORTANCE_HIDDEN, default => distr, desc => ?DESC(db_default_shard_transport)