From f0769cb765cbed054226123f983986481c42c8e1 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 10 Oct 2022 13:52:09 +0800 Subject: [PATCH 1/3] fix(shared_sub): kick session should not cause session message redispatch --- apps/emqx/src/emqx_session.erl | 9 ++- apps/emqx/test/emqx_shared_sub_SUITE.erl | 71 ++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index b285d0a88..6567bf5c8 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -802,8 +802,7 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - Reason =/= takenover andalso - redispatch_shared_messages(Session), + redispatch_shared_messages(Reason, Session), ok. run_terminate_hooks(ClientInfo, discarded, Session) -> @@ -813,7 +812,11 @@ run_terminate_hooks(ClientInfo, takenover, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> +redispatch_shared_messages(takenover, _Session) -> + ok; +redispatch_shared_messages(kicked, _Session) -> + ok; +redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight), F = fun ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 291286aa2..2cdf2bc05 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -817,6 +817,77 @@ t_dispatch_qos0(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +t_redispatch_when_kicked({init, Config}) when is_list(Config) -> + Config; +t_redispatch_when_kicked({'end', Config}) when is_list(Config) -> + ok; +t_redispatch_when_kicked(_) -> + ok = ensure_config(sticky, true), + + Group = <<"group1">>, + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + Parent = self(), + + %% emqx_cm:kick_session will cause the emqtt to exit abnormally + %% so here need to isolate emqtt with a separate process + MkSub = fun(ClientId) -> + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, { + <<"$share/", Group/binary, "/foo/bar">>, 1 + }), + + Loop = fun(Self) -> + receive + wait -> + case last_message(<<"hello1">>, [ConnPid], 6000) of + {true, _} -> + Parent ! {got, ClientId}; + _ -> + Parent ! nothing + end, + Self(Self); + stop -> + stop + end + end, + + Loop(Loop) + end, + + Receive = fun() -> + receive + {got, _} = Got -> Got; + nothing -> nothing + after 6000 -> + nothing + end + end, + + Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]], + timer:sleep(500), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + emqx:publish(Message), + + [Sub ! wait || Sub <- Subs], + timer:sleep(200), + Res = Receive(), + ?assertMatch({got, _}, Res), + + {got, ClientId} = Res, + emqx_cm:kick_session(ClientId), + + [Sub ! wait || Sub <- Subs], + timer:sleep(200), + Res1 = Receive(), + ?assertMatch(nothing, Res1), + + [Sub ! stop || Sub <- Subs], + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- From b4eb0f18f9b493fe953bdc03bd4c1dfe170da6d4 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 10 Oct 2022 18:06:55 +0800 Subject: [PATCH 2/3] fix(shared_subs): rename redispatch function name and simply test case --- apps/emqx/src/emqx_session.erl | 11 +++++++---- apps/emqx/test/emqx_shared_sub_SUITE.erl | 11 ++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 6567bf5c8..4d1594a99 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -47,6 +47,7 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -ifdef(TEST). -compile(export_all). @@ -802,7 +803,7 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - redispatch_shared_messages(Reason, Session), + maybe_redispatch_shared_messages(Reason, Session), ok. run_terminate_hooks(ClientInfo, discarded, Session) -> @@ -812,11 +813,13 @@ run_terminate_hooks(ClientInfo, takenover, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -redispatch_shared_messages(takenover, _Session) -> +maybe_redispatch_shared_messages(takenover, _Session) -> + ?tp(debug, ignore_redispatch_shared_messages, #{reason => takenover}), ok; -redispatch_shared_messages(kicked, _Session) -> +maybe_redispatch_shared_messages(kicked, _Session) -> + ?tp(debug, ignore_redispatch_shared_messages, #{reason => kicked}), ok; -redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) -> +maybe_redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight), F = fun ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 2cdf2bc05..f048df397 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SUITE, ?MODULE). @@ -869,23 +870,23 @@ t_redispatch_when_kicked(_) -> Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]], timer:sleep(500), + ok = snabbkaffe:start_trace(), + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), emqx:publish(Message), [Sub ! wait || Sub <- Subs], - timer:sleep(200), Res = Receive(), ?assertMatch({got, _}, Res), {got, ClientId} = Res, emqx_cm:kick_session(ClientId), - [Sub ! wait || Sub <- Subs], - timer:sleep(200), - Res1 = Receive(), - ?assertMatch(nothing, Res1), + Trace = snabbkaffe:collect_trace(500), + ?assertMatch([#{reason := kicked}], ?of_kind(ignore_redispatch_shared_messages, Trace)), [Sub ! stop || Sub <- Subs], + snabbkaffe:stop(), ok. %%-------------------------------------------------------------------- From 7b3c67fbe9faab5c962809f989625be5edab7adf Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 18 Oct 2022 13:47:57 +0800 Subject: [PATCH 3/3] test(shared_sub): ensure snabbkaffe will be stopped --- apps/emqx/test/emqx_shared_sub_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index f048df397..45d9b4fad 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -821,6 +821,7 @@ t_dispatch_qos0(Config) when is_list(Config) -> t_redispatch_when_kicked({init, Config}) when is_list(Config) -> Config; t_redispatch_when_kicked({'end', Config}) when is_list(Config) -> + snabbkaffe:stop(), ok; t_redispatch_when_kicked(_) -> ok = ensure_config(sticky, true),