diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index fe65052cf..3604d3505 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 5048e4f0f..9ad23381e 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,12 +1,12 @@ %% -*-: erlang -*- -{"4.3.1", - [ {"4.3.0", +{"4.3.2", + [ {<<"4.3.[0-1]">>, [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} ]} ], [ - {"4.3.0", + {<<"4.3.[0-1]">>, [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} ]} ] -}. \ No newline at end of file +}. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ce5ddb08e..ef6764241 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []} ]}, @@ -20,6 +21,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []} ]}, diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 7f324e322..cf505a873 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -22,6 +22,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[CM]"). @@ -279,18 +280,25 @@ takeover_session(ClientId, ChanPid) -> discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> - lists:foreach( - fun(ChanPid) -> - try - discard_session(ClientId, ChanPid) - catch - _:{noproc,_}:_Stk -> ok; - _:{{shutdown,_},_}:_Stk -> ok; - _:Error:_Stk -> - ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) - end - end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + end. + +do_discard_session(ClientId, Pid) -> + try + discard_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + ?tp(debug, "session_already_gone", #{pid => Pid}), + ok; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?tp(debug, "session_already_gone", #{pid => Pid}), + ok; + _ : {{shutdown, _}, _} -> + ?tp(debug, "session_already_shutdown", #{pid => Pid}), + ok; + _ : Error : St -> + ?tp(error, "failed_to_discard_session", + #{pid => Pid, reason => Error, stacktrace=>St}) end. discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index b8e0f9066..0cfba4737 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(CM, emqx_cm). -define(ChanInfo,#{conninfo => @@ -179,6 +180,18 @@ t_discard_session(_) -> ok = emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). +t_discard_session_race(_) -> + ok = snabbkaffe:start_trace(), + #{conninfo := ConnInfo0} = ?ChanInfo, + ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, + {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), + ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo), + Pid ! stop, + receive {'DOWN', Ref, process, Pid, normal} -> ok end, + ok = emqx_cm:discard_session(<<"clientid">>), + {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000), + snabbkaffe:stop(). + t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),