From 6dd0b49dd2ec81469271701ac95f9982fca1b450 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Feb 2022 14:51:01 +0800 Subject: [PATCH 1/7] feat(cm): force shutdown of processe that cannot answer takeover event Related PR: #6030 --- src/emqx_cm.erl | 84 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 23f078568..4811313d8 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -226,18 +226,24 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> - case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, - present => true, - pendings => Pendings}}; - {error, not_found} -> + CreateSess = + fun() -> Session = create_session(ClientInfo, ConnInfo), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} + end, + case takeover_session(ClientId) of + {ok, ConnMod, ChanPid, Session} -> + ok = emqx_session:resume(ClientInfo, Session), + case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of + {error, _} -> CreateSess(); + Pendings -> + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session, + present => true, + pendings => Pendings}} + end; + {error, _Reason} -> CreateSess() end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -271,9 +277,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - %% TODO: if takeover times out, maybe kill the old? - Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {ok, ConnMod, ChanPid, Session} + case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of + {error, Reason} -> + {error, Reason}; + Session -> + {ok, ConnMod, ChanPid, Session} + end end; takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). @@ -286,44 +295,63 @@ discard_session(ClientId) when is_binary(ClientId) -> ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -%% @private Kick a local stale session to force it step down. -%% If failed to kick (e.g. timeout) force a kill. +%% @private call a local stale session to execute an Action. +%% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec kick_or_kill(kick | discard, module(), pid()) -> ok. -kick_or_kill(Action, ConnMod, Pid) -> +-spec call_or_kill(Action, module(), pid()) + -> ok %% returned by kick, discard + | emqx_session:session() %% returned by {takeover, 'begin'} + | list(emqx_type:deliver()) %% returned by {takeover, 'end'} + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +call_or_kill(Action, ConnMod, Pid) -> try + Timeout = case Action of + {takeover, _} -> ?T_TAKEOVER; + _ -> ?T_KICK + end, %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) + apply(ConnMod, call, [Pid, Action, Timeout]) catch _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, noproc); _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, noproc); + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, Reason); + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, Reason); _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", + ?tp(warning, "call_session_timeout", #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid) }), - ok = force_kill(Pid); + ok = force_kill(Pid), + return_error_if_action_is_takeover(Action, timeout); _ : Error : St -> - ?tp(error, "session_kick_exception", + ?tp(error, "call_session_exception", #{pid => Pid, action => Action, reason => Error, stacktrace => St, stale_channel => stale_channel_info(Pid) }), - ok = force_kill(Pid) + ok = force_kill(Pid), + return_error_if_action_is_takeover(Action, Error) end. +return_error_if_action_is_takeover({takeover, _}, Reason) -> + {error, Reason}; +return_error_if_action_is_takeover(_, _) -> + ok. + force_kill(Pid) -> exit(Pid, kill), ok. @@ -344,7 +372,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = kick_or_kill(Action, ConnMod, ChanPid) + ok = call_or_kill(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded From 66807f17dfb1258bead39807385fe1732de97438 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Feb 2022 15:43:31 +0800 Subject: [PATCH 2/7] test(cm): cover {takeover, 'begin'/'end'} action --- test/emqx_cm_SUITE.erl | 115 ++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 41 deletions(-) diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index a46c046ca..c82e525f0 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -187,57 +187,89 @@ t_open_session_race_condition(_) -> ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_kick_session_discard_normal(_) -> - test_kick_session(discard, normal). +t_call_session_discard_normal(_) -> + test_call_session(discard, normal). -t_kick_session_discard_shutdown(_) -> - test_kick_session(discard, shutdown). +t_call_session_discard_shutdown(_) -> + test_call_session(discard, shutdown). -t_kick_session_discard_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_call_session_discard_shutdown_with_reason(_) -> + test_call_session(discard, {shutdown, discard}). -t_kick_session_discard_timeout(_) -> - test_kick_session(discard, timeout). +t_call_session_discard_timeout(_) -> + test_call_session(discard, timeout). -t_kick_session_discard_noproc(_) -> - test_kick_session(discard, noproc). +t_call_session_discard_noproc(_) -> + test_call_session(discard, noproc). -t_kick_session_kick_normal(_) -> - test_kick_session(discard, normal). +t_call_session_kick_normal(_) -> + test_call_session(kick, normal). -t_kick_session_kick_shutdown(_) -> - test_kick_session(discard, shutdown). +t_call_session_kick_shutdown(_) -> + test_call_session(kick, shutdown). -t_kick_session_kick_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_call_session_kick_shutdown_with_reason(_) -> + test_call_session(kick, {shutdown, discard}). -t_kick_session_kick_timeout(_) -> - test_kick_session(discard, timeout). +t_call_session_kick_timeout(_) -> + test_call_session(kick, timeout). -t_kick_session_kick_noproc(_) -> - test_kick_session(discard, noproc). +t_call_session_kick_noproc(_) -> + test_call_session(discard, noproc). -test_kick_session(Action, Reason) -> +t_call_session_takeover_begin_normal(_) -> + test_call_session({takeover, 'begin'}, normal). + +t_call_session_takeover_begin_shutdown(_) -> + test_call_session({takeover, 'begin'}, shutdown). + +t_call_session_takeover_begin_shutdown_with_reason(_) -> + test_call_session({takeover, 'begin'}, {shutdown, discard}). + +t_call_session_takeover_begin_timeout(_) -> + test_call_session({takeover, 'begin'}, timeout). + +t_call_session_takeover_begin_noproc(_) -> + test_call_session({takeover, 'begin'}, noproc). + +t_call_session_takeover_end_normal(_) -> + test_call_session({takeover, 'end'}, normal). + +t_call_session_takeover_end_shutdown(_) -> + test_call_session({takeover, 'end'}, shutdown). + +t_call_session_takeover_end_shutdown_with_reason(_) -> + test_call_session({takeover, 'end'}, {shutdown, discard}). + +t_call_session_takeover_end_timeout(_) -> + test_call_session({takeover, 'end'}, timeout). + +t_call_session_takeover_end_noproc(_) -> + test_call_session({takeover, 'end'}, noproc). + +test_call_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, FakeSessionFun = fun Loop() -> - receive - {'$gen_call', From, A} when A =:= kick orelse - A =:= discard -> - case Reason of - normal -> - gen_server:reply(From, ok); - timeout -> - %% no response to the call - Loop(); - _ -> - exit(Reason) - end; - Msg -> - ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), - Loop() - end + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard orelse + A =:= {takeover, 'begin'} orelse + A =:= {takeover, 'end'} -> + case Reason of + normal when A =:= kick orelse A =:= discard -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end end, {Pid1, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun), @@ -249,10 +281,11 @@ test_kick_session(Action, Reason) -> noproc -> exit(Pid1, kill), exit(Pid2, kill); _ -> ok end, - ok = case Action of - kick -> emqx_cm:kick_session(ClientId); - discard -> emqx_cm:discard_session(ClientId) - end, + _ = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId); + {takeover, _} -> emqx_cm:takeover_session(ClientId) + end, case Reason =:= timeout orelse Reason =:= noproc of true -> ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), From bfd0fd901932ee23d7aba0d7b4a986f208bafaf9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 22 Feb 2022 10:16:29 +0800 Subject: [PATCH 3/7] refactor(cm): rename call_or_kill to takeover --- src/emqx_cm.erl | 117 +++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4811313d8..002b1a746 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -235,13 +235,14 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of - {error, _} -> CreateSess(); - Pendings -> + case takeover('end', ConnMod, ChanPid) of + {ok, Pendings} -> register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, - pendings => Pendings}} + pendings => Pendings}}; + {error, _} -> + CreateSess() end; {error, _Reason} -> CreateSess() end @@ -277,11 +278,11 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of + case takeover('begin', ConnMod, ChanPid) of + {ok, Session} -> + {ok, ConnMod, ChanPid, Session}; {error, Reason} -> - {error, Reason}; - Session -> - {ok, ConnMod, ChanPid, Session} + {error, Reason} end end; takeover_session(ClientId, ChanPid) -> @@ -299,59 +300,61 @@ discard_session(ClientId) when is_binary(ClientId) -> %% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec call_or_kill(Action, module(), pid()) - -> ok %% returned by kick, discard - | emqx_session:session() %% returned by {takeover, 'begin'} - | list(emqx_type:deliver()) %% returned by {takeover, 'end'} - when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. -call_or_kill(Action, ConnMod, Pid) -> - try - Timeout = case Action of - {takeover, _} -> ?T_TAKEOVER; - _ -> ?T_KICK - end, +-spec takeover(Action, module(), pid()) + -> ok + | {ok, emqx_session:session() | list(emqx_type:deliver())} + | {error, term()} + when Action :: kick | discard | 'begin' | 'end'. +takeover(Action, ConnMod, Pid) -> + {NAction, Timeout} = + case Action == kick orelse Action == discard of + true -> {Action, ?T_KICK}; + _ -> {{takeover, Action},?T_TAKEOVER} + end, + Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - apply(ConnMod, call, [Pid, Action, Timeout]) - catch - _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, noproc); - _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, noproc); - _ : Reason = {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, Reason); - _ : Reason = {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, Reason); - _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "call_session_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid), - return_error_if_action_is_takeover(Action, timeout); - _ : Error : St -> - ?tp(error, "call_session_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid), - return_error_if_action_is_takeover(Action, Error) + try apply(ConnMod, call, [Pid, NAction, Timeout]) of + ok -> ok; + Reply -> {ok, Reply} + catch + _ : noproc -> % emqx_ws_connection: call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "takeover_session_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, timeout}; + _ : Error : St -> + ?tp(error, "takeover_session_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, Error} + end, + case Action == kick orelse Action == discard of + true -> ok; + _ -> Return end. -return_error_if_action_is_takeover({takeover, _}, Reason) -> - {error, Reason}; -return_error_if_action_is_takeover(_, _) -> - ok. - force_kill(Pid) -> exit(Pid, kill), ok. @@ -372,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = call_or_kill(Action, ConnMod, ChanPid) + ok = takeover(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded From 4673ca43a0f7838a09fdfb62297aa141e47e4281 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Feb 2022 15:49:59 +0800 Subject: [PATCH 4/7] chore(emqx): update appup.src --- src/emqx.appup.src | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 610b40c75..6e9c43d01 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -6,6 +6,7 @@ {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -17,6 +18,7 @@ {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, @@ -34,6 +36,7 @@ {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -54,6 +57,7 @@ {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -374,6 +378,8 @@ {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -382,6 +388,7 @@ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -398,6 +405,7 @@ {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -417,6 +425,7 @@ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, From bf565efc99a5506ba68a79a8c07579cc1897edd5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Feb 2022 19:55:01 +0800 Subject: [PATCH 5/7] chore: update CHANGES-4.3.md --- CHANGES-4.3.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 148627628..af96c295a 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -22,6 +22,7 @@ File format: * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, to force an immediate reload of all certificates after the files are updated on disk. * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] +* Force shutdown of processe that cannot answer takeover event [#7026] ### Bug fixes From 72362063730cd655bd214d2443cf22fcc66cd0e2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 17 Feb 2022 09:35:30 +0800 Subject: [PATCH 6/7] fix(exproto): handle discard call --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 130ad155a..527f2e9b7 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -366,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload}, handle_call(kick, Channel) -> {shutdown, kicked, ok, Channel}; +handle_call(discard, Channel) -> + {shutdown, discarded, ok, Channel}; + handle_call(Req, Channel) -> ?LOG(warning, "Unexpected call: ~p", [Req]), {reply, {error, unexpected_call}, Channel}. From d943cc2f1cf940aab4cf4e803e5515dee6f1eb1e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 17 Feb 2022 09:57:55 +0800 Subject: [PATCH 7/7] chore(exproto): update appup.src --- apps/emqx_exproto/src/emqx_exproto.appup.src | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 590a2106f..e66e8a58c 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,18 +1,22 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},