From d435f1211ee08e8b2083e7ab3699ff3384917e04 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 8 Dec 2021 10:33:57 -0300 Subject: [PATCH] fix(live_conn): fix live connection count on race condition When multiple clients try to connect concurrently using the same client ID, they all call `emqx_channel:ensure_connected`, increasing the live connection count, but only one will successfully acquire the lock for that client ID. This means that all other clients that increased the live connection count will neither get to call `emqx_channel:ensure_disconnected` nor be monitored for `DOWN` messages, effectively causing a count leak. By moving the increment to `emqx_cm:register_channel`, which is only called inside the lock, we can remove this leakage. Also, during the handling of `DOWN` messages, we now iterate over all channel PIDs returned by `emqx_misc:drain_down`, since it could be that one or more PIDs are not contained in the `pmon` state. --- src/emqx_channel.erl | 2 - src/emqx_cm.erl | 7 +--- test/emqx_broker_SUITE.erl | 75 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index bad053845..5524a391d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1548,8 +1548,6 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - ChanPid = self(), - emqx_cm:mark_channel_connected(ChanPid), Channel#channel{conninfo = NConnInfo, conn_state = connected }. 
diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index c54089e41..1c6d4080a 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -137,6 +137,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), + mark_channel_connected(ChanPid), cast({registered, Chan}). %% @doc Unregister a channel. @@ -465,11 +466,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}), ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - lists:foreach( - fun({ChanPid, _ClientID}) -> - mark_channel_disconnected(ChanPid) - end, - Items), + lists:foreach(fun mark_channel_disconnected/1, ChanPids), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 57f0d6acf..d44bb3412 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -37,6 +37,7 @@ groups() -> TCs = emqx_ct:all(?MODULE), ConnClientTCs = [ t_connected_client_count_persistent , t_connected_client_count_anonymous + , t_connected_client_count_transient_takeover , t_connected_client_stats ], OtherTCs = TCs -- ConnClientTCs, @@ -451,6 +452,80 @@ t_connected_client_count_anonymous({'end', _Config}) -> snabbkaffe:stop(), ok. 
+t_connected_client_count_transient_takeover({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_transient_takeover(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ClientID = <<"clientid">>, + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% we spawn several clients simultaneously to cause the race + %% condition for the client id lock + NumClients = 20, + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + lists:foreach( + fun(_) -> + spawn( + fun() -> + {ok, ConnPid} = + emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config]), + %% don't assert the result: most of them fail + %% during the race + emqtt:ConnFun(ConnPid), + ok + end), + ok + end, + lists:seq(1, NumClients)) + end, + %% there can be only one channel that wins the race for the + %% lock for this client id. we also expect a decrement + %% event because the client dies along with the ephemeral + %% process. + [ emqx_cm_connected_client_count_inc + , emqx_cm_connected_client_count_dec + ], + 1000), + %% Since more than one pair of inc/dec may be emitted, we need to + %% wait for full stabilization + timer:sleep(100), + %% It must be 0 again because we spawn-linked the clients in + %% ephemeral processes above, and all should be dead now. 
+ ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% connecting again + {ok, ConnPid1} = emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config + ]), + {{ok, _}, {ok, [_]}} = + wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + [ChanPid] = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + exit(ChanPid, kill), + ok + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_transient_takeover({'end', _Config}) -> + snabbkaffe:stop(), + ok. + t_connected_client_stats({init, Config}) -> ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),