diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index b567ea8cc..063bbf324 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -22,6 +22,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[CM]"). @@ -92,6 +93,8 @@ {?CHAN_CONN_TAB, 'connections.count', 'connections.max'} ]). +-define(CONN_CLIENT_CTR, connected_client_counter). +-define(CONN_CLIENT_CTR_IDX, 1). -define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}). %% Batch drain @@ -148,12 +151,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> do_unregister_channel(Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), true = ets:delete(?CHAN_CONN_TAB, Chan), - case ets:take(?CHAN_INFO_TAB, Chan) of - [{_, #{conn_state := connected}, _}] -> - decrement_connected_client_count(); - _ -> - ok - end, + true = ets:delete(?CHAN_INFO_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan). -spec(connection_closed(emqx_types:clientid()) -> true). @@ -439,7 +437,6 @@ rpc_call(Node, Fun, Args, Timeout) -> %% @private cast(Msg) -> gen_server:cast(?CM, Msg). -call(Msg) -> gen_server:call(?CM, Msg, infinity). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -450,17 +447,15 @@ init([]) -> ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), + CRef = counters:new(1, [write_concurrency]), + ok = persistent_term:put({?MODULE, ?CONN_CLIENT_CTR}, CRef), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), RefreshPeriod = application:get_env(emqx, connected_client_count_refresh_period, 60000), State = #{ chan_pmon => emqx_pmon:new() - , connected_client_count => 0 , connected_client_refresh_period => RefreshPeriod }, {ok, ensure_refresh_timer(State)}. -handle_call({connected_client_count, get}, _From, - State = #{connected_client_count := CCCount}) -> - {reply, CCCount, State}; handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -468,18 +463,6 @@ handle_call(Req, _From, State) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), {noreply, State#{chan_pmon := PMon1}}; -handle_cast({connected_client_count, inc}, - State0 = #{connected_client_count := CCCount0}) -> - ?tp(emqx_cm_connected_client_count_inc, #{count_before => CCCount0}), - CCCount = CCCount0 + 1, - State = State0#{connected_client_count := CCCount}, - {noreply, State}; -handle_cast({connected_client_count, dec}, - State0 = #{connected_client_count := CCCount0}) -> - ?tp(emqx_cm_connected_client_count_dec, #{count_before => CCCount0}), - CCCount = max(0, CCCount0 - 1), - State = State0#{connected_client_count := CCCount}, - {noreply, State}; handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -487,17 +470,29 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), + lists:foreach( + fun({ChanPid, ClientID}) -> + Chan = {ClientID, ChanPid}, + case ets:lookup(?CHAN_INFO_TAB, Chan) of + [{Chan, #{conn_state := connected}, _}] -> + decrement_connected_client_count(); + _ -> + ok + end + end, + Items), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info({timeout, TRef, refresh_connected_client_count}, - State0 = #{connected_client_refresh_timer := TRef, - connected_client_count := _CCCount0}) -> + State0 = #{connected_client_refresh_timer := TRef}) -> CCCount = refresh_connected_client_count(), ?tp(emqx_cm_refresh_connected_client_count, #{ new_count => CCCount - , old_count => _CCCount0 + , old_count => get_connected_client_count() }), - State = ensure_refresh_timer(State0#{connected_client_count := CCCount}), + CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), + ok = counters:put(CRef, ?CONN_CLIENT_CTR_IDX, CCCount), + State = ensure_refresh_timer(State0), {noreply, State}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), @@ -542,13 +537,18 @@ get_chann_conn_mod(ClientId, ChanPid) -> rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). increment_connected_client_count() -> - cast({connected_client_count, inc}). + ?tp(emqx_cm_connected_client_count_inc, #{}), + CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), + ok = counters:add(CRef, ?CONN_CLIENT_CTR_IDX, 1). decrement_connected_client_count() -> - cast({connected_client_count, dec}). + ?tp(emqx_cm_connected_client_count_dec, #{}), + CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), + ok = counters:sub(CRef, ?CONN_CLIENT_CTR_IDX, 1). get_connected_client_count() -> - call({connected_client_count, get}). + CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), + max(0, counters:get(CRef, ?CONN_CLIENT_CTR_IDX)). ensure_refresh_timer(State = #{connected_client_refresh_period := RefreshPeriod}) -> TRef = emqx_misc:start_timer(RefreshPeriod, refresh_connected_client_count), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 4dc73701f..b6531a4c0 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -35,8 +35,7 @@ all() -> groups() -> TCs = emqx_ct:all(?MODULE), - ConnClientTCs = [ t_connected_client_count_refresh - , t_connected_client_count_persistent + ConnClientTCs = [ t_connected_client_count_persistent , t_connected_client_stats ], OtherTCs = TCs -- ConnClientTCs, @@ -328,98 +327,56 @@ t_connected_client_count_persistent({init, Config}) -> Config; t_connected_client_count_persistent(Config) when is_list(Config) -> ConnFun = ?config(conn_fun, Config), + ClientID = <<"clientid">>, ?assertEqual(0, emqx_cm:get_connected_client_count()), {ok, ConnPid0} = emqtt:start_link([ {clean_start, false} - , {clientid, <<"clientid">>} + , {clientid, ClientID} | Config]), - {{ok, _}, _} = wait_for_events( - fun() -> emqtt:ConnFun(ConnPid0) end, - [emqx_cm_connected_client_count_inc] - ), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), ?assertEqual(1, emqx_cm:get_connected_client_count()), - {ok, _} = wait_for_events( - fun() -> emqtt:disconnect(ConnPid0) end, - [emqx_cm_connected_client_count_dec] - ), + {ok, {ok, [_]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [emqx_cm_connected_client_count_dec] + ), ?assertEqual(0, emqx_cm:get_connected_client_count()), %% reconnecting {ok, ConnPid1} = emqtt:start_link([ {clean_start, false} - , {clientid, <<"clientid">>} + , {clientid, ClientID} | Config ]), - {{ok, _}, _} = wait_for_events( - fun() -> emqtt:ConnFun(ConnPid1) end, - [emqx_cm_connected_client_count_inc] - ), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), ?assertEqual(1, emqx_cm:get_connected_client_count()), %% taking over {ok, ConnPid2} = emqtt:start_link([ {clean_start, false} - , {clientid, <<"clientid">>} + , {clientid, ClientID} | Config ]), - {{ok, _}, _} = wait_for_events( - fun() -> emqtt:ConnFun(ConnPid2) end, - [ emqx_cm_connected_client_count_dec - , emqx_cm_connected_client_count_inc - ] - ), + {{ok, _}, {ok, [_, _]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid2) end, + [ emqx_cm_connected_client_count_inc + , emqx_cm_connected_client_count_dec + ], + 500 + ), ?assertEqual(1, emqx_cm:get_connected_client_count()), %% abnormal exit of channel process [ChanPid] = emqx_cm:all_channels(), - {true, _} = wait_for_events( - fun() -> exit(ChanPid, kill) end, - [emqx_cm_connected_client_count_dec] - ), + {true, {ok, [_]}} = wait_for_events( + fun() -> exit(ChanPid, kill) end, + [emqx_cm_connected_client_count_dec] + ), ?assertEqual(0, emqx_cm:get_connected_client_count()), ok; t_connected_client_count_persistent({'end', _Config}) -> snabbkaffe:stop(), ok. -t_connected_client_count_refresh({init, Config}) -> - ok = snabbkaffe:start_trace(), - OldConfig = application:get_env(emqx, connected_client_count_refresh_period, undefined), - application:set_env(emqx, connected_client_count_refresh_period, 100), - ok = supervisor:terminate_child(emqx_cm_sup, manager), - {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), - [{old_config, OldConfig} | Config]; -t_connected_client_count_refresh(Config) when is_list(Config) -> - ConnFun = ?config(conn_fun, Config), - ?assertEqual(0, emqx_cm:get_connected_client_count()), - {ok, ConnPid} = emqtt:start_link([ {clean_start, false} - , {clientid, <<"clientid">>} - | Config - ]), - {{ok, _}, _} = wait_for_events( - fun() -> emqtt:ConnFun(ConnPid) end, - [emqx_cm_connected_client_count_inc] - ), - %% simulate count mismatch - insert_fake_channels(), - ?block_until( - #{ ?snk_kind := emqx_cm_refresh_connected_client_count - , new_count := 10 - , old_count := 1 - }, - 150 - ), - ?assertEqual(10, emqx_cm:get_connected_client_count()), - emqtt:disconnect(ConnPid), - ok; -t_connected_client_count_refresh({'end', Config}) -> - OldConfig = proplists:get_value(old_config, Config), - case OldConfig of - Val when is_integer(Val) -> - application:set_env(emqx, connected_client_count_refresh_period, OldConfig); - _ -> - skip - end, - snabbkaffe:stop(), - ets:delete_all_objects(emqx_channel_info), - ok = supervisor:terminate_child(emqx_cm_sup, manager), - {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), - ok. - t_connected_client_stats({init, Config}) -> ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), @@ -434,28 +391,28 @@ t_connected_client_stats(Config) when is_list(Config) -> , {clientid, <<"clientid">>} | Config ]), - {{ok, _}, _} = wait_for_events( - fun() -> emqtt:ConnFun(ConnPid) end, - [emqx_cm_connected_client_count_inc] - ), + {{ok, _}, {ok, [_]}} = wait_for_events( + fun() -> emqtt:ConnFun(ConnPid) end, + [emqx_cm_connected_client_count_inc] + ), %% ensure stats are synchronized - wait_for_stats( - fun emqx_cm:stats_fun/0, - [#{count_stat => 'live_connections.count', - max_stat => 'live_connections.max'}] - ), + {_, {ok, [_]}} = wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), ?assertEqual(1, emqx_stats:getstat('live_connections.count')), ?assertEqual(1, emqx_stats:getstat('live_connections.max')), - {ok, _} = wait_for_events( - fun() -> emqtt:disconnect(ConnPid) end, - [emqx_cm_connected_client_count_dec] - ), + {ok, {ok, [_]}} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid) end, + [emqx_cm_connected_client_count_dec] + ), %% ensure stats are synchronized - wait_for_stats( - fun emqx_cm:stats_fun/0, - [#{count_stat => 'live_connections.count', - max_stat => 'live_connections.max'}] - ), + {_, {ok, [_]}} = wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), ?assertEqual(0, emqx_stats:getstat('live_connections.count')), ?assertEqual(1, emqx_stats:getstat('live_connections.max')), ok; @@ -465,16 +422,19 @@ t_connected_client_stats({'end', _Config}) -> {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), ok. + wait_for_events(Action, Kinds) -> + wait_for_events(Action, Kinds, 100). + +wait_for_events(Action, Kinds, Timeout) -> Predicate = fun(#{?snk_kind := K}) -> lists:member(K, Kinds) end, N = length(Kinds), - Timeout = 100, {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), Res = Action(), case snabbkaffe_collector:receive_events(Sub) of - {timeout, []} -> + {timeout, _} -> {Res, timeout}; {ok, Events} -> {Res, {ok, Events}} @@ -482,7 +442,7 @@ wait_for_events(Action, Kinds) -> wait_for_stats(Action, Stats) -> Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) -> - Stat = maps:take( + Stat = maps:with( [ count_stat , max_stat ], Event), @@ -495,7 +455,7 @@ wait_for_stats(Action, Stats) -> {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), Res = Action(), case snabbkaffe_collector:receive_events(Sub) of - {timeout, []} -> + {timeout, _} -> {Res, timeout}; {ok, Events} -> {Res, {ok, Events}}