diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 12043346e..1c5bca93a 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -93,8 +93,9 @@ {?CHAN_CONN_TAB, 'connections.count', 'connections.max'} ]). --define(CONN_CLIENT_CTR, connected_client_counter). --define(CONN_CLIENT_CTR_IDX, 1). +-define(CONN_CLIENT_TAB, connected_client_counter). +-define(CONN_CLIENT_TAB_KEY, connected_client_count). +-define(CONN_CLIENT_TAB_IDX, 2). -define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}). %% Batch drain @@ -447,8 +448,8 @@ init([]) -> ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), - CRef = counters:new(1, [write_concurrency]), - ok = persistent_term:put({?MODULE, ?CONN_CLIENT_CTR}, CRef), + ok = emqx_tables:new(?CONN_CLIENT_TAB, [set | TabOpts]), + true = ets:insert(?CONN_CLIENT_TAB, {?CONN_CLIENT_TAB_KEY, 0}), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), State = #{chan_pmon => emqx_pmon:new()}, {ok, State}. @@ -524,21 +525,17 @@ get_chann_conn_mod(ClientId, ChanPid) -> increment_connected_client_count() -> ?tp(emqx_cm_connected_client_count_inc, #{}), - CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), - ok = counters:add(CRef, ?CONN_CLIENT_CTR_IDX, 1). + ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, + {?CONN_CLIENT_TAB_IDX, 1}), + ok. decrement_connected_client_count() -> ?tp(emqx_cm_connected_client_count_dec, #{}), - CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), - ok = counters:sub(CRef, ?CONN_CLIENT_CTR_IDX, 1). + Threshold = 0, + SetValue = 0, + ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, + {?CONN_CLIENT_TAB_IDX, -1, Threshold, SetValue}), + ok. get_connected_client_count() -> - CRef = persistent_term:get({?MODULE, ?CONN_CLIENT_CTR}), - %% check if inconsistent; if so, reset to 0 - case counters:get(CRef, ?CONN_CLIENT_CTR_IDX) of - N when N < 0 -> - counters:put(CRef, ?CONN_CLIENT_CTR_IDX, 0), - 0; - N -> - N - end. + ets:lookup_element(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, ?CONN_CLIENT_TAB_IDX). diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 9558dfd28..5d42ffb99 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -33,6 +33,8 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> %% CM Meck ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), %% Access Control Meck ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:expect(emqx_access_control, authenticate, @@ -835,4 +837,3 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, {overall_messages_routing, {10, 1}}]). - diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index a6b2b614a..f4e8e909e 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -36,6 +36,8 @@ init_per_suite(Config) -> ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), %% Meck Limiter ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), %% Meck Pd @@ -112,7 +114,7 @@ t_ws_pingreq_before_connected(_) -> t_info(_) -> CPid = spawn(fun() -> - receive + receive {'$gen_call', From, info} -> gen_server:reply(From, emqx_connection:info(st())) after @@ -132,7 +134,7 @@ t_info_limiter(_) -> t_stats(_) -> CPid = spawn(fun() -> - receive + receive {'$gen_call', From, stats} -> gen_server:reply(From, emqx_connection:stats(st())) after @@ -147,10 +149,10 @@ t_stats(_) -> {send_pend,0}| _] , Stats). t_process_msg(_) -> - with_conn(fun(CPid) -> - ok = meck:expect(emqx_channel, handle_in, - fun(_Packet, Channel) -> - {ok, Channel} + with_conn(fun(CPid) -> + ok = meck:expect(emqx_channel, handle_in, + fun(_Packet, Channel) -> + {ok, Channel} end), CPid ! {incoming, ?PACKET(?PINGREQ)}, CPid ! {incoming, undefined}, @@ -318,7 +320,7 @@ t_with_channel(_) -> t_handle_outgoing(_) -> ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). - + t_handle_info(_) -> ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())), @@ -345,7 +347,7 @@ t_activate_socket(_) -> State = st(), {ok, NStats} = emqx_connection:activate_socket(State), ?assertEqual(running, emqx_connection:info(sockstate, NStats)), - + State1 = st(#{sockstate => blocked}), ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 56d038c23..bac0444b4 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -48,6 +48,10 @@ init_per_testcase(TestCase, Config) when TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_non_check_origin -> + %% Meck Cm + ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end), + ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end), %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), @@ -95,7 +99,8 @@ end_per_testcase(TestCase, _Config) when TestCase =/= t_ws_pingreq_before_connected -> lists:foreach(fun meck:unload/1, - [cowboy_req, + [emqx_cm, + cowboy_req, emqx_zone, emqx_access_control, emqx_broker, @@ -389,14 +394,12 @@ t_handle_info_close(_) -> {[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()). t_handle_info_event(_) -> - ok = meck:new(emqx_cm, [passthrough, no_history]), ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end), {ok, _} = ?ws_conn:handle_info({event, connected}, st()), {ok, _} = ?ws_conn:handle_info({event, disconnected}, st()), - {ok, _} = ?ws_conn:handle_info({event, updated}, st()), - ok = meck:unload(emqx_cm). + {ok, _} = ?ws_conn:handle_info({event, updated}, st()). t_handle_timeout_idle_timeout(_) -> TRef = make_ref(),