Merge pull request #6404 from emqx/bugfix-live-chan-count

fix(live_conn): fix live connection count on race condition

When multiple clients try to connect concurrently using the same
client ID, they all call `emqx_channel:ensure_connected`, increasing
the live connection count, but only one will successfully acquire the
lock for that client ID.  This means that all other clients that
increased the live connection count will neither get to call
`emqx_channel:ensure_disconnected` nor be monitored for `DOWN`
messages, effectively causing a count leak.

By moving the increment to `emqx_cm:register_channel`, which is only
called inside the lock, we can remove this leakage.

Also, during the handling of `DOWN` messages, we now iterate over all
channel PIDs returned by `emqx_misc:drain_down`, since it could be
that one or more PIDs are not contained in the `pmon` state.
This commit is contained in:
Thales Macedo Garitezi 2021-12-08 15:10:24 -03:00 committed by GitHub
commit d2b2a4ea99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 7 deletions

View File

@ -1548,8 +1548,6 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_connected(ChanPid),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.

View File

@ -137,6 +137,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
mark_channel_connected(ChanPid),
cast({registered, Chan}).
%% @doc Unregister a channel.
@ -465,11 +466,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(
fun({ChanPid, _ClientID}) ->
mark_channel_disconnected(ChanPid)
end,
Items),
lists:foreach(fun mark_channel_disconnected/1, ChanPids),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->

View File

@ -37,6 +37,7 @@ groups() ->
TCs = emqx_ct:all(?MODULE),
ConnClientTCs = [ t_connected_client_count_persistent
, t_connected_client_count_anonymous
, t_connected_client_count_transient_takeover
, t_connected_client_stats
],
OtherTCs = TCs -- ConnClientTCs,
@ -451,6 +452,80 @@ t_connected_client_count_anonymous({'end', _Config}) ->
snabbkaffe:stop(),
ok.
%% Test case: the connected-client count must not leak when many clients
%% race to connect with the same client ID.
%%
%% Clauses:
%%   {init, Config}   - starts a snabbkaffe trace, makes the calling process
%%                      trap exits, and returns Config unchanged.
%%   Config (a list)  - the test body (see inline comments).
%%   {'end', _Config} - stops snabbkaffe.
t_connected_client_count_transient_takeover({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
%% conn_fun names the emqtt function used to connect; it is invoked
%% dynamically below as emqtt:ConnFun(Pid).
ConnFun = ?config(conn_fun, Config),
ClientID = <<"clientid">>,
%% precondition: no connected clients before the race starts
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% we spawn several clients simultaneously to cause the race
%% condition for the client id lock
NumClients = 20,
%% NOTE(review): wait_for_events is defined outside this view; the match
%% below presumes it returns {ResultOfFun, {ok, Events}}. Here the fun's
%% result is the `ok` from lists:foreach/2, and two events are expected.
{ok, {ok, [_, _]}} =
wait_for_events(
fun() ->
lists:foreach(
fun(_) ->
%% each client is start_link'ed inside its own short-lived process
spawn(
fun() ->
{ok, ConnPid} =
emqtt:start_link([ {clean_start, true}
, {clientid, ClientID}
| Config]),
%% don't assert the result: most of them fail
%% during the race
emqtt:ConnFun(ConnPid),
ok
end),
ok
end,
lists:seq(1, NumClients))
end,
%% there can be only one channel that wins the race for the
%% lock for this client id. we also expect a decrement
%% event because the client dies along with the ephemeral
%% process.
[ emqx_cm_connected_client_count_inc
, emqx_cm_connected_client_count_dec
],
%% presumably a timeout in milliseconds - confirm against wait_for_events
1000),
%% Since more than one pair of inc/dec may be emitted, we need to
%% wait for full stabilization
timer:sleep(100),
%% It must be 0 again because we spawn-linked the clients in
%% ephemeral processes above, and all should be dead now.
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% connecting again
{ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
, {clientid, ClientID}
| Config
]),
%% this time the connect result is asserted ({ok, _}) and exactly one
%% increment event is expected
{{ok, _}, {ok, [_]}} =
wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
%% the match asserts that exactly one channel is registered at this point
[ChanPid] = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} =
wait_for_events(
fun() ->
%% `kill` is an untrappable exit signal, so the channel cannot run
%% its own disconnect logic; the decrement must come from the
%% DOWN handling traced as emqx_cm_process_down
exit(ChanPid, kill),
ok
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
%% the count must return to zero after the abnormal exit
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_transient_takeover({'end', _Config}) ->
snabbkaffe:stop(),
ok.
t_connected_client_stats({init, Config}) ->
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),