fix(sessds): Fix session garbage collection after the refactoring

This commit is contained in:
ieQu1 2024-01-10 23:46:18 +01:00
parent 963df8f941
commit 893656f092
7 changed files with 57 additions and 58 deletions

View File

@ -54,12 +54,12 @@ init_per_testcase(TestCase, Config) when
init_per_testcase(t_session_gc = TestCase, Config) -> init_per_testcase(t_session_gc = TestCase, Config) ->
Opts = #{ Opts = #{
n => 3, n => 3,
roles => [core, core, replicant], roles => [core, core, core],
extra_emqx_conf => extra_emqx_conf =>
"\n session_persistence {" "\n session_persistence {"
"\n last_alive_update_interval = 500ms " "\n last_alive_update_interval = 500ms "
"\n session_gc_interval = 2s " "\n session_gc_interval = 1s "
"\n session_gc_batch_size = 1 " "\n session_gc_batch_size = 2 "
"\n }" "\n }"
}, },
Cluster = cluster(Opts), Cluster = cluster(Opts),
@ -85,7 +85,6 @@ end_per_testcase(TestCase, Config) when
Nodes = ?config(nodes, Config), Nodes = ?config(nodes, Config),
emqx_common_test_helpers:call_janitor(60_000), emqx_common_test_helpers:call_janitor(60_000),
ok = emqx_cth_cluster:stop(Nodes), ok = emqx_cth_cluster:stop(Nodes),
snabbkaffe:stop(),
ok; ok;
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(60_000), emqx_common_test_helpers:call_janitor(60_000),
@ -151,6 +150,7 @@ start_client(Opts0 = #{}) ->
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
ct:pal("starting client with opts:\n ~p", [Opts]), ct:pal("starting client with opts:\n ~p", [Opts]),
{ok, Client} = emqtt:start_link(Opts), {ok, Client} = emqtt:start_link(Opts),
unlink(Client),
on_exit(fun() -> catch emqtt:stop(Client) end), on_exit(fun() -> catch emqtt:stop(Client) end),
Client. Client.
@ -182,20 +182,6 @@ list_all_subscriptions(Node) ->
list_all_pubranges(Node) -> list_all_pubranges(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
prop_only_cores_run_gc(CoreNodes) ->
{"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}.
prop_only_cores_run_gc(Trace, CoreNodes) ->
GCNodes = lists:usort([
N
|| #{
?snk_kind := K,
?snk_meta := #{node := N}
} <- Trace,
lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]),
N =/= node()
]),
?assertEqual(lists:usort(CoreNodes), GCNodes).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -236,6 +222,7 @@ t_session_subscription_idempotency(Config) ->
?check_trace( ?check_trace(
#{timetrap => 30_000}, #{timetrap => 30_000},
begin begin
#{timetrap => 20_000},
?force_ordering( ?force_ordering(
#{?snk_kind := persistent_session_ds_subscription_added}, #{?snk_kind := persistent_session_ds_subscription_added},
_NEvents0 = 1, _NEvents0 = 1,
@ -298,10 +285,10 @@ t_session_unsubscription_idempotency(Config) ->
?check_trace( ?check_trace(
#{timetrap => 30_000}, #{timetrap => 30_000},
begin begin
#{timetrap => 20_000},
?force_ordering( ?force_ordering(
#{ #{
?snk_kind := persistent_session_ds_subscription_delete, ?snk_kind := persistent_session_ds_subscription_delete
?snk_span := {complete, _}
}, },
_NEvents0 = 1, _NEvents0 = 1,
#{?snk_kind := will_restart_node}, #{?snk_kind := will_restart_node},
@ -452,6 +439,8 @@ do_t_session_discard(Params) ->
ok. ok.
t_session_expiration1(Config) -> t_session_expiration1(Config) ->
%% This testcase verifies that the properties passed in the
%% CONNECT packet are respected by the GC process:
ClientId = atom_to_binary(?FUNCTION_NAME), ClientId = atom_to_binary(?FUNCTION_NAME),
Opts = #{ Opts = #{
clientid => ClientId, clientid => ClientId,
@ -464,6 +453,9 @@ t_session_expiration1(Config) ->
do_t_session_expiration(Config, Opts). do_t_session_expiration(Config, Opts).
t_session_expiration2(Config) -> t_session_expiration2(Config) ->
%% This testcase updates the expiry interval for the session in
%% the _DISCONNECT_ packet. This setting should be respected by GC
%% process:
ClientId = atom_to_binary(?FUNCTION_NAME), ClientId = atom_to_binary(?FUNCTION_NAME),
Opts = #{ Opts = #{
clientid => ClientId, clientid => ClientId,
@ -478,6 +470,8 @@ t_session_expiration2(Config) ->
do_t_session_expiration(Config, Opts). do_t_session_expiration(Config, Opts).
do_t_session_expiration(_Config, Opts) -> do_t_session_expiration(_Config, Opts) ->
%% Sequence is a list of pairs of properties passed through the
%% CONNECT and for the DISCONNECT for each session:
#{ #{
clientid := ClientId, clientid := ClientId,
sequence := [ sequence := [
@ -510,7 +504,7 @@ do_t_session_expiration(_Config, Opts) ->
?assertEqual([], Subs1), ?assertEqual([], Subs1),
emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn),
ct:sleep(1_500), ct:sleep(2_500),
Params2 = maps:merge(CommonParams, ThirdConn), Params2 = maps:merge(CommonParams, ThirdConn),
Client2 = start_client(Params2), Client2 = start_client(Params2),
@ -525,7 +519,6 @@ do_t_session_expiration(_Config, Opts) ->
#{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId), #{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId),
?assertEqual([], maps:to_list(Subs3)), ?assertEqual([], maps:to_list(Subs3)),
emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn),
ok ok
end, end,
[] []
@ -541,6 +534,7 @@ t_session_gc(Config) ->
Port2, Port2,
Port3 Port3
] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes), ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
ct:pal("Ports: ~p", [[Port1, Port2, Port3]]),
CommonParams = #{ CommonParams = #{
clean_start => false, clean_start => false,
proto_ver => v5 proto_ver => v5
@ -618,11 +612,8 @@ t_session_gc(Config) ->
?block_until( ?block_until(
#{ #{
?snk_kind := ds_session_gc_cleaned, ?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T}, session_id := ClientId1
session_ids := [ClientId1] }
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
) )
), ),
?assertMatch( ?assertMatch(
@ -630,19 +621,14 @@ t_session_gc(Config) ->
?block_until( ?block_until(
#{ #{
?snk_kind := ds_session_gc_cleaned, ?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T}, session_id := ClientId2
session_ids := [ClientId2] }
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
) )
), ),
?assertMatch([_], list_all_sessions(Node1), sessions), ?assertMatch([_], list_all_sessions(Node1), sessions),
?assertMatch([_], list_all_subscriptions(Node1), subscriptions), ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
ok ok
end, end,
[ []
prop_only_cores_run_gc(CoreNodes)
]
), ),
ok. ok.

View File

@ -331,10 +331,10 @@ unsubscribe(
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
emqx_persistent_session_ds_state:t(). emqx_persistent_session_ds_state:t().
do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
?tp(persistent_session_ds_subscription_delete, #{ ?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter session_id => SessionId, topic_filter => TopicFilter
}), }),
S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
?tp_span( ?tp_span(
persistent_session_ds_subscription_route_delete, persistent_session_ds_subscription_route_delete,
@ -510,9 +510,12 @@ replay_batch(Ifs0, Session, ClientInfo) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{s := S0}, _ConnInfo) -> disconnect(Session = #{s := S0}, ConnInfo) ->
OldConnInfo = emqx_persistent_session_ds_state:get_conninfo(S0),
NewConnInfo = maps:merge(OldConnInfo, maps:with([expiry_interval], ConnInfo)),
S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0),
S = emqx_persistent_session_ds_state:commit(S1), S2 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S1),
S = emqx_persistent_session_ds_state:commit(S2),
{shutdown, Session#{s => S}}. {shutdown, Session#{s => S}}.
-spec terminate(Reason :: term(), session()) -> ok. -spec terminate(Reason :: term(), session()) -> ok.
@ -861,7 +864,7 @@ ensure_timers(Session0) ->
-spec inc_send_quota(session()) -> session(). -spec inc_send_quota(session()) -> session().
inc_send_quota(Session = #{inflight := Inflight0}) -> inc_send_quota(Session = #{inflight := Inflight0}) ->
{_NInflight, Inflight} = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0),
pull_now(Session#{inflight => Inflight}). pull_now(Session#{inflight => Inflight}).
-spec pull_now(session()) -> session(). -spec pull_now(session()) -> session().

View File

@ -113,18 +113,25 @@ start_gc() ->
gc_loop(MinLastAlive, It0) -> gc_loop(MinLastAlive, It0) ->
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]), GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
{[], _} -> {[], _It} ->
ok; ok;
{Sessions, It} -> {Sessions, It} ->
do_gc([ [
Key do_gc(SessionId, MinLastAlive, LastAliveAt, EI)
|| {Key, #{last_alive_at := LastAliveAt}} <- Sessions, || {SessionId, #{last_alive_at := LastAliveAt, conninfo := #{expiry_interval := EI}}} <-
LastAliveAt < MinLastAlive Sessions
]), ],
gc_loop(MinLastAlive, It) gc_loop(MinLastAlive, It)
end. end.
do_gc(DSSessionIds) -> do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive ->
lists:foreach(fun emqx_persistent_session_ds:destroy_session/1, DSSessionIds), emqx_persistent_session_ds:destroy_session(SessionId),
?tp(ds_session_gc_cleaned, #{session_ids => DSSessionIds}), ?tp(error, ds_session_gc_cleaned, #{
session_id => SessionId,
last_alive_at => LastAliveAt,
expiry_interval => EI,
min_last_alive => MinLastAlive
}),
ok;
do_gc(_SessionId, _MinLastAliveAt, _LastAliveAt, _EI) ->
ok. ok.

View File

@ -125,10 +125,10 @@ n_inflight(#inflight{n_inflight = NInflight}) ->
NInflight. NInflight.
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control
-spec inc_send_quota(t()) -> {non_neg_integer(), t()}. -spec inc_send_quota(t()) -> t().
inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) -> inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) ->
NInflight = max(NInflight0 - 1, 0), NInflight = max(NInflight0 - 1, 0),
{NInflight, Rec#inflight{n_inflight = NInflight}}. Rec#inflight{n_inflight = NInflight}.
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions

View File

@ -367,7 +367,7 @@ make_session_iterator() ->
'$end_of_table' -> '$end_of_table' ->
'$end_of_table'; '$end_of_table';
Key -> Key ->
{true, Key} Key
end. end.
-spec session_iterator_next(session_iterator(), pos_integer()) -> -spec session_iterator_next(session_iterator(), pos_integer()) ->
@ -377,8 +377,11 @@ session_iterator_next(Cursor, 0) ->
session_iterator_next('$end_of_table', _N) -> session_iterator_next('$end_of_table', _N) ->
{[], '$end_of_table'}; {[], '$end_of_table'};
session_iterator_next(Cursor0, N) -> session_iterator_next(Cursor0, N) ->
ThisVal = [{Cursor0, Metadata} || Metadata <- mnesia:dirty_read(?session_tab, Cursor0)], ThisVal = [
{NextVals, Cursor} = session_iterator_next(Cursor0, N - 1), {Cursor0, Metadata}
|| #kv{v = Metadata} <- mnesia:dirty_read(?session_tab, Cursor0)
],
{NextVals, Cursor} = session_iterator_next(mnesia:dirty_next(?session_tab, Cursor0), N - 1),
{ThisVal ++ NextVals, Cursor}. {ThisVal ++ NextVals, Cursor}.
%%================================================================================ %%================================================================================

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -645,7 +645,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
#mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1}, #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1}, #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1}, #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1} #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M7">>, qos = 1}
], ],
ok = publish_many(Pubs1), ok = publish_many(Pubs1),
NPubs1 = length(Pubs1), NPubs1 = length(Pubs1),
@ -686,11 +686,11 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
maybe_kill_connection_process(ClientId, Config), maybe_kill_connection_process(ClientId, Config),
Pubs2 = [ Pubs2 = [
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M8">>, qos = 1}, #mqtt_msg{topic = <<"loc/3/4/6">>, payload = <<"M8">>, qos = 1},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1}, #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1}, #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1}, #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1} #mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M12">>, qos = 1}
], ],
ok = publish_many(Pubs2), ok = publish_many(Pubs2),
NPubs2 = length(Pubs2), NPubs2 = length(Pubs2),
@ -719,7 +719,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}), ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}),
%% ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}), %% ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
%% ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), %% ?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
NSame = NMsgs2 - NPubs2, NSame = max(0, NMsgs2 - NPubs2),
?assert( ?assert(
lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame)) lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
), ),