feat(stats): track live / connected channel count for monitoring

In order to correctly display the number of connected clients in our
monitor dashboard, we need to track those connections that are
actually connected to clients, not considering connections from
persistent sessions that are disconnected.  Today, the
`connections.count` that is displayed in the dashboards considers
those disconnected persistent sessions as well.
This commit is contained in:
Thales Macedo Garitezi 2021-11-03 17:45:48 -03:00
parent 4b2586fec4
commit b9270ad719
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
9 changed files with 359 additions and 29 deletions

View File

@ -1536,6 +1536,8 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_connected(ChanPid),
Channel#channel{conninfo = NConnInfo, Channel#channel{conninfo = NConnInfo,
conn_state = connected conn_state = connected
}. }.
@ -1624,6 +1626,8 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_disconnected(ChanPid),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1725,4 +1729,3 @@ flag(false) -> 0.
set_field(Name, Value, Channel) -> set_field(Name, Value, Channel) ->
Pos = emqx_misc:index_of(Name, record_info(fields, channel)), Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
setelement(Pos+1, Channel, Value). setelement(Pos+1, Channel, Value).

View File

@ -22,6 +22,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[CM]"). -logger_header("[CM]").
@ -72,7 +73,12 @@
]). ]).
%% Internal export %% Internal export
-export([stats_fun/0, clean_down/1]). -export([ stats_fun/0
, clean_down/1
, mark_channel_connected/1
, mark_channel_disconnected/1
, get_connected_client_count/0
]).
-type(chan_pid() :: pid()). -type(chan_pid() :: pid()).
@ -80,11 +86,13 @@
-define(CHAN_TAB, emqx_channel). -define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn). -define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
-define(CHAN_STATS, -define(CHAN_STATS,
[{?CHAN_TAB, 'channels.count', 'channels.max'}, [{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'} {?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
]). ]).
%% Batch drain %% Batch drain
@ -437,8 +445,10 @@ init([]) ->
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}. State = #{chan_pmon => emqx_pmon:new()},
{ok, State}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
@ -447,17 +457,21 @@ handle_call(Req, _From, State) ->
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]), ?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(
fun({ChanPid, _ClientID}) ->
mark_channel_disconnected(ChanPid)
end,
Items),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
@ -493,3 +507,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
get_chann_conn_mod(ClientId, ChanPid) -> get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{}),
ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
ok.
mark_channel_disconnected(ChanPid) ->
?tp(emqx_cm_connected_client_count_dec, #{}),
ets:delete(?CHAN_LIVE_TAB, ChanPid),
ok.
get_connected_client_count() ->
case ets:info(?CHAN_LIVE_TAB, size) of
undefined -> 0;
Size -> Size
end.

View File

@ -518,7 +518,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
end, end,
?tp(info, terminate, #{reason => Reason}), ?tp(info, terminate, #{reason => Reason}),
maybe_raise_excption(Reason). maybe_raise_exception(Reason).
%% close socket, discard new state, always return ok. %% close socket, discard new state, always return ok.
close_socket_ok(State) -> close_socket_ok(State) ->
@ -526,12 +526,12 @@ close_socket_ok(State) ->
ok. ok.
%% tell truth about the original exception %% tell truth about the original exception
maybe_raise_excption(#{exception := Exception, maybe_raise_exception(#{exception := Exception,
context := Context, context := Context,
stacktrace := Stacktrace stacktrace := Stacktrace
}) -> }) ->
erlang:raise(Exception, Context, Stacktrace); erlang:raise(Exception, Context, Stacktrace);
maybe_raise_excption(Reason) -> maybe_raise_exception(Reason) ->
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -21,6 +21,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[Stats]"). -logger_header("[Stats]").
@ -67,8 +68,10 @@
%% Connection stats %% Connection stats
-define(CONNECTION_STATS, -define(CONNECTION_STATS,
['connections.count', %% Count of Concurrent Connections [ 'connections.count' %% Count of Concurrent Connections
'connections.max' %% Maximum Number of Concurrent Connections , 'connections.max' %% Maximum Number of Concurrent Connections
, 'live_connections.count' %% Count of connected clients
, 'live_connections.max' %% Maximum number of connected clients
]). ]).
%% Channel stats %% Channel stats
@ -216,6 +219,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
ets:insert(?TAB, {MaxStat, Val}) ets:insert(?TAB, {MaxStat, Val})
end, end,
safe_update_element(Stat, Val), safe_update_element(Stat, Val),
?tp(emqx_stats_setstat,
#{ count_stat => Stat
, max_stat => MaxStat
, value => Val
}),
{noreply, State}; {noreply, State};
handle_cast({update_interval, Update = #update{name = Name}}, handle_cast({update_interval, Update = #update{name = Name}},
@ -274,4 +282,3 @@ safe_update_element(Key, Val) ->
error:badarg -> error:badarg ->
?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val]) ?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val])
end. end.

View File

@ -23,20 +23,63 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
all() -> emqx_ct:all(?MODULE). all() ->
[ {group, all_cases}
, {group, connected_client_count_group}
].
init_per_suite(Config) -> groups() ->
TCs = emqx_ct:all(?MODULE),
ConnClientTCs = [ t_connected_client_count_persistent
, t_connected_client_count_anonymous
, t_connected_client_stats
],
OtherTCs = TCs -- ConnClientTCs,
[ {all_cases, [], OtherTCs}
, {connected_client_count_group, [ {group, tcp}
, {group, ws}
]}
, {tcp, [], ConnClientTCs}
, {ws, [], ConnClientTCs}
].
init_per_group(connected_client_count_group, Config) ->
Config;
init_per_group(tcp, Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
[{conn_fun, connect} | Config];
init_per_group(ws, Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
[ {ssl, false}
, {enable_websocket, true}
, {conn_fun, ws_connect}
, {port, 8083}
, {host, "localhost"}
| Config
];
init_per_group(_Group, Config) ->
emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_group(connected_client_count_group, _Config) ->
ok;
end_per_group(_Group, _Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}). ?MODULE:Case({init, Config}).
@ -277,6 +320,248 @@ t_stats_fun({'end', _Config}) ->
ok = emqx_broker:unsubscribe(<<"topic">>), ok = emqx_broker:unsubscribe(<<"topic">>),
ok = emqx_broker:unsubscribe(<<"topic2">>). ok = emqx_broker:unsubscribe(<<"topic2">>).
%% persistent sessions, when gone, do not contribute to connected
%% client count
t_connected_client_count_persistent({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_persistent(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
ClientID = <<"clientid">>,
?assertEqual(0, emqx_cm:get_connected_client_count()),
{ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[emqx_cm_connected_client_count_dec]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% taking over
{ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[ emqx_cm_connected_client_count_inc
, emqx_cm_connected_client_count_dec
],
500
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
ChanPids = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
ChanPids)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_persistent({'end', _Config}) ->
snabbkaffe:stop(),
ok.
%% connections without client_id also contribute to connected client
%% count
t_connected_client_count_anonymous({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_anonymous(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% first client
{ok, ConnPid0} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% second client
{ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
%% when first client disconnects, shouldn't affect the second
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid2} = emqtt:start_link([ {clean_start, true}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid1) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
Chans = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
Chans)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_anonymous({'end', _Config}) ->
snabbkaffe:stop(),
ok.
t_connected_client_stats({init, Config}) ->
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok = snabbkaffe:start_trace(),
Config;
t_connected_client_stats(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
{ok, ConnPid} = emqtt:start_link([ {clean_start, true}
, {clientid, <<"clientid">>}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid) end,
[emqx_cm_connected_client_count_inc]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid) end,
[emqx_cm_connected_client_count_dec]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok;
t_connected_client_stats({'end', _Config}) ->
ok = snabbkaffe:stop(),
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok.
%% the count must be always non negative
t_connect_client_never_negative({init, Config}) ->
Config;
t_connect_client_never_negative(Config) when is_list(Config) ->
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would go to -1
ChanPid = list_to_pid("<0.0.1>"),
emqx_cm:mark_channel_disconnected(ChanPid),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would be 0, if really went to -1
emqx_cm:mark_channel_connected(ChanPid),
?assertEqual(1, emqx_cm:get_connected_client_count()),
ok;
t_connect_client_never_negative({'end', _Config}) ->
ok.
wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500).
wait_for_events(Action, Kinds, Timeout) ->
Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds)
end,
N = length(Kinds),
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
wait_for_stats(Action, Stats) ->
Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
Stat = maps:with(
[ count_stat
, max_stat
], Event),
lists:member(Stat, Stats);
(_) ->
false
end,
N = length(Stats),
Timeout = 500,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
insert_fake_channels() ->
%% Insert copies to simulate missed counts
Tab = emqx_channel_info,
Key = ets:first(Tab),
[{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key),
ets:insert(Tab, [ {{"fake" ++ integer_to_list(N), undefined}, ChanInfo, Stats}
|| N <- lists:seq(1, 9)]),
%% these should not be counted
ets:insert(Tab, [ { {"fake" ++ integer_to_list(N), undefined}
, ChanInfo#{conn_state := disconnected}, Stats}
|| N <- lists:seq(10, 20)]).
recv_msgs(Count) -> recv_msgs(Count) ->
recv_msgs(Count, []). recv_msgs(Count, []).

View File

@ -33,6 +33,8 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% CM Meck %% CM Meck
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck %% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate, ok = meck:expect(emqx_access_control, authenticate,
@ -835,4 +837,3 @@ session(InitFields) when is_map(InitFields) ->
quota() -> quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]). {overall_messages_routing, {10, 1}}]).

View File

@ -36,6 +36,8 @@ init_per_suite(Config) ->
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm %% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Meck Limiter %% Meck Limiter
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
%% Meck Pd %% Meck Pd
@ -112,7 +114,7 @@ t_ws_pingreq_before_connected(_) ->
t_info(_) -> t_info(_) ->
CPid = spawn(fun() -> CPid = spawn(fun() ->
receive receive
{'$gen_call', From, info} -> {'$gen_call', From, info} ->
gen_server:reply(From, emqx_connection:info(st())) gen_server:reply(From, emqx_connection:info(st()))
after after
@ -132,7 +134,7 @@ t_info_limiter(_) ->
t_stats(_) -> t_stats(_) ->
CPid = spawn(fun() -> CPid = spawn(fun() ->
receive receive
{'$gen_call', From, stats} -> {'$gen_call', From, stats} ->
gen_server:reply(From, emqx_connection:stats(st())) gen_server:reply(From, emqx_connection:stats(st()))
after after
@ -147,10 +149,10 @@ t_stats(_) ->
{send_pend,0}| _] , Stats). {send_pend,0}| _] , Stats).
t_process_msg(_) -> t_process_msg(_) ->
with_conn(fun(CPid) -> with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in, ok = meck:expect(emqx_channel, handle_in,
fun(_Packet, Channel) -> fun(_Packet, Channel) ->
{ok, Channel} {ok, Channel}
end), end),
CPid ! {incoming, ?PACKET(?PINGREQ)}, CPid ! {incoming, ?PACKET(?PINGREQ)},
CPid ! {incoming, undefined}, CPid ! {incoming, undefined},
@ -318,7 +320,7 @@ t_with_channel(_) ->
t_handle_outgoing(_) -> t_handle_outgoing(_) ->
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
t_handle_info(_) -> t_handle_info(_) ->
?assertMatch({ok, {event,running}, _NState}, ?assertMatch({ok, {event,running}, _NState},
emqx_connection:handle_info(activate_socket, st())), emqx_connection:handle_info(activate_socket, st())),
@ -345,7 +347,7 @@ t_activate_socket(_) ->
State = st(), State = st(),
{ok, NStats} = emqx_connection:activate_socket(State), {ok, NStats} = emqx_connection:activate_socket(State),
?assertEqual(running, emqx_connection:info(sockstate, NStats)), ?assertEqual(running, emqx_connection:info(sockstate, NStats)),
State1 = st(#{sockstate => blocked}), State1 = st(#{sockstate => blocked}),
?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)), ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),

View File

@ -121,7 +121,7 @@ t_priority_mqueue(_) ->
?assertEqual(5, ?Q:len(Q5)), ?assertEqual(5, ?Q:len(Q5)),
{_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
?assertEqual(5, ?Q:len(Q6)), ?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, Q7} = ?Q:out(Q6), {{value, _Msg}, Q7} = ?Q:out(Q6),
?assertEqual(4, ?Q:len(Q7)). ?assertEqual(4, ?Q:len(Q7)).
t_priority_mqueue_conservation(_) -> t_priority_mqueue_conservation(_) ->

View File

@ -48,6 +48,10 @@ init_per_testcase(TestCase, Config) when
TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_pingreq_before_connected,
TestCase =/= t_ws_non_check_origin TestCase =/= t_ws_non_check_origin
-> ->
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Mock cowboy_req %% Mock cowboy_req
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
@ -95,7 +99,8 @@ end_per_testcase(TestCase, _Config) when
TestCase =/= t_ws_pingreq_before_connected TestCase =/= t_ws_pingreq_before_connected
-> ->
lists:foreach(fun meck:unload/1, lists:foreach(fun meck:unload/1,
[cowboy_req, [emqx_cm,
cowboy_req,
emqx_zone, emqx_zone,
emqx_access_control, emqx_access_control,
emqx_broker, emqx_broker,
@ -389,14 +394,12 @@ t_handle_info_close(_) ->
{[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()). {[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()).
t_handle_info_event(_) -> t_handle_info_event(_) ->
ok = meck:new(emqx_cm, [passthrough, no_history]),
ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end), ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end),
{ok, _} = ?ws_conn:handle_info({event, connected}, st()), {ok, _} = ?ws_conn:handle_info({event, connected}, st()),
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()), {ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
{ok, _} = ?ws_conn:handle_info({event, updated}, st()), {ok, _} = ?ws_conn:handle_info({event, updated}, st()).
ok = meck:unload(emqx_cm).
t_handle_timeout_idle_timeout(_) -> t_handle_timeout_idle_timeout(_) ->
TRef = make_ref(), TRef = make_ref(),