From 2b5fe9179ef4cb4f9cfe0e325a63ff341b4a3367 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 8 Dec 2021 15:15:27 -0300 Subject: [PATCH] fix(live_conn): fix live connection count on race condition (5.0) Port from #6406 to 5.0. When multiple clients try to connect concurrently using the same client ID, they all call `emqx_channel:ensure_connected`, increasing the live connection count, but only one will successfully acquire the lock for that client ID. This means that all other clients that increased the live connection count will neither get to call `emqx_channel:ensure_disconnected` nor be monitored for `DOWN` messages, effectively causing a count leak. By moving the increment to `emqx_cm:register_channel`, which is only called inside the lock, we can remove this leakage. Also, during the handling of `DOWN` messages, we now iterate over all channel PIDs returned by `emqx_misc:drain_down`, since it could be that one or more PIDs are not contained in the `pmon` state. --- apps/emqx/src/emqx_channel.erl | 2 - apps/emqx/src/emqx_cm.erl | 7 +-- apps/emqx/test/emqx_broker_SUITE.erl | 75 ++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d2878c926..cf22fa0ae 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1641,8 +1641,6 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - ChanPid = self(), - emqx_cm:mark_channel_connected(ChanPid), Channel#channel{conninfo = NConnInfo, conn_state = connected }. 
diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index f1209dd6f..fef3e4f19 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -142,6 +142,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), + mark_channel_connected(ChanPid), cast({registered, Chan}). %% @doc Unregister a channel. @@ -549,11 +550,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}), ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - lists:foreach( - fun({ChanPid, _ClientID}) -> - mark_channel_disconnected(ChanPid) - end, - Items), + lists:foreach(fun mark_channel_disconnected/1, ChanPids), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 440e0fe42..c1d55c256 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -37,6 +37,7 @@ groups() -> TCs = emqx_common_test_helpers:all(?MODULE), ConnClientTCs = [ t_connected_client_count_persistent , t_connected_client_count_anonymous + , t_connected_client_count_transient_takeover , t_connected_client_stats ], OtherTCs = TCs -- ConnClientTCs, @@ -461,6 +462,80 @@ t_connected_client_count_anonymous({'end', _Config}) -> snabbkaffe:stop(), ok. 
+t_connected_client_count_transient_takeover({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_transient_takeover(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ClientID = <<"clientid">>, + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% we spawn several clients simultaneously to cause the race + %% condition for the client id lock + NumClients = 20, + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + lists:foreach( + fun(_) -> + spawn( + fun() -> + {ok, ConnPid} = + emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config]), + %% don't assert the result: most of them fail + %% during the race + emqtt:ConnFun(ConnPid), + ok + end), + ok + end, + lists:seq(1, NumClients)) + end, + %% there can be only one channel that wins the race for the + %% lock for this client id. we also expect a decrement + %% event because the client dies along with the ephemeral + %% process. + [ emqx_cm_connected_client_count_inc + , emqx_cm_connected_client_count_dec + ], + 1000), + %% Since more than one pair of inc/dec may be emitted, we need to + %% wait for full stabilization + timer:sleep(100), + %% It must be 0 again because we spawn-linked the clients in + %% ephemeral processes above, and all should be dead now. 
+ ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% connecting again + {ok, ConnPid1} = emqtt:start_link([ {clean_start, true} + , {clientid, ClientID} + | Config + ]), + {{ok, _}, {ok, [_]}} = + wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + [ChanPid] = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = + wait_for_events( + fun() -> + exit(ChanPid, kill), + ok + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_transient_takeover({'end', _Config}) -> + snabbkaffe:stop(), + ok. + t_connected_client_stats({init, Config}) -> ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),