refactor(emqx_cm): use an ETS table instead of counters

It'll be much easier to mantain consistency in the counter this way
This commit is contained in:
Thales Macedo Garitezi 2021-11-05 17:12:28 -03:00
parent dbb519ee0e
commit 8a4e0a3ecb
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
4 changed files with 33 additions and 30 deletions

View File

@ -93,8 +93,9 @@
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
]).
-define(CONN_CLIENT_CTR, connected_client_counter).
-define(CONN_CLIENT_CTR_IDX, 1).
-define(CONN_CLIENT_TAB, connected_client_counter).
-define(CONN_CLIENT_TAB_KEY, connected_client_count).
-define(CONN_CLIENT_TAB_IDX, 2).
-define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}).
%% Batch drain
@ -447,8 +448,8 @@ init([]) ->
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
CRef = counters:new(1, [write_concurrency]),
ok = persistent_term:put({?MODULE, ?CONN_CLIENT_CTR}, CRef),
ok = emqx_tables:new(?CONN_CLIENT_TAB, [set | TabOpts]),
true = ets:insert(?CONN_CLIENT_TAB, {?CONN_CLIENT_TAB_KEY, 0}),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
State = #{chan_pmon => emqx_pmon:new()},
{ok, State}.
@ -524,21 +525,17 @@ get_chann_conn_mod(ClientId, ChanPid) ->
increment_connected_client_count() ->
?tp(emqx_cm_connected_client_count_inc, #{}),
CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
ok = counters:add(CRef, ?CONN_CLIENT_CTR_IDX, 1).
ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY,
{?CONN_CLIENT_TAB_IDX, 1}),
ok.
decrement_connected_client_count() ->
?tp(emqx_cm_connected_client_count_dec, #{}),
CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
ok = counters:sub(CRef, ?CONN_CLIENT_CTR_IDX, 1).
Threshold = 0,
SetValue = 0,
ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY,
{?CONN_CLIENT_TAB_IDX, -1, Threshold, SetValue}),
ok.
get_connected_client_count() ->
CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}),
%% check if inconsistent; if so, reset to 0
case counters:get(CRef, ?CONN_CLIENT_CTR_IDX) of
N when N < 0 ->
counters:put(CRef, ?CONN_CLIENT_CTR_IDX, 0),
0;
N ->
N
end.
ets:lookup_element(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, ?CONN_CLIENT_TAB_IDX).

View File

@ -33,6 +33,8 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
%% CM Meck
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
@ -835,4 +837,3 @@ session(InitFields) when is_map(InitFields) ->
quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]).

View File

@ -36,6 +36,8 @@ init_per_suite(Config) ->
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
%% Meck Limiter
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
%% Meck Pd
@ -112,7 +114,7 @@ t_ws_pingreq_before_connected(_) ->
t_info(_) ->
CPid = spawn(fun() ->
receive
receive
{'$gen_call', From, info} ->
gen_server:reply(From, emqx_connection:info(st()))
after
@ -132,7 +134,7 @@ t_info_limiter(_) ->
t_stats(_) ->
CPid = spawn(fun() ->
receive
receive
{'$gen_call', From, stats} ->
gen_server:reply(From, emqx_connection:stats(st()))
after
@ -147,10 +149,10 @@ t_stats(_) ->
{send_pend,0}| _] , Stats).
t_process_msg(_) ->
with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in,
fun(_Packet, Channel) ->
{ok, Channel}
with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in,
fun(_Packet, Channel) ->
{ok, Channel}
end),
CPid ! {incoming, ?PACKET(?PINGREQ)},
CPid ! {incoming, undefined},
@ -318,7 +320,7 @@ t_with_channel(_) ->
t_handle_outgoing(_) ->
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
t_handle_info(_) ->
?assertMatch({ok, {event,running}, _NState},
emqx_connection:handle_info(activate_socket, st())),
@ -345,7 +347,7 @@ t_activate_socket(_) ->
State = st(),
{ok, NStats} = emqx_connection:activate_socket(State),
?assertEqual(running, emqx_connection:info(sockstate, NStats)),
State1 = st(#{sockstate => blocked}),
?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),

View File

@ -48,6 +48,10 @@ init_per_testcase(TestCase, Config) when
TestCase =/= t_ws_pingreq_before_connected,
TestCase =/= t_ws_non_check_origin
->
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
%% Mock cowboy_req
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
@ -95,7 +99,8 @@ end_per_testcase(TestCase, _Config) when
TestCase =/= t_ws_pingreq_before_connected
->
lists:foreach(fun meck:unload/1,
[cowboy_req,
[emqx_cm,
cowboy_req,
emqx_zone,
emqx_access_control,
emqx_broker,
@ -389,14 +394,12 @@ t_handle_info_close(_) ->
{[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()).
t_handle_info_event(_) ->
ok = meck:new(emqx_cm, [passthrough, no_history]),
ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end),
{ok, _} = ?ws_conn:handle_info({event, connected}, st()),
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
{ok, _} = ?ws_conn:handle_info({event, updated}, st()),
ok = meck:unload(emqx_cm).
{ok, _} = ?ws_conn:handle_info({event, updated}, st()).
t_handle_timeout_idle_timeout(_) ->
TRef = make_ref(),