diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 96fd523e6..f834b8098 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -54,12 +54,12 @@ init_per_testcase(TestCase, Config) when init_per_testcase(t_session_gc = TestCase, Config) -> Opts = #{ n => 3, - roles => [core, core, replicant], + roles => [core, core, core], extra_emqx_conf => "\n session_persistence {" "\n last_alive_update_interval = 500ms " - "\n session_gc_interval = 2s " - "\n session_gc_batch_size = 1 " + "\n session_gc_interval = 1s " + "\n session_gc_batch_size = 2 " "\n }" }, Cluster = cluster(Opts), @@ -85,7 +85,6 @@ end_per_testcase(TestCase, Config) when Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), - snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> emqx_common_test_helpers:call_janitor(60_000), @@ -151,6 +150,7 @@ start_client(Opts0 = #{}) -> Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), ct:pal("starting client with opts:\n ~p", [Opts]), {ok, Client} = emqtt:start_link(Opts), + unlink(Client), on_exit(fun() -> catch emqtt:stop(Client) end), Client. @@ -182,20 +182,6 @@ list_all_subscriptions(Node) -> list_all_pubranges(Node) -> erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). -prop_only_cores_run_gc(CoreNodes) -> - {"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}. -prop_only_cores_run_gc(Trace, CoreNodes) -> - GCNodes = lists:usort([ - N - || #{ - ?snk_kind := K, - ?snk_meta := #{node := N} - } <- Trace, - lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]), - N =/= node() - ]), - ?assertEqual(lists:usort(CoreNodes), GCNodes). - %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -236,6 +222,7 @@ t_session_subscription_idempotency(Config) -> ?check_trace( #{timetrap => 30_000}, begin + #{timetrap => 20_000}, ?force_ordering( #{?snk_kind := persistent_session_ds_subscription_added}, _NEvents0 = 1, @@ -298,10 +285,10 @@ t_session_unsubscription_idempotency(Config) -> ?check_trace( #{timetrap => 30_000}, begin + #{timetrap => 20_000}, ?force_ordering( #{ - ?snk_kind := persistent_session_ds_subscription_delete, - ?snk_span := {complete, _} + ?snk_kind := persistent_session_ds_subscription_delete }, _NEvents0 = 1, #{?snk_kind := will_restart_node}, @@ -452,6 +439,8 @@ do_t_session_discard(Params) -> ok. t_session_expiration1(Config) -> + %% This testcase verifies that the properties passed in the + %% CONNECT packet are respected by the GC process: ClientId = atom_to_binary(?FUNCTION_NAME), Opts = #{ clientid => ClientId, @@ -464,6 +453,9 @@ t_session_expiration1(Config) -> do_t_session_expiration(Config, Opts). t_session_expiration2(Config) -> + %% This testcase updates the expiry interval for the session in + %% the _DISCONNECT_ packet. This setting should be respected by GC + %% process: ClientId = atom_to_binary(?FUNCTION_NAME), Opts = #{ clientid => ClientId, @@ -478,6 +470,8 @@ t_session_expiration2(Config) -> do_t_session_expiration(Config, Opts). do_t_session_expiration(_Config, Opts) -> + %% Sequence is a list of pairs of properties passed through the + %% CONNECT and for the DISCONNECT for each session: #{ clientid := ClientId, sequence := [ @@ -510,7 +504,7 @@ do_t_session_expiration(_Config, Opts) -> ?assertEqual([], Subs1), emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), - ct:sleep(1_500), + ct:sleep(2_500), Params2 = maps:merge(CommonParams, ThirdConn), Client2 = start_client(Params2), @@ -525,7 +519,6 @@ do_t_session_expiration(_Config, Opts) -> #{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId), ?assertEqual([], maps:to_list(Subs3)), emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), - ok end, [] @@ -541,6 +534,7 @@ t_session_gc(Config) -> Port2, Port3 ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes), + ct:pal("Ports: ~p", [[Port1, Port2, Port3]]), CommonParams = #{ clean_start => false, proto_ver => v5 @@ -618,11 +612,8 @@ t_session_gc(Config) -> ?block_until( #{ ?snk_kind := ds_session_gc_cleaned, - ?snk_meta := #{node := N, time := T}, - session_ids := [ClientId1] - } when - N =/= node() andalso T > T0, - 4 * GCInterval + 1_000 + session_id := ClientId1 + } ) ), ?assertMatch( @@ -630,19 +621,14 @@ t_session_gc(Config) -> ?block_until( #{ ?snk_kind := ds_session_gc_cleaned, - ?snk_meta := #{node := N, time := T}, - session_ids := [ClientId2] - } when - N =/= node() andalso T > T0, - 4 * GCInterval + 1_000 + session_id := ClientId2 + } ) ), ?assertMatch([_], list_all_sessions(Node1), sessions), ?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ok end, - [ - prop_only_cores_run_gc(CoreNodes) - ] + [] ), ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e5f08a6bb..d8019b6f1 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -331,10 +331,10 @@ unsubscribe( -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> + S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), - S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), ?tp_span( persistent_session_ds_subscription_route_delete, @@ -510,9 +510,12 @@ replay_batch(Ifs0, Session, ClientInfo) -> %%-------------------------------------------------------------------- -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -disconnect(Session = #{s := S0}, _ConnInfo) -> +disconnect(Session = #{s := S0}, ConnInfo) -> + OldConnInfo = emqx_persistent_session_ds_state:get_conninfo(S0), + NewConnInfo = maps:merge(OldConnInfo, maps:with([expiry_interval], ConnInfo)), S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), - S = emqx_persistent_session_ds_state:commit(S1), + S2 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S1), + S = emqx_persistent_session_ds_state:commit(S2), {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. @@ -861,7 +864,7 @@ ensure_timers(Session0) -> -spec inc_send_quota(session()) -> session(). inc_send_quota(Session = #{inflight := Inflight0}) -> - {_NInflight, Inflight} = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), + Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), pull_now(Session#{inflight => Inflight}). -spec pull_now(session()) -> session(). diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index 46e170492..4ff420eb8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -113,18 +113,25 @@ start_gc() -> gc_loop(MinLastAlive, It0) -> GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]), case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of - {[], _} -> + {[], _It} -> ok; {Sessions, It} -> - do_gc([ - Key - || {Key, #{last_alive_at := LastAliveAt}} <- Sessions, - LastAliveAt < MinLastAlive - ]), + [ + do_gc(SessionId, MinLastAlive, LastAliveAt, EI) + || {SessionId, #{last_alive_at := LastAliveAt, conninfo := #{expiry_interval := EI}}} <- + Sessions + ], gc_loop(MinLastAlive, It) end. -do_gc(DSSessionIds) -> - lists:foreach(fun emqx_persistent_session_ds:destroy_session/1, DSSessionIds), - ?tp(ds_session_gc_cleaned, #{session_ids => DSSessionIds}), +do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive -> + emqx_persistent_session_ds:destroy_session(SessionId), + ?tp(error, ds_session_gc_cleaned, #{ + session_id => SessionId, + last_alive_at => LastAliveAt, + expiry_interval => EI, + min_last_alive => MinLastAlive + }), + ok; +do_gc(_SessionId, _MinLastAliveAt, _LastAliveAt, _EI) -> ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl index 09962faa0..2938222e9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -125,10 +125,10 @@ n_inflight(#inflight{n_inflight = NInflight}) -> NInflight. %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control --spec inc_send_quota(t()) -> {non_neg_integer(), t()}. +-spec inc_send_quota(t()) -> t(). inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) -> NInflight = max(NInflight0 - 1, 0), - {NInflight, Rec#inflight{n_inflight = NInflight}}. + Rec#inflight{n_inflight = NInflight}. %%================================================================================ %% Internal functions diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 0c2bc450b..27519678d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -367,7 +367,7 @@ make_session_iterator() -> '$end_of_table' -> '$end_of_table'; Key -> - {true, Key} + Key end. -spec session_iterator_next(session_iterator(), pos_integer()) -> @@ -377,8 +377,11 @@ session_iterator_next(Cursor, 0) -> session_iterator_next('$end_of_table', _N) -> {[], '$end_of_table'}; session_iterator_next(Cursor0, N) -> - ThisVal = [{Cursor0, Metadata} || Metadata <- mnesia:dirty_read(?session_tab, Cursor0)], - {NextVals, Cursor} = session_iterator_next(Cursor0, N - 1), + ThisVal = [ + {Cursor0, Metadata} + || #kv{v = Metadata} <- mnesia:dirty_read(?session_tab, Cursor0) + ], + {NextVals, Cursor} = session_iterator_next(mnesia:dirty_next(?session_tab, Cursor0), N - 1), {ThisVal ++ NextVals, Cursor}. %%================================================================================ diff --git a/apps/emqx/src/emqx_persistent_session_ds_sup.erl b/apps/emqx/src/emqx_persistent_session_ds_sup.erl index 11e05be82..7b3fb7abb 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_sup.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 4647186aa..007b737c2 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -645,7 +645,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1}, #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1}, #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1} + #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M7">>, qos = 1} ], ok = publish_many(Pubs1), NPubs1 = length(Pubs1), @@ -686,11 +686,11 @@ t_publish_many_while_client_is_gone_qos1(Config) -> maybe_kill_connection_process(ClientId, Config), Pubs2 = [ - #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M8">>, qos = 1}, + #mqtt_msg{topic = <<"loc/3/4/6">>, payload = <<"M8">>, qos = 1}, #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1}, #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1}, #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1}, - #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1} + #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M12">>, qos = 1} ], ok = publish_many(Pubs2), NPubs2 = length(Pubs2), @@ -719,7 +719,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}), %% ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}), %% ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), - NSame = NMsgs2 - NPubs2, + NSame = max(0, NMsgs2 - NPubs2), ?assert( lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame)) ),