From 7ae6e04582f856c220d93b18e6665264010fa42e Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Wed, 27 Oct 2021 14:09:08 +0200 Subject: [PATCH 1/6] fix(persistent_sessions): channels can terminate without a session --- apps/emqx/src/emqx_channel.erl | 19 +++++++++++++------ apps/emqx/src/emqx_session.erl | 4 ++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 7df7cd42d..fc25490c4 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1179,20 +1179,27 @@ terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); terminate({shutdown, kicked}, Channel) -> - _ = emqx_persistent_session:persist(Channel#channel.clientinfo, - Channel#channel.conninfo, - Channel#channel.session), + persist_if_session(Channel), run_terminate_hook(kicked, Channel); terminate({shutdown, Reason}, Channel) when Reason =:= discarded; Reason =:= takeovered -> run_terminate_hook(Reason, Channel); terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), - _ = emqx_persistent_session:persist(Channel#channel.clientinfo, - Channel#channel.conninfo, - Channel#channel.session), + persist_if_session(Channel), run_terminate_hook(Reason, Channel). +persist_if_session(#channel{session = Session} = Channel) -> + case emqx_session:is_session(Session) of + true -> + _ = emqx_persistent_session:persist(Channel#channel.clientinfo, + Channel#channel.conninfo, + Channel#channel.session), + ok; + false -> + ok + end. + run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) -> emqx_session:terminate(ClientInfo, Reason, Session). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 03414fc60..ab80bd2be 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -58,6 +58,7 @@ -export([ info/1 , info/2 + , is_session/1 , stats/1 ]). @@ -202,6 +203,9 @@ init(Opts) -> %% Info, Stats %%-------------------------------------------------------------------- +is_session(#session{}) -> true; +is_session(_) -> false. + %% @doc Get infos of the session. -spec(info(session()) -> emqx_types:infos()). info(Session) -> From ec68d7fc589b2b7d2f5de8fdf963ebede9511077 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Wed, 27 Oct 2021 14:12:03 +0200 Subject: [PATCH 2/6] test(persistent_sessions): stabilize flaky tests --- apps/emqx/src/emqx_cm.erl | 9 ++- .../test/emqx_persistent_session_SUITE.erl | 68 ++++++++++++------- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 83f77050a..dbd8dfa63 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -58,7 +58,9 @@ , lookup_channels/2 ]). --export([all_channels/0]). +-export([ all_channels/0 + , all_client_ids/0 + ]). %% gen_server callbacks -export([ init/1 @@ -400,6 +402,11 @@ all_channels() -> Pat = [{{'_', '$1'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). +all_client_ids() -> + Pat = [{{'$1', '_'}, [], ['$1']}], + ets:select(?CHAN_TAB, Pat). + + %% @doc Lookup channels. -spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())). lookup_channels(ClientId) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index ac51636f0..3e992ce2d 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -113,6 +113,8 @@ init_per_group(snabbkaffe, Config) -> [ {kill_connection_process, true} | Config]; init_per_group(gc_tests, Config) -> %% We need to make sure the system does not interfere with this test group. + [maybe_kill_connection_process(ClientId, [{kill_connection_process, true}]) + || ClientId <- emqx_cm:all_client_ids()], emqx_common_test_helpers:stop_apps([]), SessionMsgEts = gc_tests_session_store, MsgEts = gc_tests_msg_store, @@ -230,32 +232,48 @@ receive_messages(Count, Msgs) -> maybe_kill_connection_process(ClientId, Config) -> case ?config(kill_connection_process, Config) of true -> - [ConnectionPid] = emqx_cm:lookup_channels(ClientId), - ?assert(is_pid(ConnectionPid)), - Ref = monitor(process, ConnectionPid), - ConnectionPid ! die_if_test, - receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok - after 3000 -> error(process_did_not_die) + case emqx_cm:lookup_channels(ClientId) of + [] -> + ok; + [ConnectionPid] -> + ?assert(is_pid(ConnectionPid)), + Ref = monitor(process, ConnectionPid), + ConnectionPid ! die_if_test, + receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok + after 3000 -> error(process_did_not_die) + end, + wait_for_cm_unregister(ClientId) end; false -> ok end. -snabbkaffe_sync_publish(Topic, Payloads, Config) -> +wait_for_cm_unregister(ClientId) -> + wait_for_cm_unregister(ClientId, 10). + +wait_for_cm_unregister(_ClientId, 0) -> + error(cm_did_not_unregister); +wait_for_cm_unregister(ClientId, N) -> + case emqx_cm:lookup_channels(ClientId) of + [] -> ok; + [_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1) + end. + +snabbkaffe_sync_publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) , #{?snk_kind := ps_persist_msg, payload := Payload} ) end, - do_publish(Payloads, Fun, Config). + do_publish(Payloads, Fun). -publish(Topic, Payloads, Config) -> +publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> {ok, _} = emqtt:publish(Client, Topic, Payload, 2) end, - do_publish(Payloads, Fun, Config). + do_publish(Payloads, Fun). -do_publish(Payloads = [_|_], PublishFun, Config) -> +do_publish(Payloads = [_|_], PublishFun) -> %% Publish from another process to avoid connection confusion. {Pid, Ref} = spawn_monitor( @@ -272,8 +290,8 @@ do_publish(Payloads = [_|_], PublishFun, Config) -> {'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) end; -do_publish(Payload, PublishFun, Config) -> - do_publish([Payload], PublishFun, Config). +do_publish(Payload, PublishFun) -> + do_publish([Payload], PublishFun). %%-------------------------------------------------------------------- %% Test Cases @@ -297,7 +315,7 @@ t_connect_session_expiry_interval(Config) -> maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload, Config), + publish(Topic, Payload), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, @@ -424,7 +442,7 @@ t_process_dies_session_expires(Config) -> maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload], Config), + ok = publish(Topic, [Payload]), SessionId = case ?config(persistent_store_enabled, Config) of @@ -498,7 +516,7 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload1, Payload2], Config), + ok = publish(Topic, [Payload1, Payload2]), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, @@ -544,7 +562,7 @@ t_clean_start_drops_subscriptions(Config) -> maybe_kill_connection_process(ClientId, Config), %% 2. - ok = publish(Topic, Payload1, Config), + ok = publish(Topic, Payload1), %% 3. {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, @@ -556,7 +574,7 @@ t_clean_start_drops_subscriptions(Config) -> ?assertEqual(0, client_info(session_present, Client2)), {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2), - ok = publish(Topic, Payload2, Config), + ok = publish(Topic, Payload2), [Msg1] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)), @@ -571,7 +589,7 @@ t_clean_start_drops_subscriptions(Config) -> | Config]), {ok, _} = emqtt:ConnFun(Client3), - ok = publish(Topic, Payload3, Config), + ok = publish(Topic, Payload3), [Msg2] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)), @@ -625,7 +643,7 @@ t_multiple_subscription_matches(Config) -> maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload, Config), + publish(Topic, Payload), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, @@ -675,9 +693,9 @@ t_lost_messages_because_of_gc(Config) -> {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload1, Config), + publish(Topic, Payload1), timer:sleep(2 * Retain), - publish(Topic, Payload2, Config), + publish(Topic, Payload2), emqx_persistent_session_gc:message_gc_worker(), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {clean_start, false}, @@ -790,7 +808,7 @@ t_snabbkaffe_pending_messages(Config) -> ?check_trace( begin - snabbkaffe_sync_publish(Topic, Payloads, Config), + snabbkaffe_sync_publish(Topic, Payloads), {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), {ok, _} = emqtt:ConnFun(Client2), Msgs = receive_messages(length(Payloads)), @@ -829,7 +847,7 @@ t_snabbkaffe_buffered_messages(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payloads1, Config), + publish(Topic, Payloads1), ?check_trace( begin @@ -838,7 +856,7 @@ t_snabbkaffe_buffered_messages(Config) -> #{ ?snk_kind := ps_resume_end }), spawn_link(fun() -> ?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000), - publish(Topic, Payloads2, Config) + publish(Topic, Payloads2) end), {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), {ok, _} = emqtt:ConnFun(Client2), From 1f13a6caad009b4ad5629638d35e01e3b6d6da70 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Thu, 28 Oct 2021 09:56:36 +0200 Subject: [PATCH 3/6] chore(persistent_sessions): tune mnesia parameters for better dump behavior --- apps/emqx/etc/emqx_cloud/vm.args | 4 ++++ apps/emqx/etc/emqx_edge/vm.args | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/apps/emqx/etc/emqx_cloud/vm.args b/apps/emqx/etc/emqx_cloud/vm.args index 1e6b0b4cb..0ee4b1e15 100644 --- a/apps/emqx/etc/emqx_cloud/vm.args +++ b/apps/emqx/etc/emqx_cloud/vm.args @@ -116,3 +116,7 @@ ## patches dir -pa {{ platform_data_dir }}/patches + +## Mnesia thresholds +-mnesia dump_log_write_threshold 5000 +-mnesia dump_log_time_threshold 60000 diff --git a/apps/emqx/etc/emqx_edge/vm.args b/apps/emqx/etc/emqx_edge/vm.args index ef9749738..70ce81f9f 100644 --- a/apps/emqx/etc/emqx_edge/vm.args +++ b/apps/emqx/etc/emqx_edge/vm.args @@ -114,3 +114,7 @@ ## patches dir -pa {{ platform_data_dir }}/patches + +## Mnesia thresholds +-mnesia dump_log_write_threshold 5000 +-mnesia dump_log_time_threshold 60000 From 329dd4d780afb096f2476bce110085d72023d53f Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Mon, 1 Nov 2021 10:17:00 +0100 Subject: [PATCH 4/6] test(persistent_session): try to fix flaky snabbkaffe failure --- apps/emqx/test/emqx_persistent_session_SUITE.erl | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3e992ce2d..fbf1696f0 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -265,33 +265,37 @@ snabbkaffe_sync_publish(Topic, Payloads) -> , #{?snk_kind := ps_persist_msg, payload := Payload} ) end, - do_publish(Payloads, Fun). + do_publish(Payloads, Fun, true). publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> {ok, _} = emqtt:publish(Client, Topic, Payload, 2) end, - do_publish(Payloads, Fun). + do_publish(Payloads, Fun, false). -do_publish(Payloads = [_|_], PublishFun) -> +do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) -> %% Publish from another process to avoid connection confusion. {Pid, Ref} = spawn_monitor( fun() -> %% For convenience, always publish using tcp. %% The publish path is not what we are testing. + ClientID = <<"ps_SUITE_publisher">>, {ok, Client} = emqtt:start_link([ {proto_ver, v5} + , {clientid, ClientID} , {port, 1883} ]), {ok, _} = emqtt:connect(Client), lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), - ok = emqtt:disconnect(Client) + ok = emqtt:disconnect(Client), + %% Snabbkaffe sometimes fails unless all processes are gone. + [wait_for_cm_unregister(ClientID) || WaitForUnregister] end), receive {'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) end; -do_publish(Payload, PublishFun) -> - do_publish([Payload], PublishFun). +do_publish(Payload, PublishFun, WaitForUnregister) -> + do_publish([Payload], PublishFun, WaitForUnregister). %%-------------------------------------------------------------------- %% Test Cases From ce49a281ed40c53177e9b5a62949b8dc618ab2b5 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Mon, 1 Nov 2021 13:50:16 +0100 Subject: [PATCH 5/6] fix(persistent_sessions): protect against looking up stale data --- apps/emqx/src/emqx_persistent_session.erl | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl index 71dac02c3..13c74b62e 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -179,12 +179,17 @@ timestamp_from_conninfo(ConnInfo) -> end. lookup(ClientID) when is_binary(ClientID) -> - case lookup_session_store(ClientID) of - none -> none; - {value, #session_store{session = S} = SS} -> - case persistent_session_status(SS) of - expired -> {expired, S}; - persistent -> {persistent, S} + case is_store_enabled() of + false -> + none; + true -> + case lookup_session_store(ClientID) of + none -> none; + {value, #session_store{session = S} = SS} -> + case persistent_session_status(SS) of + expired -> {expired, S}; + persistent -> {persistent, S} + end end end. From b7ed64918519de9229213e19fcf771d5ceaa932a Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Mon, 1 Nov 2021 14:56:10 +0100 Subject: [PATCH 6/6] test(persistent_session): wait in test to avoid race --- apps/emqx/test/emqx_persistent_session_SUITE.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index fbf1696f0..d5637cad2 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -378,6 +378,8 @@ t_cancel_on_disconnect(Config) -> {ok, _} = emqtt:ConnFun(Client1), ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}), + wait_for_cm_unregister(ClientId), + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, {clean_start, false},