diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index fa64f3528..92c6bb3d2 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1536,7 +1536,9 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), - emqx_cm:increment_connected_client_count(), + ClientID = info(clientid, Channel), + Chan = {ClientID, self()}, + emqx_cm:increment_connected_client_count(Chan), Channel#channel{conninfo = NConnInfo, conn_state = connected }. @@ -1625,7 +1627,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), - emqx_cm:decrement_connected_client_count(), + ClientID = info(clientid, Channel), + Chan = {ClientID, self()}, + emqx_cm:decrement_connected_client_count(Chan), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 1c5bca93a..c14e3c19e 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -75,8 +75,8 @@ %% Internal export -export([ stats_fun/0 , clean_down/1 - , increment_connected_client_count/0 - , decrement_connected_client_count/0 + , increment_connected_client_count/1 + , decrement_connected_client_count/1 , get_connected_client_count/0 ]). @@ -86,18 +86,15 @@ -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). -define(CHAN_INFO_TAB, emqx_channel_info). +-define(CONN_CLIENT_TAB, connected_client_counter). -define(CHAN_STATS, [{?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, - {?CHAN_CONN_TAB, 'connections.count', 'connections.max'} + {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}, + {?CONN_CLIENT_TAB, 'live_connections.count', 'live_connections.max'} ]). --define(CONN_CLIENT_TAB, connected_client_counter). --define(CONN_CLIENT_TAB_KEY, connected_client_count). --define(CONN_CLIENT_TAB_IDX, 2). --define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}). - %% Batch drain -define(BATCH_SIZE, 100000). @@ -448,8 +445,7 @@ init([]) -> ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), - ok = emqx_tables:new(?CONN_CLIENT_TAB, [set | TabOpts]), - true = ets:insert(?CONN_CLIENT_TAB, {?CONN_CLIENT_TAB_KEY, 0}), + ok = emqx_tables:new(?CONN_CLIENT_TAB, [set, {write_concurrency, true} | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), State = #{chan_pmon => emqx_pmon:new()}, {ok, State}. @@ -466,6 +462,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> + ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}), ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), lists:foreach( @@ -473,7 +470,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} Chan = {ClientID, ChanPid}, case ets:lookup(?CHAN_INFO_TAB, Chan) of [{Chan, #{conn_state := connected}, _}] -> - decrement_connected_client_count(); + decrement_connected_client_count(Chan); _ -> ok end @@ -499,8 +496,7 @@ clean_down({ChanPid, ClientId}) -> do_unregister_channel({ClientId, ChanPid}). stats_fun() -> - lists:foreach(fun update_stats/1, ?CHAN_STATS), - update_connected_client_stats(). + lists:foreach(fun update_stats/1, ?CHAN_STATS). update_stats({Tab, Stat, MaxStat}) -> case ets:info(Tab, size) of @@ -508,12 +504,6 @@ update_stats({Tab, Stat, MaxStat}) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -update_connected_client_stats() -> - {CountStat, MaxStat} = ?CONNECTED_CLIENT_STATS, - CCCount = get_connected_client_count(), - emqx_stats:setstat(CountStat, MaxStat, CCCount), - ok. - get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod @@ -523,19 +513,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> get_chann_conn_mod(ClientId, ChanPid) -> rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). -increment_connected_client_count() -> +increment_connected_client_count(Chan) -> ?tp(emqx_cm_connected_client_count_inc, #{}), - ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, - {?CONN_CLIENT_TAB_IDX, 1}), + ets:insert_new(?CONN_CLIENT_TAB, {Chan, true}), ok. -decrement_connected_client_count() -> +decrement_connected_client_count(Chan) -> ?tp(emqx_cm_connected_client_count_dec, #{}), - Threshold = 0, - SetValue = 0, - ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, - {?CONN_CLIENT_TAB_IDX, -1, Threshold, SetValue}), + ets:delete(?CONN_CLIENT_TAB, Chan), ok. get_connected_client_count() -> - ets:lookup_element(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, ?CONN_CLIENT_TAB_IDX). + case ets:info(?CONN_CLIENT_TAB, size) of + undefined -> 0; + Size -> Size + end. diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 575832bf8..b6e0e4ef4 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -36,6 +36,7 @@ all() -> groups() -> TCs = emqx_ct:all(?MODULE), ConnClientTCs = [ t_connected_client_count_persistent + , t_connected_client_count_anonymous , t_connected_client_stats ], OtherTCs = TCs -- ConnClientTCs, @@ -319,8 +320,8 @@ t_stats_fun({'end', _Config}) -> ok = emqx_broker:unsubscribe(<<"topic">>), ok = emqx_broker:unsubscribe(<<"topic2">>). -%% persistent sessions, when gone, do not contribute to connect client -%% count +%% persistent sessions, when gone, do not contribute to connected +%% client count t_connected_client_count_persistent({init, Config}) -> ok = snabbkaffe:start_trace(), process_flag(trap_exit, true), @@ -366,17 +367,90 @@ t_connected_client_count_persistent(Config) when is_list(Config) -> ), ?assertEqual(1, emqx_cm:get_connected_client_count()), %% abnormal exit of channel process - [ChanPid] = emqx_cm:all_channels(), - {true, {ok, [_]}} = wait_for_events( - fun() -> exit(ChanPid, kill) end, - [emqx_cm_connected_client_count_dec] - ), + ChanPids = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> + lists:foreach( + fun(ChanPid) -> exit(ChanPid, kill) end, + ChanPids) + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), ?assertEqual(0, emqx_cm:get_connected_client_count()), ok; t_connected_client_count_persistent({'end', _Config}) -> snabbkaffe:stop(), ok. +%% connections without client_id also contribute to connected client +%% count +t_connected_client_count_anonymous({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_anonymous(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% first client + {ok, ConnPid0} = emqtt:start_link([ {clean_start, true} + | Config]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% second client + {ok, ConnPid1} = emqtt:start_link([ {clean_start, true} + | Config]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(2, emqx_cm:get_connected_client_count()), + %% when first client disconnects, shouldn't affect the second + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% reconnecting + {ok, ConnPid2} = emqtt:start_link([ {clean_start, true} + | Config + ]), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid2) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(2, emqx_cm:get_connected_client_count()), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid1) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + Chans = emqx_cm:all_channels(), + {ok, {ok, [_, _]}} = wait_for_events( + fun() -> + lists:foreach( + fun(ChanPid) -> exit(ChanPid, kill) end, + Chans) + end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_process_down + ] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_anonymous({'end', _Config}) -> + snabbkaffe:stop(), + ok. + t_connected_client_stats({init, Config}) -> ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), @@ -422,16 +496,17 @@ t_connected_client_stats({'end', _Config}) -> {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), ok. -%% alwayseven if +%% the count must be always non negative t_connect_client_never_negative({init, Config}) -> Config; t_connect_client_never_negative(Config) when is_list(Config) -> ?assertEqual(0, emqx_cm:get_connected_client_count()), %% would go to -1 - emqx_cm:decrement_connected_client_count(), + Chan = {<<"clientid">>, list_to_pid("<0.0.1>")}, + emqx_cm:decrement_connected_client_count(Chan), ?assertEqual(0, emqx_cm:get_connected_client_count()), %% would be 0, if really went to -1 - emqx_cm:increment_connected_client_count(), + emqx_cm:increment_connected_client_count(Chan), ?assertEqual(1, emqx_cm:get_connected_client_count()), ok; t_connect_client_never_negative({'end', _Config}) -> diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 5d42ffb99..d08769c52 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -33,8 +33,8 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> %% CM Meck ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), - ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end), %% Access Control Meck ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:expect(emqx_access_control, authenticate, diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index f4e8e909e..acbb8a759 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -36,8 +36,8 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), - ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end), %% Meck Limiter ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), %% Meck Pd diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index bac0444b4..1ac0b934b 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -50,8 +50,8 @@ init_per_testcase(TestCase, Config) when -> %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), - ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end), %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),