From 437837d6871b1da5724facde35c57380db9af87c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 29 Oct 2021 18:26:37 +0800 Subject: [PATCH 1/7] chore(cm): remove needless logs --- src/emqx_channel.erl | 5 ++++- src/emqx_connection.erl | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e0789382a..da412126e 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -946,7 +946,10 @@ handle_info({sock_closed, Reason}, Channel = end; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), + %% Since sock_closed messages can be generated multiple times, + %% we can simply ignore errors of this type in the disconnected state. + %% e.g. when the socket send function returns an error, there is already + %% a tcp_closed delivered to the process mailbox {ok, Channel}; handle_info(clean_acl_cache, Channel) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 69337adf2..ffec9055e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -685,7 +685,10 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), + case Reason =/= closed andalso Reason =/= einval of + true -> ?LOG(warning, "socket_error: ~p", [Reason]); + false -> ok + end, handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> From 649bf2f4cbc0d83639ddf7357b8d7a205bf12c31 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 31 Oct 2021 09:59:01 +0800 Subject: [PATCH 2/7] chore(appup): update appup.src --- src/emqx.appup.src | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ae5c1e481..b9805abdb 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -120,8 +120,9 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, - {<<"4.2.6">>, [ - {load_module, emqx_channel, brutal_purge, soft_purge, []} + {<<"4.2.8">>, [ + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -244,8 +245,9 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, - {<<"4.2.6">>, [ - {load_module, emqx_channel, brutal_purge, soft_purge, []} + {<<"4.2.8">>, [ + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] From 915c827fdc6a4056a35aade78f07b3fdbf093c4c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sat, 30 Oct 2021 14:07:15 +0200 Subject: [PATCH 3/7] fix(session): force kill session for 'kick' and 'discard' Prior to this fix, 'kick' and 'discard' calls may timeout (or fail for other reason), failures lead to only a log, then continue to allow the new session to get registered. As a result, in case a client is stuck, there is no way to force it to step down, end up with multiple connections (sessions) for the client ID in dashboard. After this fix, the stale pids are notified to shutdown via a gen_server:call, and forced with a exit(Pid, kill) for any exception happend to the gen_server:call --- src/emqx_cm.erl | 153 ++++++++++++++++++++++++++--------------- test/emqx_cm_SUITE.erl | 123 +++++++++++++++++++++++++-------- 2 files changed, 194 insertions(+), 82 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 36a280714..10a5dbfff 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -70,7 +70,7 @@ ]). %% Internal export --export([stats_fun/0]). +-export([stats_fun/0, clean_down/1]). -type(chan_pid() :: pid()). @@ -93,6 +93,10 @@ %% Server name -define(CM, ?MODULE). +-define(T_KICK, 5_000). +-define(T_GET_INFO, 5_000). +-define(T_TAKEOVER, 15_000). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -161,7 +165,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -186,7 +190,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -254,7 +258,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -266,78 +270,113 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> + %% TODO: if takeover times out, maybe kill the old? Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; - takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]). + rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> - lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -do_discard_session(ClientId, Pid) -> +%% @private Kick a local stale session to force it step down. +%% If failed to kick (e.g. timeout) force a kill. +%% Keeping the stale pid around, or returning error or raise an exception +%% benefits nobody. +-spec kick_or_kill(kick | discard, module(), pid()) -> ok. +kick_or_kill(Action, ConnMod, Pid) -> try - discard_session(ClientId, Pid) + %% this is essentailly a gen_server:call implemented in emqx_connection + %% and emqx_ws_connection. + %% the handle_call is implemented in emqx_channel + ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ?LOG(debug, "session_already_gone: ~p", [Pid]), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); _ : {noproc, _} -> % emqx_connection: gen_server:call - ?LOG(debug, "session_already_gone: ~p", [Pid]), - ok; - _ : {'EXIT', {noproc, _}} -> % rpc_call/3 - ?LOG(debug, "session_already_gone: ~p", [Pid]), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + _ : {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {{shutdown, _}, _} -> - ?LOG(debug, "session_already_shutdown: ~p", [Pid]), - ok; + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "session_kick_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid); _ : Error : St -> - ?LOG(debug, "failed_to_discard_session: ~p, " - "error: ~p, stacktrace: ~0p", [Pid, Error, St]) + ?tp(error, "session_kick_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid) end. -discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(ClientId, ChanPid) of - undefined -> ok; - ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; +force_kill(Pid) -> + exit(Pid, kill), + ok. + +stale_channel_info(Pid) -> + process_info(Pid, [status, message_queue_len, current_stacktrace]). discard_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]). + kick_session(discard, ClientId, ChanPid). + +kick_session(ClientId, ChanPid) -> + kick_session(kick, ClientId, ChanPid). + +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = kick_or_kill(Action, ConnMod, ChanPid) + end; +kick_session(Action, ClientId, ChanPid) -> + %% call remote node on the old APIs because we do not know if they have upgraded + %% to have kick_session/3 + Function = case Action of + discard -> discard_session; + kick -> kick_session + end, + try + rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + catch + Error : Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p", + [node(ChanPid), Action, Error, Reason]) + end. kick_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; - [ChanPid] -> - kick_session(ClientId, ChanPid); + [] -> + ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ok; ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), - lists:foreach(fun(StalePid) -> - catch discard_session(ClientId, StalePid) - end, StalePids), - kick_session(ClientId, ChanPid) + case length(ChanPids) > 1 of + true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]); + false -> ok + end, + lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) end. -kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick, ?T_TAKEOVER); - undefined -> - {error, not_found} - end; - -kick_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]). - %% @doc Is clean start? % is_clean_start(#{clean_start := false}) -> false; % is_clean_start(_Attrs) -> true. @@ -373,10 +412,16 @@ lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of - {badrpc, Reason} -> error(Reason); - Res -> Res +rpc_call(Node, Fun, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of + {badrpc, Reason} -> + %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% should catch all exceptions and always return 'ok'. + %% This leaves 'badrpc' only possible when there is problem + %% calling the remote node. + error({badrpc, Reason}); + Res -> + Res end. %% @private @@ -409,7 +454,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> @@ -445,5 +490,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index b32cdaf23..62c745828 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -31,6 +31,12 @@ conn_mod => emqx_connection, receive_maximum => 100}}). +-define(WAIT(PATTERN, TIMEOUT, RET), + fun() -> + receive PATTERN -> RET + after TIMEOUT -> error({timeout, ?LINE}) end + end()). + %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -151,19 +157,95 @@ t_open_session_race_condition(_) -> exit(Pid, kill), timer:sleep(100), ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)). -t_discard_session(_) -> - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). +t_kick_session_discard_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_discard_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_discard_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_discard_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_discard_noproc(_) -> + test_kick_session(discard, noproc). + +t_kick_session_kick_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_kick_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_kick_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_kick_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_kick_noproc(_) -> + test_kick_session(discard, noproc). + +test_kick_session(Action, Reason) -> + ClientId = rand_client_id(), + #{conninfo := ConnInfo} = ?ChanInfo, + FakeSessionFun = + fun Loop() -> + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard -> + case Reason of + normal -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end + end, + {Pid1, _} = spawn_monitor(FakeSessionFun), + {Pid2, _} = spawn_monitor(FakeSessionFun), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), + ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), + case Reason of + noproc -> exit(Pid1, kill), exit(Pid2, kill); + _ -> ok + end, + ok = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId) + end, + case Reason =:= timeout orelse Reason =:= noproc of + true -> + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); + false -> + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) + end, + ok = flush_emqx_pool(), + ?assertEqual([], emqx_cm:lookup_channels(ClientId)). + +%% Channel deregistration is delegated to emqx_pool as a sync tasks. +%% The emqx_pool is pool of workers, and there is no way to know +%% which worker was picked for the last deregistration task. +%% This help function creates a large enough number of async tasks +%% to sync with the pool workers. +%% The number of tasks should be large enough to ensure all workers have +%% the chance to work on at least one of the tasks. +flush_emqx_pool() -> + Self = self(), + L = lists:seq(1, 1000), + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). t_takeover_session(_) -> {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), @@ -178,21 +260,6 @@ t_takeover_session(_) -> {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). -t_kick_session(_) -> - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), - {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - test = emqx_cm:kick_session(<<"clientid">>), - erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - timer:sleep(1000) - end), - ct:sleep(100), - test = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). - t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())). From 3d7f4335a072877d3302f7898b57651e7c66f323 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Nov 2021 18:27:17 +0800 Subject: [PATCH 4/7] fix(emqx_cm): replace ?tp with ?LOG --- src/emqx_channel.erl | 2 +- src/emqx_cm.erl | 43 +++++++++++++++++++++--------------------- test/emqx_cm_SUITE.erl | 17 ++++++++++------- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index da412126e..1b20753aa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -945,7 +945,7 @@ handle_info({sock_closed, Reason}, Channel = Shutdown -> Shutdown end; -handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> %% Since sock_closed messages can be generated multiple times, %% we can simply ignore errors of this type in the disconnected state. %% e.g. when the socket send function returns an error, there is already diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 10a5dbfff..2e917ca79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -72,6 +72,9 @@ %% Internal export -export([stats_fun/0, clean_down/1]). +%% Test export +-export([register_channel_/3]). + -type(chan_pid() :: pid()). %% Tables for channel management. @@ -88,14 +91,12 @@ %% Batch drain -define(BATCH_SIZE, 100000). --define(T_TAKEOVER, 15000). - %% Server name -define(CM, ?MODULE). --define(T_KICK, 5_000). --define(T_GET_INFO, 5_000). --define(T_TAKEOVER, 15_000). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). +-define(T_TAKEOVER, 15000). %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). @@ -298,28 +299,26 @@ kick_or_kill(Action, ConnMod, Pid) -> ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), + ?LOG(warning, "session_kick_timeout: ~p, action: ~p, " + "stale_channel: ~p", + [Pid, Action, stale_channel_info(Pid)]), ok = force_kill(Pid); - _ : Error : St -> - ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), + _ : Error -> + ?LOG(error, "session_kick_exception: ~p, action: ~p, " + "reason: ~p, stacktrace: ~p, stale_channel: ~p", + [Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]), ok = force_kill(Pid) end. diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 62c745828..ec660429a 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -211,9 +211,9 @@ test_kick_session(Action, Reason) -> end, {Pid1, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun), - ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), - ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), - ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo), ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), case Reason of noproc -> exit(Pid1, kill), exit(Pid2, kill); @@ -225,15 +225,18 @@ test_kick_session(Action, Reason) -> end, case Reason =:= timeout orelse Reason =:= noproc of true -> - ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), - ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)); false -> - ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), - ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)) end, ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). +rand_client_id() -> + list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())). + %% Channel deregistration is delegated to emqx_pool as a sync tasks. %% The emqx_pool is pool of workers, and there is no way to know %% which worker was picked for the last deregistration task. From edb2c5f3c1585df6d0971df071fc4833bd1b0e1f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 10:38:37 +0800 Subject: [PATCH 5/7] chore(appup): update appup.src --- src/emqx.appup.src | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b9805abdb..2e9c125c8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -122,7 +122,8 @@ ]}, {<<"4.2.8">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []} + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -247,7 +248,8 @@ ]}, {<<"4.2.8">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_connection, brutal_purge, soft_purge, []} + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] From a11208b307e54908a0853a0d9ea81b11367890bd Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 12 Nov 2021 11:31:35 +0800 Subject: [PATCH 6/7] fix(frame): variable byte num not limited in 4 bytes --- src/emqx.appup.src | 8 +++++++- src/emqx_frame.erl | 7 ++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 2e9c125c8..2c0d887f8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,4 +1,4 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, [ @@ -92,6 +92,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, @@ -110,6 +111,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, @@ -121,6 +123,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.8">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []} @@ -218,6 +221,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, @@ -236,6 +240,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, @@ -247,6 +252,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.8">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []} diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 94fbda4b7..7caa92ea8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -67,6 +67,8 @@ version => ?MQTT_PROTO_V4 }). +-define(MULTIPLIER_MAX, 16#200000). + -dialyzer({no_match, [serialize_utf8_string/2]}). %%-------------------------------------------------------------------- @@ -142,7 +144,7 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 2, Options); parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) - when Multiplier > 2097152 -> + when Multiplier > ?MULTIPLIER_MAX -> error(malformed_variable_byte_integer); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); @@ -411,6 +413,9 @@ parse_property(<<16#2A, Val, Bin/binary>>, Props) -> parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). +parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) + when Multiplier > ?MULTIPLIER_MAX -> + error(malformed_variable_byte_integer); parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> From cf9d82073c93ea91ffbb9dc3f621a1abce7216d6 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 12 Nov 2021 11:11:35 +0800 Subject: [PATCH 7/7] fix(ekka): update ekka to 0.7.10 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 2370a9b87..fe785724a 100644 --- a/rebar.config +++ b/rebar.config @@ -7,7 +7,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}.