705 lines
27 KiB
Erlang
705 lines
27 KiB
Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_broker_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-define(APP, emqx).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
%% Common Test entry point: the suite consists of two top-level groups.
all() ->
    [{group, all_cases}, {group, connected_client_count_group}].
|
|
|
|
%% Group layout. The four connected-client cases are listed under both the
%% `tcp' and the `ws' group (nested inside `connected_client_count_group');
%% whatever else emqx_ct:all(?MODULE) returns goes into `all_cases'.
groups() ->
    ConnCountCases = [
        t_connected_client_count_persistent,
        t_connected_client_count_anonymous,
        t_connected_client_count_transient_takeover,
        t_connected_client_stats
    ],
    RemainingCases = emqx_ct:all(?MODULE) -- ConnCountCases,
    [
        {all_cases, [], RemainingCases},
        {connected_client_count_group, [{group, tcp}, {group, ws}]},
        {tcp, [], ConnCountCases},
        {ws, [], ConnCountCases}
    ].
|
|
|
|
%% Per-group setup.
%%
%% `connected_client_count_group' is only a container for the `tcp' and `ws'
%% groups and needs no setup of its own. Every other group boots the modules
%% and starts the applications; `tcp' and `ws' additionally prepend the
%% connection options the test cases read from Config (`conn_fun' etc.).
init_per_group(connected_client_count_group, Config) ->
    Config;
init_per_group(Group, Config) ->
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([]),
    case Group of
        tcp ->
            [{conn_fun, connect} | Config];
        ws ->
            [
                {ssl, false},
                {enable_websocket, true},
                {conn_fun, ws_connect},
                {port, 8083},
                {host, "localhost"}
                | Config
            ];
        _ ->
            Config
    end.
|
|
|
|
%% Per-group teardown: the container group started nothing, so it has nothing
%% to stop; every other group stops the applications.
end_per_group(Group, _Config) ->
    case Group of
        connected_client_count_group -> ok;
        _ -> emqx_ct_helpers:stop_apps([])
    end.
|
|
|
|
%% Suite-level setup is a no-op; the Config is passed through untouched
%% (applications are started per group instead).
init_per_suite(Config) ->
    Config.
|
|
|
|
%% Suite-level teardown is a no-op.
end_per_suite(_Config) ->
    ok.
|
|
|
|
%% Delegates setup to the test case function itself: every case in this suite
%% has a clause accepting `{init, Config}' and returning the Config to use.
init_per_testcase(Case, Config) ->
    apply(?MODULE, Case, [{init, Config}]).
|
|
|
|
%% Delegates teardown to the test case function itself via its
%% `{'end', Config}' clause.
end_per_testcase(Case, Config) ->
    apply(?MODULE, Case, [{'end', Config}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub Test
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% After subscribing to <<"topic">> in setup, subscribed/2 must report true
%% for the calling process and false for `undefined'.
t_subscribed({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>),
    Config;
t_subscribed({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribed(Config) when is_list(Config) ->
    Topic = <<"topic">>,
    ?assertEqual(false, emqx_broker:subscribed(undefined, Topic)),
    ?assertEqual(true, emqx_broker:subscribed(self(), Topic)).
|
|
|
|
%% Same check as t_subscribed, but the subscription is created through
%% subscribe/2 with the subscriber id <<"clientid">>.
t_subscribed_2({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
    Config;
t_subscribed_2({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribed_2(Config) when is_list(Config) ->
    IsSubscribed = emqx_broker:subscribed(self(), <<"topic">>),
    ?assertEqual(true, IsSubscribed).
|
|
|
|
%% Exercises get_subopts/2 and set_subopts/2 before a subscription exists,
%% after subscribing with QoS 1, after re-subscribing with QoS 2, and after
%% overriding the QoS through set_subopts/2.
t_subopts({init, Config}) ->
    Config;
t_subopts({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subopts(Config) when is_list(Config) ->
    Topic = <<"topic">>,
    SubId = <<"clientid">>,
    %% Expected option map; only the QoS varies between the checks below.
    OptsWithQos = fun(QoS) ->
        #{nl => 0, qos => QoS, rap => 0, rh => 0, subid => SubId}
    end,

    %% No subscription yet: nothing to update, nothing to read.
    ?assertEqual(false, emqx_broker:set_subopts(Topic, #{qos => 1})),
    ?assertEqual(undefined, emqx_broker:get_subopts(self(), Topic)),
    ?assertEqual(undefined, emqx_broker:get_subopts(SubId, Topic)),

    %% Subscribe with QoS 1; options are readable by pid and by subscriber id.
    emqx_broker:subscribe(Topic, SubId, #{qos => 1}),
    timer:sleep(200),
    ?assertEqual(OptsWithQos(1), emqx_broker:get_subopts(self(), Topic)),
    ?assertEqual(OptsWithQos(1), emqx_broker:get_subopts(SubId, Topic)),

    %% Subscribing again replaces the stored options.
    emqx_broker:subscribe(Topic, SubId, #{qos => 2}),
    ?assertEqual(OptsWithQos(2), emqx_broker:get_subopts(self(), Topic)),

    %% set_subopts/2 now succeeds and is reflected by get_subopts/2.
    ?assertEqual(true, emqx_broker:set_subopts(Topic, #{qos => 0})),
    ?assertEqual(OptsWithQos(0), emqx_broker:get_subopts(self(), Topic)).
|
|
|
|
%% Every topic subscribed by this case must show up in emqx_broker:topics/0.
%% The three topics are stored in Config under `topics' so teardown can
%% unsubscribe from exactly the same set.
t_topics({init, Config}) ->
    [{topics, [<<"topic">>, <<"topic/1">>, <<"topic/2">>]} | Config];
t_topics({'end', Config}) ->
    lists:foreach(fun emqx_broker:unsubscribe/1, proplists:get_value(topics, Config));
t_topics(Config) when is_list(Config) ->
    %% Exactly three topics are expected from setup.
    [_, _, _] = Wanted = proplists:get_value(topics, Config),
    lists:foreach(
        fun(Topic) -> ok = emqx_broker:subscribe(Topic, <<"clientId">>) end,
        Wanted
    ),
    Known = emqx_broker:topics(),
    AllKnown = lists:all(fun(Topic) -> lists:member(Topic, Known) end, Wanted),
    ?assertEqual(true, AllKnown).
|
|
|
|
%% The calling process is the only subscriber of <<"topic">>.
t_subscribers({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
    Config;
t_subscribers({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscribers(Config) when is_list(Config) ->
    Subscribers = emqx_broker:subscribers(<<"topic">>),
    ?assertEqual([self()], Subscribers).
|
|
|
|
%% subscriptions/1 must return the same options for <<"topic">> whether it is
%% queried by subscriber pid or by subscriber id.
t_subscriptions({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
    Config;
t_subscriptions({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>);
t_subscriptions(Config) when is_list(Config) ->
    ct:sleep(100),
    Expected = #{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
    ByPid = emqx_broker:subscriptions(self()),
    ?assertEqual(Expected, proplists:get_value(<<"topic">>, ByPid)),
    BySubId = emqx_broker:subscriptions(<<"clientid">>),
    ?assertEqual(Expected, proplists:get_value(<<"topic">>, BySubId)).
|
|
|
|
%% A message published to a subscribed topic is delivered to the subscriber
%% as `{deliver, Topic, #message{}}'.
t_sub_pub({init, Config}) ->
    ok = emqx_broker:subscribe(<<"topic">>),
    Config;
t_sub_pub({'end', _Config}) ->
    ok = emqx_broker:unsubscribe(<<"topic">>);
t_sub_pub(Config) when is_list(Config) ->
    ct:sleep(100),
    Message = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Message),
    %% The first message in the mailbox decides the outcome: anything other
    %% than the expected delivery (or nothing within 100 ms) fails the case.
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> true;
            _Unexpected -> false
        after 100 -> false
        end,
    ?assert(Delivered).
|
|
|
|
%% Publishing to a topic nobody subscribes to bumps the 'messages.dropped'
%% metric from 0 to 1.
t_nosub_pub({init, Config}) ->
    Config;
t_nosub_pub({'end', _Config}) ->
    ok;
t_nosub_pub(Config) when is_list(Config) ->
    Dropped = fun() -> emqx_metrics:val('messages.dropped') end,
    ?assertEqual(0, Dropped()),
    Message = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:publish(Message),
    ?assertEqual(1, Dropped()).
|
|
|
|
%% A shared subscription (share group <<"group">>) created directly through
%% emqx_broker receives a message published to the plain topic.
t_shared_subscribe({init, Config}) ->
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
    ct:sleep(100),
    Config;
t_shared_subscribe({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"$share/group/topic">>);
t_shared_subscribe(Config) when is_list(Config) ->
    Message = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Message),
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
                true;
            Unexpected ->
                %% Log what arrived instead, to help diagnose the failure.
                ct:pal("Msg: ~p", [Unexpected]),
                false
        after 100 ->
            false
        end,
    ?assert(Delivered).
|
|
|
|
%% Two clients subscribed to the same topic through *different* share groups
%% must both receive a published message.
t_shared_subscribe_2({init, Config}) ->
    Config;
t_shared_subscribe_2({'end', _Config}) ->
    ok;
t_shared_subscribe_2(_) ->
    {ok, ClientA} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
    {ok, _} = emqtt:connect(ClientA),
    {ok, _, [0]} = emqtt:subscribe(ClientA, <<"$share/group/topic">>, 0),

    {ok, ClientB} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
    {ok, _} = emqtt:connect(ClientB),
    {ok, _, [0]} = emqtt:subscribe(ClientB, <<"$share/group2/topic">>, 0),

    ct:sleep(10),
    ok = emqtt:publish(ClientA, <<"topic">>, <<"hello">>, 0),
    Received = recv_msgs(2),
    ?assertEqual(2, length(Received)),
    %% Both deliveries must carry the published topic and payload.
    IsExpected = fun
        (#{payload := <<"hello">>, topic := <<"topic">>}) -> true;
        (_) -> false
    end,
    ?assertEqual(true, lists:all(IsExpected, Received)),
    emqtt:disconnect(ClientA),
    emqtt:disconnect(ClientB).
|
|
|
|
%% Two clients in the *same* share group: a published message is delivered to
%% only one of them, so exactly one message is expected in total.
t_shared_subscribe_3({init, Config}) ->
    Config;
t_shared_subscribe_3({'end', _Config}) ->
    ok;
t_shared_subscribe_3(_) ->
    SharedFilter = <<"$share/group/topic">>,

    {ok, ClientA} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
    {ok, _} = emqtt:connect(ClientA),
    {ok, _, [0]} = emqtt:subscribe(ClientA, SharedFilter, 0),

    {ok, ClientB} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
    {ok, _} = emqtt:connect(ClientB),
    {ok, _, [0]} = emqtt:subscribe(ClientB, SharedFilter, 0),

    ct:sleep(10),
    ok = emqtt:publish(ClientA, <<"topic">>, <<"hello">>, 0),
    %% Ask for up to two messages; only one should ever arrive.
    Received = recv_msgs(2),
    ?assertEqual(1, length(Received)),
    emqtt:disconnect(ClientA),
    emqtt:disconnect(ClientB).
|
|
|
|
%% Publish/deliver round trip with emqx_broker_helper:get_sub_shard/2 mocked
%% to always return 1. The mock is installed in setup and unloaded in
%% teardown.
t_shard({init, Config}) ->
    ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
    ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
    Config;
t_shard({'end', _Config}) ->
    emqx_broker:unsubscribe(<<"topic">>),
    ok = meck:unload(emqx_broker_helper);
t_shard(Config) when is_list(Config) ->
    ct:sleep(100),
    Message = emqx_message:make(ct, <<"topic">>, <<"hello">>),
    emqx_broker:safe_publish(Message),
    Delivered =
        receive
            {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> true;
            _Unexpected -> false
        after 100 -> false
        end,
    ?assert(Delivered).
|
|
|
|
%% Checks the subscriber/subscription/suboption statistics refreshed by
%% emqx_broker:stats_fun/0.
%%
%% Setup waits (up to 5 s) until all three `*.count' stats have dropped to
%% zero, so the exact values asserted below are not polluted by state left
%% over from other cases.
t_stats_fun({init, Config}) ->
    TestProc = self(),
    CountStats = ['subscribers.count', 'subscriptions.count', 'suboptions.count'],
    %% Poll every 100 ms until the counters sum to zero, then notify the test
    %% process; a `stop' message aborts the polling.
    Poll = fun Retry() ->
        Total = lists:sum([emqx_stats:getstat(Stat) || Stat <- CountStats]),
        case Total of
            0 ->
                TestProc ! {ready, self()},
                exit(normal);
            _ ->
                receive
                    stop -> exit(normal)
                after 100 -> Retry()
                end
        end
    end,
    Poller = spawn_link(Poll),
    receive
        {ready, Poller} -> Config
    after 5000 ->
        Poller ! stop,
        ct:fail("timedout_waiting_for_sub_stats_to_reach_zero")
    end;
t_stats_fun({'end', _Config}) ->
    ok = emqx_broker:unsubscribe(<<"topic">>),
    ok = emqx_broker:unsubscribe(<<"topic2">>);
t_stats_fun(Config) when is_list(Config) ->
    ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
    ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
    %% Force a stats refresh.
    emqx_broker:stats_fun(),
    %% Stats are set via gen_server cast; a synchronous call to emqx_stats
    %% afterwards guarantees the casts queued before it have been handled.
    ignored = gen_server:call(emqx_stats, call, infinity),
    ExpectedAtTwo = [
        'subscribers.count',
        'subscribers.max',
        'subscriptions.count',
        'subscriptions.max',
        'suboptions.count',
        'suboptions.max'
    ],
    lists:foreach(
        fun(Stat) -> ?assertEqual(2, emqx_stats:getstat(Stat)) end,
        ExpectedAtTwo
    ).
|
|
|
|
%% Persistent sessions (clean_start = false) only contribute to the connected
%% client count while a connection is up: the count follows connect,
%% disconnect, reconnect, takeover and abnormal channel exit.
t_connected_client_count_persistent({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_persistent({'end', _Config}) ->
    snabbkaffe:stop(),
    ok;
t_connected_client_count_persistent(Config) when is_list(Config) ->
    ConnFun = ?config(conn_fun, Config),
    ClientOpts = [{clean_start, false}, {clientid, <<"clientid">>} | Config],
    IncOnly = [emqx_cm_connected_client_count_inc],
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% first connection
    {ok, Client0} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(fun() -> emqtt:ConnFun(Client0) end, IncOnly),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% clean disconnect
    {ok, {ok, [_]}} =
        wait_for_events(
            fun() -> emqtt:disconnect(Client0) end,
            [emqx_cm_connected_client_count_dec]
        ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% the same client id connects again
    {ok, Client1} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(fun() -> emqtt:ConnFun(Client1) end, IncOnly),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% a third client with the same id takes the session over: one increment
    %% and one decrement are expected, leaving the count unchanged
    {ok, Client2} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_, _]}} =
        wait_for_events(
            fun() -> emqtt:ConnFun(Client2) end,
            [emqx_cm_connected_client_count_inc, emqx_cm_connected_client_count_dec],
            500
        ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% channel processes are killed rather than shut down cleanly
    Channels = emqx_cm:all_channels(),
    KillAll = fun() ->
        lists:foreach(fun(Chan) -> exit(Chan, kill) end, Channels)
    end,
    {ok, {ok, [_, _]}} =
        wait_for_events(
            KillAll,
            [emqx_cm_connected_client_count_dec, emqx_cm_process_down]
        ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok.
|
|
|
|
%% Connections that do not supply a client id also contribute to the
%% connected client count, each one independently of the others.
t_connected_client_count_anonymous({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_anonymous({'end', _Config}) ->
    snabbkaffe:stop(),
    ok;
t_connected_client_count_anonymous(Config) when is_list(Config) ->
    ConnFun = ?config(conn_fun, Config),
    ClientOpts = [{clean_start, true} | Config],
    IncOnly = [emqx_cm_connected_client_count_inc],
    DecAndDown = [emqx_cm_connected_client_count_dec, emqx_cm_process_down],
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% first client
    {ok, Client0} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(fun() -> emqtt:ConnFun(Client0) end, IncOnly),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% second client
    {ok, Client1} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(fun() -> emqtt:ConnFun(Client1) end, IncOnly),
    ?assertEqual(2, emqx_cm:get_connected_client_count()),

    %% the first client leaving must not affect the second one
    {ok, {ok, [_, _]}} =
        wait_for_events(fun() -> emqtt:disconnect(Client0) end, DecAndDown),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% a new anonymous client connects
    {ok, Client2} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(fun() -> emqtt:ConnFun(Client2) end, IncOnly),
    ?assertEqual(2, emqx_cm:get_connected_client_count()),

    %% the second client leaves
    {ok, {ok, [_, _]}} =
        wait_for_events(fun() -> emqtt:disconnect(Client1) end, DecAndDown),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% remaining channel processes are killed rather than shut down cleanly
    Channels = emqx_cm:all_channels(),
    KillAll = fun() ->
        lists:foreach(fun(Chan) -> exit(Chan, kill) end, Channels)
    end,
    {ok, {ok, [_, _]}} = wait_for_events(KillAll, DecAndDown),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok.
|
|
|
|
%% Many clients race to connect with the same client id; once the race is
%% over the connected client count must be back at zero, and a later regular
%% connect/kill cycle must still be counted correctly.
t_connected_client_count_transient_takeover({init, Config}) ->
    ok = snabbkaffe:start_trace(),
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_transient_takeover({'end', _Config}) ->
    snabbkaffe:stop(),
    ok;
t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
    ConnFun = ?config(conn_fun, Config),
    ClientOpts = [{clean_start, true}, {clientid, <<"clientid">>} | Config],
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% Start several clients at once so that they contend for the lock on the
    %% client id.
    NumClients = 20,
    Racer = fun() ->
        {ok, Client} = emqtt:start_link(ClientOpts),
        %% The connect result is deliberately not asserted: most attempts
        %% fail during the race.
        emqtt:ConnFun(Client),
        ok
    end,
    StartRace = fun() ->
        _ = [spawn(Racer) || _ <- lists:seq(1, NumClients)],
        ok
    end,
    %% Only one channel can win the lock for this client id. A decrement is
    %% expected as well, because each client is linked (via start_link) to a
    %% short-lived process that ends right after the connect attempt.
    {ok, {ok, [_, _]}} =
        wait_for_events(
            StartRace,
            [emqx_cm_connected_client_count_inc, emqx_cm_connected_client_count_dec],
            1000
        ),
    %% More than one inc/dec pair may be emitted; give things time to settle.
    timer:sleep(100),
    %% All racing clients are expected to be gone by now.
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% a regular connection with the same client id
    {ok, Client1} = emqtt:start_link(ClientOpts),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(
            fun() -> emqtt:ConnFun(Client1) end,
            [emqx_cm_connected_client_count_inc]
        ),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% the single remaining channel process is killed
    [Chan] = emqx_cm:all_channels(),
    Kill = fun() ->
        exit(Chan, kill),
        ok
    end,
    {ok, {ok, [_, _]}} =
        wait_for_events(
            Kill,
            [emqx_cm_connected_client_count_dec, emqx_cm_process_down]
        ),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok.
|
|
|
|
%% 'live_connections.count' follows the connected client count, while
%% 'live_connections.max' keeps the high-water mark. emqx_stats is restarted
%% in setup and teardown so the stats start from (and are left in) a fresh
%% state.
t_connected_client_stats({init, Config}) ->
    ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
    {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
    ok = snabbkaffe:start_trace(),
    Config;
t_connected_client_stats({'end', _Config}) ->
    ok = snabbkaffe:stop(),
    ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
    {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
    ok;
t_connected_client_stats(Config) when is_list(Config) ->
    ConnFun = ?config(conn_fun, Config),
    LiveStats = [
        #{
            count_stat => 'live_connections.count',
            max_stat => 'live_connections.max'
        }
    ],
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ?assertEqual(0, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(0, emqx_stats:getstat('live_connections.max')),

    {ok, Client} = emqtt:start_link([
        {clean_start, true},
        {clientid, <<"clientid">>}
        | Config
    ]),
    {{ok, _}, {ok, [_]}} =
        wait_for_events(
            fun() -> emqtt:ConnFun(Client) end,
            [emqx_cm_connected_client_count_inc]
        ),
    %% Refresh the stats and wait until the update has been recorded.
    {_, {ok, [_]}} = wait_for_stats(fun emqx_cm:stats_fun/0, LiveStats),
    ?assertEqual(1, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(1, emqx_stats:getstat('live_connections.max')),

    {ok, {ok, [_]}} =
        wait_for_events(
            fun() -> emqtt:disconnect(Client) end,
            [emqx_cm_connected_client_count_dec]
        ),
    %% Refresh again: the count drops, the maximum stays.
    {_, {ok, [_]}} = wait_for_stats(fun emqx_cm:stats_fun/0, LiveStats),
    ?assertEqual(0, emqx_stats:getstat('live_connections.count')),
    ?assertEqual(1, emqx_stats:getstat('live_connections.max')),
    ok.
|
|
|
|
%% The connected client count must never become negative.
t_connect_client_never_negative({init, Config}) ->
    Config;
t_connect_client_never_negative({'end', _Config}) ->
    ok;
t_connect_client_never_negative(Config) when is_list(Config) ->
    Count = fun() -> emqx_cm:get_connected_client_count() end,
    ?assertEqual(0, Count()),
    %% Marking a channel that was never connected as disconnected would take
    %% a naive counter to -1; the reported count must stay at 0.
    FakeChan = list_to_pid("<0.0.1>"),
    emqx_cm:mark_channel_disconnected(FakeChan),
    ?assertEqual(0, Count()),
    %% Had the counter really gone to -1, this would bring it back to 0
    %% instead of 1.
    emqx_cm:mark_channel_connected(FakeChan),
    ?assertEqual(1, Count()),
    ok.
|
|
|
|
%% A rejected CONNECT increments the 'packets.connack.auth_error' metric,
%% for both a proto_ver v4 and a proto_ver v5 client.
%%
%% Setup restarts the applications with anonymous access disabled; teardown
%% restarts them once more with the default environment.
t_connack_auth_error({init, Config}) ->
    process_flag(trap_exit, true),
    emqx_ct_helpers:stop_apps([]),
    emqx_ct_helpers:boot_modules(all),
    DenyEnv = [
        {acl_nomatch, deny},
        {allow_anonymous, false},
        {enable_acl_cache, false}
    ],
    %% Only the `emqx' application's environment is adjusted.
    SetEnv = fun
        (emqx) ->
            lists:foreach(
                fun({Key, Value}) -> application:set_env(emqx, Key, Value) end,
                DenyEnv
            );
        (_OtherApp) ->
            ok
    end,
    emqx_ct_helpers:start_apps([], SetEnv),
    Config;
t_connack_auth_error({'end', _Config}) ->
    emqx_ct_helpers:stop_apps([]),
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([]),
    ok;
t_connack_auth_error(Config) when is_list(Config) ->
    AuthErrors = fun() -> emqx_metrics:val('packets.connack.auth_error') end,
    ?assertEqual(0, AuthErrors()),
    %% proto_ver v4 client
    {ok, ClientV4} = emqtt:start_link([{proto_ver, v4}]),
    ?assertEqual({error, {unauthorized_client, undefined}}, emqtt:connect(ClientV4)),
    ?assertEqual(1, AuthErrors()),
    %% proto_ver v5 client
    {ok, ClientV5} = emqtt:start_link([{proto_ver, v5}]),
    ?assertEqual({error, {not_authorized, #{}}}, emqtt:connect(ClientV5)),
    ?assertEqual(2, AuthErrors()),
    ok.
|
|
|
|
%% When a 'client.subscribe' hook marks every topic filter with
%% `deny_subscription => true', the SUBSCRIBE is answered with
%% ?RC_UNSPECIFIED_ERROR for the requested filter.
%%
%% The hook is always removed again, even if an assertion fails. The client
%% is disconnected explicitly once the check has passed, in line with the
%% other emqtt-based cases in this suite, so no connection is left open when
%% the case returns.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
    Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
    ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
    %% Flag every requested filter as denied and stop the hook chain.
    Hook = fun(_ClientInfo, _Username, TopicFilters) ->
        Denied = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilters],
        {stop, Denied}
    end,
    ok = emqx:hook('client.subscribe', Hook, []),
    try
        {ok, C} = emqtt:start_link(),
        {ok, _} = emqtt:connect(C),
        {ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
        ?assertEqual([?RC_UNSPECIFIED_ERROR], RCs),
        %% Close the connection instead of leaving it to process exit.
        emqtt:disconnect(C),
        ok
    after
        ok = emqx:unhook('client.subscribe', Hook)
    end.
|
|
|
|
%% Same as wait_for_events/3 with a timeout of 500 (presumably milliseconds;
%% the value is handed to snabbkaffe_collector:subscribe/4 unchanged).
wait_for_events(Action, Kinds) ->
    DefaultTimeout = 500,
    wait_for_events(Action, Kinds, DefaultTimeout).
|
|
|
|
%% Runs Action() and waits for snabbkaffe trace events whose kind is listed
%% in Kinds.
%%
%% The subscription is created *before* Action runs and asks for as many
%% events as Kinds has elements. Returns `{ActionResult, {ok, Events}}' when
%% they arrive, or `{ActionResult, timeout}' when the collector reports a
%% timeout.
wait_for_events(Action, Kinds, Timeout) ->
    IsWanted = fun(#{?snk_kind := Kind}) -> lists:member(Kind, Kinds) end,
    {ok, Sub} =
        snabbkaffe_collector:subscribe(IsWanted, length(Kinds), Timeout, 0),
    ActionResult = Action(),
    Outcome =
        case snabbkaffe_collector:receive_events(Sub) of
            {ok, Events} -> {ok, Events};
            {timeout, _} -> timeout
        end,
    {ActionResult, Outcome}.
|
|
|
|
%% Runs Action() and waits for `emqx_stats_setstat' trace events whose
%% `count_stat'/`max_stat' pair is one of the maps in Stats.
%%
%% One event is awaited per element of Stats, with a fixed timeout of 500.
%% Returns `{ActionResult, {ok, Events}}' or `{ActionResult, timeout}'.
wait_for_stats(Action, Stats) ->
    IsWanted = fun
        (#{?snk_kind := emqx_stats_setstat} = Event) ->
            %% Compare only the two stat-name fields of the event.
            lists:member(maps:with([count_stat, max_stat], Event), Stats);
        (_Other) ->
            false
    end,
    {ok, Sub} = snabbkaffe_collector:subscribe(IsWanted, length(Stats), 500, 0),
    ActionResult = Action(),
    Outcome =
        case snabbkaffe_collector:receive_events(Sub) of
            {ok, Events} -> {ok, Events};
            {timeout, _} -> timeout
        end,
    {ActionResult, Outcome}.
|
|
|
|
%% Clones the first row of the `emqx_channel_info' ETS table (which must
%% describe a connected channel) into 20 fake rows: "fake1".."fake9" keep the
%% `connected' state, "fake10".."fake20" are marked `disconnected'.
%% Crashes with badmatch if the table is empty or the first row is not a
%% connected channel. Returns the result of the last ets:insert/2.
insert_fake_channels() ->
    Tab = emqx_channel_info,
    [{_Chan, Info = #{conn_state := connected}, Stats}] =
        ets:lookup(Tab, ets:first(Tab)),
    FakeRow = fun(N, ChanInfo) ->
        {{"fake" ++ integer_to_list(N), undefined}, ChanInfo, Stats}
    end,
    %% Copies that simulate missed counts.
    ets:insert(Tab, [FakeRow(N, Info) || N <- lists:seq(1, 9)]),
    %% Disconnected copies, which should not be counted.
    Offline = Info#{conn_state := disconnected},
    ets:insert(Tab, [FakeRow(N, Offline) || N <- lists:seq(10, 20)]).
|
|
|
|
%% Collects up to Count `{publish, Msg}' messages from the caller's mailbox.
%% Returns the collected messages, most recent first; the list is shorter
%% than Count if nothing arrives for 100 ms.
recv_msgs(Count) ->
    recv_msgs(Count, []).

%% Accumulating worker for recv_msgs/1. Messages that are not
%% `{publish, _}' are consumed and ignored.
recv_msgs(0, Acc) ->
    Acc;
recv_msgs(Remaining, Acc) ->
    receive
        {publish, Publish} ->
            recv_msgs(Remaining - 1, [Publish | Acc]);
        _Unrelated ->
            recv_msgs(Remaining, Acc)
    after 100 ->
        Acc
    end.
|