refactor(emqx_cm): use `counters` in place of internal state

This commit is contained in:
Thales Macedo Garitezi 2021-11-05 09:21:24 -03:00
parent 9af60ac126
commit eb5e1a5cb9
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
2 changed files with 83 additions and 123 deletions

View File

@ -22,6 +22,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[CM]"). -logger_header("[CM]").
@ -92,6 +93,8 @@
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'} {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
]). ]).
-define(CONN_CLIENT_CTR, connected_client_counter).
-define(CONN_CLIENT_CTR_IDX, 1).
-define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}). -define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}).
%% Batch drain %% Batch drain
@ -148,12 +151,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
do_unregister_channel(Chan) -> do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan), ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete(?CHAN_CONN_TAB, Chan), true = ets:delete(?CHAN_CONN_TAB, Chan),
case ets:take(?CHAN_INFO_TAB, Chan) of true = ets:delete(?CHAN_INFO_TAB, Chan),
[{_, #{conn_state := connected}, _}] ->
decrement_connected_client_count();
_ ->
ok
end,
ets:delete_object(?CHAN_TAB, Chan). ets:delete_object(?CHAN_TAB, Chan).
-spec(connection_closed(emqx_types:clientid()) -> true). -spec(connection_closed(emqx_types:clientid()) -> true).
@ -439,7 +437,6 @@ rpc_call(Node, Fun, Args, Timeout) ->
%% @private %% @private
cast(Msg) -> gen_server:cast(?CM, Msg). cast(Msg) -> gen_server:cast(?CM, Msg).
call(Msg) -> gen_server:call(?CM, Msg, infinity).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -450,17 +447,15 @@ init([]) ->
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
CRef = counters:new(1, [write_concurrency]),
ok = persistent_term:put({?MODULE, ?CONN_CLIENT_CTR}, CRef),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
RefreshPeriod = application:get_env(emqx, connected_client_count_refresh_period, 60000), RefreshPeriod = application:get_env(emqx, connected_client_count_refresh_period, 60000),
State = #{ chan_pmon => emqx_pmon:new() State = #{ chan_pmon => emqx_pmon:new()
, connected_client_count => 0
, connected_client_refresh_period => RefreshPeriod , connected_client_refresh_period => RefreshPeriod
}, },
{ok, ensure_refresh_timer(State)}. {ok, ensure_refresh_timer(State)}.
handle_call({connected_client_count, get}, _From,
State = #{connected_client_count := CCCount}) ->
{reply, CCCount, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
@ -468,18 +463,6 @@ handle_call(Req, _From, State) ->
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_cast({connected_client_count, inc},
State0 = #{connected_client_count := CCCount0}) ->
?tp(emqx_cm_connected_client_count_inc, #{count_before => CCCount0}),
CCCount = CCCount0 + 1,
State = State0#{connected_client_count := CCCount},
{noreply, State};
handle_cast({connected_client_count, dec},
State0 = #{connected_client_count := CCCount0}) ->
?tp(emqx_cm_connected_client_count_dec, #{count_before => CCCount0}),
CCCount = max(0, CCCount0 - 1),
State = State0#{connected_client_count := CCCount},
{noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]), ?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
@ -487,17 +470,29 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(
fun({ChanPid, ClientID}) ->
Chan = {ClientID, ChanPid},
case ets:lookup(?CHAN_INFO_TAB, Chan) of
[{Chan, #{conn_state := connected}, _}] ->
decrement_connected_client_count();
_ ->
ok
end
end,
Items),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_info({timeout, TRef, refresh_connected_client_count}, handle_info({timeout, TRef, refresh_connected_client_count},
State0 = #{connected_client_refresh_timer := TRef, State0 = #{connected_client_refresh_timer := TRef}) ->
connected_client_count := _CCCount0}) ->
CCCount = refresh_connected_client_count(), CCCount = refresh_connected_client_count(),
?tp(emqx_cm_refresh_connected_client_count, ?tp(emqx_cm_refresh_connected_client_count,
#{ new_count => CCCount #{ new_count => CCCount
, old_count => _CCCount0 , old_count => get_connected_client_count()
}), }),
State = ensure_refresh_timer(State0#{connected_client_count := CCCount}), CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
ok = counters:put(CRef, ?CONN_CLIENT_CTR_IDX, CCCount),
State = ensure_refresh_timer(State0),
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
@ -542,13 +537,18 @@ get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
increment_connected_client_count() -> increment_connected_client_count() ->
cast({connected_client_count, inc}). ?tp(emqx_cm_connected_client_count_inc, #{}),
CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
ok = counters:add(CRef, ?CONN_CLIENT_CTR_IDX, 1).
decrement_connected_client_count() -> decrement_connected_client_count() ->
cast({connected_client_count, dec}). ?tp(emqx_cm_connected_client_count_dec, #{}),
CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
ok = counters:sub(CRef, ?CONN_CLIENT_CTR_IDX, 1).
get_connected_client_count() -> get_connected_client_count() ->
call({connected_client_count, get}). CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
max(0, counters:get(CRef, ?CONN_CLIENT_CTR_IDX)).
ensure_refresh_timer(State = #{connected_client_refresh_period := RefreshPeriod}) -> ensure_refresh_timer(State = #{connected_client_refresh_period := RefreshPeriod}) ->
TRef = emqx_misc:start_timer(RefreshPeriod, refresh_connected_client_count), TRef = emqx_misc:start_timer(RefreshPeriod, refresh_connected_client_count),

View File

@ -35,8 +35,7 @@ all() ->
groups() -> groups() ->
TCs = emqx_ct:all(?MODULE), TCs = emqx_ct:all(?MODULE),
ConnClientTCs = [ t_connected_client_count_refresh ConnClientTCs = [ t_connected_client_count_persistent
, t_connected_client_count_persistent
, t_connected_client_stats , t_connected_client_stats
], ],
OtherTCs = TCs -- ConnClientTCs, OtherTCs = TCs -- ConnClientTCs,
@ -328,98 +327,56 @@ t_connected_client_count_persistent({init, Config}) ->
Config; Config;
t_connected_client_count_persistent(Config) when is_list(Config) -> t_connected_client_count_persistent(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
ClientID = <<"clientid">>,
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
{ok, ConnPid0} = emqtt:start_link([ {clean_start, false} {ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
, {clientid, <<"clientid">>} , {clientid, ClientID}
| Config]), | Config]),
{{ok, _}, _} = wait_for_events( {{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end, fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc] [emqx_cm_connected_client_count_inc]
), ),
?assertEqual(1, emqx_cm:get_connected_client_count()), ?assertEqual(1, emqx_cm:get_connected_client_count()),
{ok, _} = wait_for_events( {ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end, fun() -> emqtt:disconnect(ConnPid0) end,
[emqx_cm_connected_client_count_dec] [emqx_cm_connected_client_count_dec]
), ),
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
%% reconnecting %% reconnecting
{ok, ConnPid1} = emqtt:start_link([ {clean_start, false} {ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
, {clientid, <<"clientid">>} , {clientid, ClientID}
| Config | Config
]), ]),
{{ok, _}, _} = wait_for_events( {{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end, fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc] [emqx_cm_connected_client_count_inc]
), ),
?assertEqual(1, emqx_cm:get_connected_client_count()), ?assertEqual(1, emqx_cm:get_connected_client_count()),
%% taking over %% taking over
{ok, ConnPid2} = emqtt:start_link([ {clean_start, false} {ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
, {clientid, <<"clientid">>} , {clientid, ClientID}
| Config | Config
]), ]),
{{ok, _}, _} = wait_for_events( {{ok, _}, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end, fun() -> emqtt:ConnFun(ConnPid2) end,
[ emqx_cm_connected_client_count_dec [ emqx_cm_connected_client_count_inc
, emqx_cm_connected_client_count_inc , emqx_cm_connected_client_count_dec
] ],
), 500
),
?assertEqual(1, emqx_cm:get_connected_client_count()), ?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process %% abnormal exit of channel process
[ChanPid] = emqx_cm:all_channels(), [ChanPid] = emqx_cm:all_channels(),
{true, _} = wait_for_events( {true, {ok, [_]}} = wait_for_events(
fun() -> exit(ChanPid, kill) end, fun() -> exit(ChanPid, kill) end,
[emqx_cm_connected_client_count_dec] [emqx_cm_connected_client_count_dec]
), ),
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
ok; ok;
t_connected_client_count_persistent({'end', _Config}) -> t_connected_client_count_persistent({'end', _Config}) ->
snabbkaffe:stop(), snabbkaffe:stop(),
ok. ok.
t_connected_client_count_refresh({init, Config}) ->
ok = snabbkaffe:start_trace(),
OldConfig = application:get_env(emqx, connected_client_count_refresh_period, undefined),
application:set_env(emqx, connected_client_count_refresh_period, 100),
ok = supervisor:terminate_child(emqx_cm_sup, manager),
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
[{old_config, OldConfig} | Config];
t_connected_client_count_refresh(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
{ok, ConnPid} = emqtt:start_link([ {clean_start, false}
, {clientid, <<"clientid">>}
| Config
]),
{{ok, _}, _} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid) end,
[emqx_cm_connected_client_count_inc]
),
%% simulate count mismatch
insert_fake_channels(),
?block_until(
#{ ?snk_kind := emqx_cm_refresh_connected_client_count
, new_count := 10
, old_count := 1
},
150
),
?assertEqual(10, emqx_cm:get_connected_client_count()),
emqtt:disconnect(ConnPid),
ok;
t_connected_client_count_refresh({'end', Config}) ->
OldConfig = proplists:get_value(old_config, Config),
case OldConfig of
Val when is_integer(Val) ->
application:set_env(emqx, connected_client_count_refresh_period, OldConfig);
_ ->
skip
end,
snabbkaffe:stop(),
ets:delete_all_objects(emqx_channel_info),
ok = supervisor:terminate_child(emqx_cm_sup, manager),
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
ok.
t_connected_client_stats({init, Config}) -> t_connected_client_stats({init, Config}) ->
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
@ -434,28 +391,28 @@ t_connected_client_stats(Config) when is_list(Config) ->
, {clientid, <<"clientid">>} , {clientid, <<"clientid">>}
| Config | Config
]), ]),
{{ok, _}, _} = wait_for_events( {{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid) end, fun() -> emqtt:ConnFun(ConnPid) end,
[emqx_cm_connected_client_count_inc] [emqx_cm_connected_client_count_inc]
), ),
%% ensure stats are synchronized %% ensure stats are synchronized
wait_for_stats( {_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0, fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count', [#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}] max_stat => 'live_connections.max'}]
), ),
?assertEqual(1, emqx_stats:getstat('live_connections.count')), ?assertEqual(1, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')), ?assertEqual(1, emqx_stats:getstat('live_connections.max')),
{ok, _} = wait_for_events( {ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid) end, fun() -> emqtt:disconnect(ConnPid) end,
[emqx_cm_connected_client_count_dec] [emqx_cm_connected_client_count_dec]
), ),
%% ensure stats are synchronized %% ensure stats are synchronized
wait_for_stats( {_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0, fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count', [#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}] max_stat => 'live_connections.max'}]
), ),
?assertEqual(0, emqx_stats:getstat('live_connections.count')), ?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')), ?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok; ok;
@ -465,16 +422,19 @@ t_connected_client_stats({'end', _Config}) ->
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok. ok.
wait_for_events(Action, Kinds) -> wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 100).
wait_for_events(Action, Kinds, Timeout) ->
Predicate = fun(#{?snk_kind := K}) -> Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds) lists:member(K, Kinds)
end, end,
N = length(Kinds), N = length(Kinds),
Timeout = 100,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(), Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of case snabbkaffe_collector:receive_events(Sub) of
{timeout, []} -> {timeout, _} ->
{Res, timeout}; {Res, timeout};
{ok, Events} -> {ok, Events} ->
{Res, {ok, Events}} {Res, {ok, Events}}
@ -482,7 +442,7 @@ wait_for_events(Action, Kinds) ->
wait_for_stats(Action, Stats) -> wait_for_stats(Action, Stats) ->
Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) -> Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
Stat = maps:take( Stat = maps:with(
[ count_stat [ count_stat
, max_stat , max_stat
], Event), ], Event),
@ -495,7 +455,7 @@ wait_for_stats(Action, Stats) ->
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(), Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of case snabbkaffe_collector:receive_events(Sub) of
{timeout, []} -> {timeout, _} ->
{Res, timeout}; {Res, timeout};
{ok, Events} -> {ok, Events} ->
{Res, {ok, Events}} {Res, {ok, Events}}