Merge pull request #6096 from emqx/live-channel-count

feat(stats): track live channel / client count

Track connected client count

In order to correctly display the number of connected clients in our monitor dashboard, we need to track those connections that are actually connected to clients, not considering connections from persistent sessions that are disconnected. Today, the
connections.count that is displayed in the dashboards considers those disconnected persistent sessions as well.

The new statistics can be found in the `emqx_management` plugin, under `/api/v4/stats`, in the keys `live_connections.{max,count}`.

Ported from #6056 .
This commit is contained in:
Thales Macedo Garitezi 2021-11-10 14:54:34 -03:00 committed by GitHub
commit 6a71cdb055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 406 additions and 55 deletions

View File

@ -20,7 +20,7 @@
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
]}. ]}.
{plugins, [{rebar3_proper, "0.12.1"}]}. {plugins, [{rebar3_proper, "0.12.1"}]}.

View File

@ -328,10 +328,16 @@ handle_in(Packet = ?AUTH_PACKET(ReasonCode, _Properties),
connecting -> connecting ->
process_connect(NProperties, ensure_connected(NChannel)); process_connect(NProperties, ensure_connected(NChannel));
_ -> _ ->
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel#channel{conn_state = connected}) handle_out( auth
, {?RC_SUCCESS, NProperties}
, NChannel#channel{conn_state = connected}
)
end; end;
{continue, NProperties, NChannel} -> {continue, NProperties, NChannel} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel#channel{conn_state = reauthenticating}); handle_out( auth
, {?RC_CONTINUE_AUTHENTICATION, NProperties}
, NChannel#channel{conn_state = reauthenticating}
);
{error, NReasonCode} -> {error, NReasonCode} ->
case ConnState of case ConnState of
connecting -> connecting ->
@ -632,7 +638,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "dropped_qos2_packet", msg => "dropped_qos2_packet",
reason => emqx_reason_codes:name(RC), reason => emqx_reason_codes:name(RC),
packetId => PacketId packet_id => PacketId
}), }),
ok = emqx_metrics:inc('packets.publish.dropped'), ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(pubrec, {PacketId, RC}, Channel) handle_out(pubrec, {PacketId, RC}, Channel)
@ -790,7 +796,9 @@ handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings, pendings = Pendings,
session = Session, session = Session,
clientinfo = #{clientid := ClientId}}) -> clientinfo = #{clientid := ClientId}}) ->
NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), NPendings = lists:append(
Pendings,
emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}}; {ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{session = Session, handle_deliver(Delivers, Channel = #channel{session = Session,
@ -1365,16 +1373,19 @@ authenticate(?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properti
{error, ?RC_BAD_AUTHENTICATION_METHOD} {error, ?RC_BAD_AUTHENTICATION_METHOD}
end. end.
do_authenticate(#{auth_method := AuthMethod} = Credential, #channel{clientinfo = ClientInfo} = Channel) -> do_authenticate(#{auth_method := AuthMethod} = Credential,
#channel{clientinfo = ClientInfo} = Channel) ->
Properties = #{'Authentication-Method' => AuthMethod}, Properties = #{'Authentication-Method' => AuthMethod},
case emqx_access_control:authenticate(Credential) of case emqx_access_control:authenticate(Credential) of
{ok, Result} -> {ok, Result} ->
{ok, Properties, {ok, Properties,
Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, Channel#channel{
clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
auth_cache = #{}}}; auth_cache = #{}}};
{ok, Result, AuthData} -> {ok, Result, AuthData} ->
{ok, Properties#{'Authentication-Data' => AuthData}, {ok, Properties#{'Authentication-Data' => AuthData},
Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, Channel#channel{
clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
auth_cache = #{}}}; auth_cache = #{}}};
{continue, AuthCache} -> {continue, AuthCache} ->
{continue, Properties, Channel#channel{auth_cache = AuthCache}}; {continue, Properties, Channel#channel{auth_cache = AuthCache}};
@ -1612,6 +1623,8 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_connected(ChanPid),
Channel#channel{conninfo = NConnInfo, Channel#channel{conninfo = NConnInfo,
conn_state = connected conn_state = connected
}. }.
@ -1697,6 +1710,8 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_disconnected(ChanPid),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -73,7 +73,11 @@
]). ]).
%% Internal export %% Internal export
-export([stats_fun/0]). -export([ stats_fun/0
, mark_channel_connected/1
, mark_channel_disconnected/1
, get_connected_client_count/0
]).
-type(chan_pid() :: pid()). -type(chan_pid() :: pid()).
@ -81,11 +85,13 @@
-define(CHAN_TAB, emqx_channel). -define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn). -define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
-define(CHAN_STATS, -define(CHAN_STATS,
[{?CHAN_TAB, 'channels.count', 'channels.max'}, [{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'} {?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
]). ]).
%% Batch drain %% Batch drain
@ -96,6 +102,11 @@
-define(T_TAKEOVER, 15000). -define(T_TAKEOVER, 15000).
%% linting overrides
-elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
, {elvis_style, god_modules, #{ignore => [emqx_cm]}}
]).
%% @doc Start the channel manager. %% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
@ -239,7 +250,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
pendings => Pendings}}; pendings => Pendings}};
{living, ConnMod, ChanPid, Session} -> {living, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), Session1 = emqx_persistent_session:persist( ClientInfo
, ConnInfo
, Session
),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, {ok, #{session => Session1,
@ -248,12 +262,18 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
{expired, OldSession} -> {expired, OldSession} ->
_ = emqx_persistent_session:discard(ClientId, OldSession), _ = emqx_persistent_session:discard(ClientId, OldSession),
Session = create_session(ClientInfo, ConnInfo), Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), Session1 = emqx_persistent_session:persist( ClientInfo
, ConnInfo
, Session
),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, present => false}}; {ok, #{session => Session1, present => false}};
none -> none ->
Session = create_session(ClientInfo, ConnInfo), Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), Session1 = emqx_persistent_session:persist( ClientInfo
, ConnInfo
, Session
),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, present => false}} {ok, #{session => Session1, present => false}}
end end
@ -446,8 +466,10 @@ init([]) ->
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}. State = #{chan_pmon => emqx_pmon:new()},
{ok, State}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@ -456,17 +478,21 @@ handle_call(Req, _From, State) ->
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(
fun({ChanPid, _ClientID}) ->
mark_channel_disconnected(ChanPid)
end,
Items),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
@ -503,3 +529,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
get_chann_conn_mod(ClientId, ChanPid) -> get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{}),
ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
ok.
mark_channel_disconnected(ChanPid) ->
?tp(emqx_cm_connected_client_count_dec, #{}),
ets:delete(?CHAN_LIVE_TAB, ChanPid),
ok.
get_connected_client_count() ->
case ets:info(?CHAN_LIVE_TAB, size) of
undefined -> 0;
Size -> Size
end.

View File

@ -540,7 +540,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
end, end,
?tp(info, terminate, #{reason => Reason}), ?tp(info, terminate, #{reason => Reason}),
maybe_raise_excption(Reason). maybe_raise_exception(Reason).
%% close socket, discard new state, always return ok. %% close socket, discard new state, always return ok.
close_socket_ok(State) -> close_socket_ok(State) ->
@ -548,12 +548,12 @@ close_socket_ok(State) ->
ok. ok.
%% tell truth about the original exception %% tell truth about the original exception
maybe_raise_excption(#{exception := Exception, maybe_raise_exception(#{exception := Exception,
context := Context, context := Context,
stacktrace := Stacktrace stacktrace := Stacktrace
}) -> }) ->
erlang:raise(Exception, Context, Stacktrace); erlang:raise(Exception, Context, Stacktrace);
maybe_raise_excption(Reason) -> maybe_raise_exception(Reason) ->
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -21,6 +21,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs %% APIs
@ -66,8 +67,10 @@
%% Connection stats %% Connection stats
-define(CONNECTION_STATS, -define(CONNECTION_STATS,
['connections.count', %% Count of Concurrent Connections [ 'connections.count' %% Count of Concurrent Connections
'connections.max' %% Maximum Number of Concurrent Connections , 'connections.max' %% Maximum Number of Concurrent Connections
, 'live_connections.count' %% Count of connected clients
, 'live_connections.max' %% Maximum number of connected clients
]). ]).
%% Channel stats %% Channel stats
@ -215,6 +218,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
ets:insert(?TAB, {MaxStat, Val}) ets:insert(?TAB, {MaxStat, Val})
end, end,
safe_update_element(Stat, Val), safe_update_element(Stat, Val),
?tp(emqx_stats_setstat,
#{ count_stat => Stat
, max_stat => MaxStat
, value => Val
}),
{noreply, State}; {noreply, State};
handle_cast({update_interval, Update = #update{name = Name}}, handle_cast({update_interval, Update = #update{name = Name}},

View File

@ -23,20 +23,71 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
all() -> emqx_common_test_helpers:all(?MODULE). all() ->
[ {group, all_cases}
, {group, connected_client_count_group}
].
init_per_suite(Config) -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
ConnClientTCs = [ t_connected_client_count_persistent
, t_connected_client_count_anonymous
, t_connected_client_stats
],
OtherTCs = TCs -- ConnClientTCs,
[ {all_cases, [], OtherTCs}
, {connected_client_count_group, [ {group, tcp}
, {group, ws}
, {group, quic}
]}
, {tcp, [], ConnClientTCs}
, {ws, [], ConnClientTCs}
, {quic, [], ConnClientTCs}
].
init_per_group(connected_client_count_group, Config) ->
Config;
init_per_group(tcp, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[{conn_fun, connect} | Config];
init_per_group(ws, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[ {ssl, false}
, {enable_websocket, true}
, {conn_fun, ws_connect}
, {port, 8083}
, {host, "localhost"}
| Config
];
init_per_group(quic, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[ {conn_fun, quic_connect}
, {port, 14567}
| Config];
init_per_group(_Group, Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_group(connected_client_count_group, _Config) ->
ok;
end_per_group(_Group, _Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_common_test_helpers:stop_apps([]).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}). ?MODULE:Case({init, Config}).
@ -277,6 +328,236 @@ t_shard({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>), emqx_broker:unsubscribe(<<"topic">>),
ok = meck:unload(emqx_broker_helper). ok = meck:unload(emqx_broker_helper).
%% persistent sessions, when gone, do not contribute to connected
%% client count
t_connected_client_count_persistent({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_persistent(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
ClientID = <<"clientid">>,
?assertEqual(0, emqx_cm:get_connected_client_count()),
{ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[emqx_cm_connected_client_count_dec]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% taking over
{ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[ emqx_cm_connected_client_count_inc
, emqx_cm_connected_client_count_dec
],
500
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
ChanPids = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
ChanPids)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_persistent({'end', _Config}) ->
snabbkaffe:stop(),
ok.
%% connections without client_id also contribute to connected client
%% count
t_connected_client_count_anonymous({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_anonymous(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% first client
{ok, ConnPid0} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% second client
{ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
%% when first client disconnects, shouldn't affect the second
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid2} = emqtt:start_link([ {clean_start, true}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid1) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
Chans = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
Chans)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_anonymous({'end', _Config}) ->
snabbkaffe:stop(),
ok.
t_connected_client_stats({init, Config}) ->
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok = snabbkaffe:start_trace(),
Config;
t_connected_client_stats(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
{ok, ConnPid} = emqtt:start_link([ {clean_start, true}
, {clientid, <<"clientid">>}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid) end,
[emqx_cm_connected_client_count_inc]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid) end,
[emqx_cm_connected_client_count_dec]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok;
t_connected_client_stats({'end', _Config}) ->
ok = snabbkaffe:stop(),
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok.
%% the count must be always non negative
t_connect_client_never_negative({init, Config}) ->
Config;
t_connect_client_never_negative(Config) when is_list(Config) ->
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would go to -1
ChanPid = list_to_pid("<0.0.1>"),
emqx_cm:mark_channel_disconnected(ChanPid),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would be 0, if really went to -1
emqx_cm:mark_channel_connected(ChanPid),
?assertEqual(1, emqx_cm:get_connected_client_count()),
ok;
t_connect_client_never_negative({'end', _Config}) ->
ok.
wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500).
wait_for_events(Action, Kinds, Timeout) ->
Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds)
end,
N = length(Kinds),
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
wait_for_stats(Action, Stats) ->
Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
Stat = maps:with(
[ count_stat
, max_stat
], Event),
lists:member(Stat, Stats);
(_) ->
false
end,
N = length(Stats),
Timeout = 500,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
recv_msgs(Count) -> recv_msgs(Count) ->
recv_msgs(Count, []). recv_msgs(Count, []).

View File

@ -144,6 +144,8 @@ set_test_listenser_confs() ->
init_per_suite(Config) -> init_per_suite(Config) ->
%% CM Meck %% CM Meck
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck %% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate, ok = meck:expect(emqx_access_control, authenticate,

View File

@ -36,6 +36,8 @@ init_per_suite(Config) ->
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm %% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Meck Limiter %% Meck Limiter
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]), ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
%% Meck Pd %% Meck Pd

View File

@ -48,7 +48,10 @@ init_per_testcase(TestCase, Config) when
TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_pingreq_before_connected,
TestCase =/= t_ws_non_check_origin TestCase =/= t_ws_non_check_origin
-> ->
emqx_channel_SUITE:set_test_listenser_confs(), %% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Mock cowboy_req %% Mock cowboy_req
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
@ -90,7 +93,8 @@ end_per_testcase(TestCase, _Config) when
TestCase =/= t_ws_pingreq_before_connected TestCase =/= t_ws_pingreq_before_connected
-> ->
lists:foreach(fun meck:unload/1, lists:foreach(fun meck:unload/1,
[cowboy_req, [emqx_cm,
cowboy_req,
emqx_access_control, emqx_access_control,
emqx_broker, emqx_broker,
emqx_hooks, emqx_hooks,
@ -363,14 +367,12 @@ t_handle_info_close(_) ->
{[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()). {[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()).
t_handle_info_event(_) -> t_handle_info_event(_) ->
ok = meck:new(emqx_cm, [passthrough, no_history]),
ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end), ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end), ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end),
{ok, _} = ?ws_conn:handle_info({event, connected}, st()), {ok, _} = ?ws_conn:handle_info({event, connected}, st()),
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()), {ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
{ok, _} = ?ws_conn:handle_info({event, updated}, st()), {ok, _} = ?ws_conn:handle_info({event, updated}, st()).
ok = meck:unload(emqx_cm).
t_handle_timeout_idle_timeout(_) -> t_handle_timeout_idle_timeout(_) ->
TRef = make_ref(), TRef = make_ref(),

View File

@ -62,7 +62,7 @@
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}