diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index bad053845..5524a391d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1548,8 +1548,6 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - ChanPid = self(), - emqx_cm:mark_channel_connected(ChanPid), Channel#channel{conninfo = NConnInfo, conn_state = connected }. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index c54089e41..1c6d4080a 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -137,6 +137,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), + mark_channel_connected(ChanPid), cast({registered, Chan}). %% @doc Unregister a channel. @@ -465,11 +466,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}), ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - lists:foreach( - fun({ChanPid, _ClientID}) -> - mark_channel_disconnected(ChanPid) - end, - Items), + lists:foreach(fun mark_channel_disconnected/1, ChanPids), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 57f0d6acf..d44bb3412 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -37,6 +37,7 @@ groups() -> TCs = emqx_ct:all(?MODULE), ConnClientTCs = [ t_connected_client_count_persistent , t_connected_client_count_anonymous + , t_connected_client_count_transient_takeover , t_connected_client_stats ], OtherTCs = TCs -- ConnClientTCs, @@ -451,6 +452,80 @@ t_connected_client_count_anonymous({'end', _Config}) -> snabbkaffe:stop(), ok. +t_connected_client_count_transient_takeover({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_transient_takeover(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ClientID = <<"clientid">>, + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% we spawn several clients simultaneously to cause the race + %% condition for the client id lock + NumClients = 20, + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + lists:foreach( + fun(_) -> + spawn( + fun() -> + {ok, ConnPid} = + emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config]), + %% don't assert the result: most of them fail + %% during the race + emqtt:ConnFun(ConnPid), + ok + end), + ok + end, + lists:seq(1, NumClients)) + end, + %% there can be only one channel that wins the race for the + %% lock for this client id. we also expect a decrement + %% event because the client dies along with the ephemeral + %% process. + [ emqx_cm_connected_client_count_inc + , emqx_cm_connected_client_count_dec + ], + 1000), + %% Since more than one pair of inc/dec may be emitted, we need to + %% wait for full stabilization + timer:sleep(100), + %% It must be 0 again because we spawn-linked the clients in + %% ephemeral processes above, and all should be dead now. + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% connecting again + {ok, ConnPid1} = emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config + ]), + {{ok, _}, {ok, [_]}} = + wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + [ChanPid] = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + exit(ChanPid, kill), + ok + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_transient_takeover({'end', _Config}) -> + snabbkaffe:stop(), + ok. + t_connected_client_stats({init, Config}) -> ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),