%% Listing metadata: 609 lines, 21 KiB, Erlang.
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_dashboard_monitor_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-import(emqx_dashboard_SUITE, [auth_header_/0]).
|
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
-include("emqx_dashboard.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("emqx/include/asserts.hrl").
|
|
|
|
%% Base URL of the dashboard HTTP API; url/2 prepends it to every request.
-define(SERVER, "http://127.0.0.1:18083").
%% Path prefix placed in front of the path segments given to url/2.
-define(BASE_PATH, "/api/v5").

%% Retainer configuration handed to the `emqx_retainer' app spec in
%% init_per_group/2.
-define(BASE_RETAINER_CONF, <<
    "retainer {\n"
    " enable = true\n"
    " msg_clear_interval = 0s\n"
    " msg_expiry_interval = 0s\n"
    " max_payload_size = 1MB\n"
    " flow_control {\n"
    " batch_read_number = 0\n"
    " batch_deliver_number = 0\n"
    " }\n"
    " backend {\n"
    " type = built_in_database\n"
    " storage_type = ram\n"
    " max_retained_messages = 0\n"
    " }\n"
    "}"
>>).

%% Evaluates BODY on NODE via erpc:call/2 and yields its result.
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% CT boilerplate
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Common Test entry point: the suite is split into the `common' and
%% `persistent_sessions' groups.
all() ->
    [{group, GroupName} || GroupName <- [common, persistent_sessions]].
|
|
|
|
%% Group definitions: test cases listed by persistent_session_testcases/0 run
%% in `persistent_sessions'; every other test case runs in `common'.
groups() ->
    EveryTC = emqx_common_test_helpers:all(?MODULE),
    DurableTCs = persistent_session_testcases(),
    CommonTCs = EveryTC -- DurableTCs,
    [
        {common, [], CommonTCs},
        {persistent_sessions, [], DurableTCs}
    ].
|
|
|
|
%% Test cases that belong to the `persistent_sessions' group (see groups/0).
persistent_session_testcases() ->
    [t_persistent_session_stats].
|
|
|
|
%% Nothing is set up at suite level; applications are started per group.
init_per_suite(SuiteConfig) ->
    SuiteConfig.
|
|
|
|
%% Nothing to tear down at suite level.
end_per_suite(_SuiteConfig) ->
    ok.
|
|
|
|
%% Group setup.
%%
%% `persistent_sessions': unless emqx_ds_test_helpers:skip_if_norepl/0 returns
%% something other than `false' (that value is returned as-is), starts a
%% two-node cluster with durable sessions enabled. Only the first node gets
%% the dashboard HTTP listener enabled (bound to 18083); the second node has
%% it disabled and bound to port 0. The node list is stored under `cluster'.
%%
%% `common': starts the applications on the local node with the dashboard
%% listening on 18083 and stores the started apps under `apps'.
%%
%% Both variants sample every second and create the default API app.
init_per_group(persistent_sessions = Group, Config) ->
    case emqx_ds_test_helpers:skip_if_norepl() of
        false ->
            MkAppSpecs = fun(ListenerEnabled) ->
                BindPort =
                    case ListenerEnabled of
                        true -> "18083";
                        false -> "0"
                    end,
                %% lists:concat/1 renders the boolean atom as text.
                DashboardConf = lists:concat([
                    "dashboard.listeners.http { bind = ",
                    BindPort,
                    " }\n",
                    "dashboard.sample_interval = 1s\n",
                    "dashboard.listeners.http.enable = ",
                    ListenerEnabled
                ]),
                [
                    emqx_conf,
                    {emqx, "durable_sessions {enable = true}"},
                    {emqx_retainer, ?BASE_RETAINER_CONF},
                    emqx_management,
                    emqx_mgmt_api_test_util:emqx_dashboard(DashboardConf)
                ]
            end,
            NodeSpecs = [
                {dashboard_monitor1, #{apps => MkAppSpecs(true)}},
                {dashboard_monitor2, #{apps => MkAppSpecs(false)}}
            ],
            ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(Group, Config)},
            [FirstNode | _] = Nodes = emqx_cth_cluster:start(NodeSpecs, ClusterOpts),
            ?ON(FirstNode, {ok, _} = emqx_common_test_http:create_default_app()),
            [{cluster, Nodes} | Config];
        SkipResult ->
            SkipResult
    end;
init_per_group(common = Group, Config) ->
    DashboardSpec = emqx_mgmt_api_test_util:emqx_dashboard(
        "dashboard.listeners.http { enable = true, bind = 18083 }\n"
        "dashboard.sample_interval = 1s"
    ),
    AppSpecs = [
        emqx,
        emqx_conf,
        {emqx_retainer, ?BASE_RETAINER_CONF},
        emqx_management,
        DashboardSpec
    ],
    Started = emqx_cth_suite:start(
        AppSpecs,
        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
    ),
    {ok, _} = emqx_common_test_http:create_default_app(),
    [{apps, Started} | Config].
|
|
|
|
%% Group teardown: stops whatever init_per_group/2 stored in the config
%% (the cluster nodes or the locally started applications).
end_per_group(persistent_sessions, Config) ->
    emqx_cth_cluster:stop(?config(cluster, Config)),
    ok;
end_per_group(common, Config) ->
    emqx_cth_suite:stop(?config(apps, Config)),
    ok.
|
|
|
|
%% Starts a snabbkaffe trace for each test case and caps its run time at
%% 30 seconds.
init_per_testcase(_TestCase, TCConfig) ->
    ok = snabbkaffe:start_trace(),
    ct:timetrap({seconds, 30}),
    TCConfig.
|
|
|
|
%% Stops snabbkaffe, then runs the janitor so that callbacks registered with
%% on_exit/1 during the test case are executed.
end_per_testcase(_TestCase, _TCConfig) ->
    ok = snabbkaffe:stop(),
    emqx_common_test_helpers:call_janitor(),
    ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% After two flushes, the number of samplers returned by
%% emqx_dashboard_monitor:samplers/0,2 must equal the size of the
%% `emqx_dashboard_monitor' mnesia table.
t_monitor_samplers_all(_Config) ->
    {ok, _} =
        snabbkaffe:block_until(
            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
            infinity
        ),
    %% The groups configure `dashboard.sample_interval = 1s', so a flush may
    %% land between reading the table size and fetching the samplers. Take the
    %% readings together and retry, so one unlucky interleaving does not fail
    %% the test.
    ?retry(200, 10, begin
        Size = mnesia:table_info(emqx_dashboard_monitor, size),
        All = emqx_dashboard_monitor:samplers(all, infinity),
        All2 = emqx_dashboard_monitor:samplers(),
        ?assertEqual(Size, erlang:length(All)),
        ?assertEqual(Size, erlang:length(All2))
    end),
    ok.
|
|
|
|
%% Asking for the latest N samplers of the local node returns exactly N
%% entries, and the single latest sampler equals the second of the latest two.
t_monitor_samplers_latest(_Config) ->
    FlushedTwice = ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
    {ok, _} = snabbkaffe:block_until(FlushedTwice, infinity),
    ?retry(1_000, 10, begin
        Samplers = emqx_dashboard_monitor:samplers(node(), 2),
        Latest = emqx_dashboard_monitor:samplers(node(), 1),
        ?assert(erlang:length(Samplers) == 2, #{samplers => Samplers}),
        ?assert(erlang:length(Latest) == 1, #{latest => Latest}),
        ?assert(hd(Latest) == lists:nth(2, Samplers))
    end),
    ok.
|
|
|
|
%% The latest sampler of the local node must contain every key listed in
%% ?SAMPLER_LIST.
t_monitor_sampler_format(_Config) ->
    Flushed = ?match_event(#{?snk_kind := dashboard_monitor_flushed}),
    {ok, _} = snabbkaffe:block_until(Flushed, infinity),
    Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)),
    PresentKeys = maps:keys(Latest),
    lists:foreach(
        fun(SamplerName) ->
            ?assert(lists:member(SamplerName, PresentKeys))
        end,
        ?SAMPLER_LIST
    ),
    ok.
|
|
|
|
%% Mocks a second running node ('other@node') whose `do_sample' RPC returns
%% samples that lack the usual sampler keys (`#{foo => 123}'), then checks that
%% the monitor still flushes, that samplers/0 yields a map as its first
%% element, and that the mocked RPC was actually invoked.
t_handle_old_monitor_data(_Config) ->
    Now = erlang:system_time(second),
    %% Ten entries, one per second counting back from Now, keyed by the
    %% second-resolution timestamp multiplied by 1000.
    FakeOldData = maps:from_list([
        {(Now - N) * 1000, #{foo => 123}}
     || N <- lists:seq(0, 9)
    ]),

    Self = self(),
    Mocked = [emqx, emqx_dashboard_proto_v1],

    ok = meck:new(Mocked, [passthrough, no_history]),
    %% Unload the mocks even when an assertion below fails; otherwise they
    %% would stay installed and affect the test cases that run afterwards.
    try
        ok = meck:expect(emqx, running_nodes, fun() -> [node(), 'other@node'] end),
        ok = meck:expect(emqx_dashboard_proto_v1, do_sample, fun('other@node', _Time) ->
            Self ! sample_called,
            FakeOldData
        end),

        {ok, _} =
            snabbkaffe:block_until(
                ?match_event(#{?snk_kind := dashboard_monitor_flushed}),
                infinity
            ),
        ?assertMatch(
            #{},
            hd(emqx_dashboard_monitor:samplers())
        ),
        ?assertReceive(sample_called, 1_000)
    after
        meck:unload(Mocked)
    end,
    ok.
|
|
|
|
%% GET /monitor?latest=200 and GET /monitor/nodes/<node> must return samplers
%% that each contain every key listed in ?SAMPLER_LIST.
t_monitor_api(_) ->
    {ok, _} =
        snabbkaffe:block_until(
            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
            infinity
        ),
    {ok, Samplers} = request(["monitor"], "latest=200"),
    ?assert(erlang:length(Samplers) >= 2, #{samplers => Samplers}),
    %% Compare as binaries: turning keys of an HTTP response into atoms would
    %% needlessly grow the atom table, and the expected names are known anyway.
    ExpectedKeys = [atom_to_binary(SamplerName, utf8) || SamplerName <- ?SAMPLER_LIST],
    AssertAllKeys =
        fun(Sampler) ->
            lists:foreach(
                fun(Key) ->
                    ?assert(maps:is_key(Key, Sampler), #{missing => Key, sampler => Sampler})
                end,
                ExpectedKeys
            )
        end,
    lists:foreach(AssertAllKeys, Samplers),
    {ok, NodeSamplers} = request(["monitor", "nodes", node()]),
    lists:foreach(AssertAllKeys, NodeSamplers),
    ok.
|
|
|
|
%% Checks which keys GET /monitor_current and GET /monitor_current/nodes/<node>
%% expose: the cluster view carries the durable-session keys, the per-node view
%% must not.
t_monitor_current_api(_) ->
    FlushedTwice = ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
    {ok, _} = snabbkaffe:block_until(FlushedTwice, infinity),
    {ok, Rate} = request(["monitor_current"]),
    %% We rename `durable_subscriptions' key.
    ClusterKeys = [
        Key
     || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
        Key =/= durable_subscriptions
    ],
    lists:foreach(
        fun(Key) ->
            ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
        end,
        ClusterKeys
    ),
    ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
    ?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
    {ok, NodeRate} = request(["monitor_current", "nodes", node()]),
    NodeLevelKeys =
        (?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)) -- ?CLUSTERONLY_SAMPLER_LIST,
    ExpectedKeys = lists:map(fun atom_to_binary/1, NodeLevelKeys),
    ?assertEqual(
        [],
        ExpectedKeys -- maps:keys(NodeRate),
        NodeRate
    ),
    ?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
    ?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
    ?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
    ok.
|
|
|
|
%% With one client connected and disconnected again (clean_start = false) and
%% a second client still connected, /monitor_current must report 1 live
%% connection and 2 connections.
t_monitor_current_api_live_connections(_) ->
    process_flag(trap_exit, true),
    GoneId = <<"live_conn_tests">>,
    LiveId = <<"live_conn_tests1">>,
    {ok, Gone} = emqtt:start_link([{clean_start, false}, {clientid, GoneId}]),
    {ok, _} = emqtt:connect(Gone),
    ok = emqtt:disconnect(Gone),
    {ok, Live} = emqtt:start_link([{clean_start, true}, {clientid, LiveId}]),
    {ok, _} = emqtt:connect(Live),
    ok = waiting_emqx_stats_and_monitor_update('live_connections.max'),
    ?retry(1_100, 5, begin
        {ok, Rate} = request(["monitor_current"]),
        ?assertEqual(1, maps:get(<<"live_connections">>, Rate)),
        ?assertEqual(2, maps:get(<<"connections">>, Rate))
    end),
    %% clears: reconnect the first client id with clean_start = true, then
    %% disconnect it again
    ok = emqtt:disconnect(Live),
    {ok, Cleaner} = emqtt:start_link([{clean_start, true}, {clientid, GoneId}]),
    {ok, _} = emqtt:connect(Cleaner),
    ok = emqtt:disconnect(Cleaner).
|
|
|
|
%% After publishing one retained message, both the cluster and the per-node
%% /monitor_current responses must report `retained_msg_count' = 1.
t_monitor_current_retained_count(_) ->
    process_flag(trap_exit, true),
    {ok, Client} = emqtt:start_link([
        {clean_start, false},
        {clientid, <<"live_conn_tests">>}
    ]),
    {ok, _} = emqtt:connect(Client),
    _ = emqtt:publish(Client, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]),

    ok = waiting_emqx_stats_and_monitor_update('retained.count'),
    {ok, ClusterRes} = request(["monitor_current"]),
    {ok, NodeRes} = request(["monitor_current", "nodes", node()]),

    ?assertEqual(1, maps:get(<<"retained_msg_count">>, ClusterRes)),
    ?assertEqual(1, maps:get(<<"retained_msg_count">>, NodeRes)),
    ok = emqtt:disconnect(Client),
    ok.
|
|
|
|
%% Follows the `shared_subscriptions' figure of /monitor_current (cluster and
%% per-node view) while two clients subscribe to, unsubscribe from and drop a
%% shared topic.
t_monitor_current_shared_subscription(_) ->
    process_flag(trap_exit, true),
    ShareT = <<"$share/group1/t/1">>,
    %% Asserts that both views report exactly Expected shared subscriptions.
    ExpectShared = fun(Expected) ->
        {ok, Res} = request(["monitor_current"]),
        {ok, ResNode} = request(["monitor_current", "nodes", node()]),
        ?assertEqual(Expected, maps:get(<<"shared_subscriptions">>, Res)),
        ?assertEqual(Expected, maps:get(<<"shared_subscriptions">>, ResNode)),
        ok
    end,

    ok = ExpectShared(0),

    ClientId1 = <<"live_conn_tests1">>,
    ClientId2 = <<"live_conn_tests2">>,
    {ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, ClientId1}]),
    {ok, _} = emqtt:connect(C1),
    _ = emqtt:subscribe(C1, {ShareT, 1}),

    ok = ?retry(100, 10, ExpectShared(1)),

    {ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId2}]),
    {ok, _} = emqtt:connect(C2),
    _ = emqtt:subscribe(C2, {ShareT, 1}),
    ok = ?retry(100, 10, ExpectShared(2)),

    _ = emqtt:unsubscribe(C2, ShareT),
    ok = ?retry(100, 10, ExpectShared(1)),
    _ = emqtt:subscribe(C2, {ShareT, 1}),
    ok = ?retry(100, 10, ExpectShared(2)),

    ok = emqtt:disconnect(C1),
    %% C1: clean_start = false, proto_ver = 3.1.1
    %% means disconnected but the session pid with a share-subscription is still alive
    ok = ?retry(100, 10, ExpectShared(2)),

    _ = emqx_cm:kick_session(ClientId1),
    ok = ?retry(100, 10, ExpectShared(1)),

    ok = emqtt:disconnect(C2),
    ok = ?retry(100, 10, ExpectShared(0)),
    ok.
|
|
|
|
%% After the monitor process has been killed and re-registered,
%% /monitor_current still exposes all expected keys and, after the next flush,
%% /monitor?latest=1 returns exactly one sampler.
t_monitor_reset(_) ->
    restart_monitor(),
    {ok, Rate} = request(["monitor_current"]),
    %% We rename `durable_subscriptions' key.
    ExpectedKeys = [
        Key
     || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
        Key =/= durable_subscriptions
    ],
    lists:foreach(
        fun(Key) ->
            ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
        end,
        ExpectedKeys
    ),
    ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
    FlushedOnce = ?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
    {ok, _} = snabbkaffe:block_until(FlushedOnce, infinity),
    {ok, Samplers} = request(["monitor"], "latest=1"),
    ?assertEqual(1, erlang:length(Samplers)),
    ok.
|
|
|
|
%% Error responses: an unknown node yields 404 NOT_FOUND on both node
%% endpoints, and a non-positive `latest' yields 400 BAD_REQUEST.
t_monitor_api_error(_) ->
    UnknownNode = 'emqx@127.0.0.2',
    lists:foreach(
        fun(Endpoint) ->
            {error, {404, #{<<"code">> := <<"NOT_FOUND">>}}} =
                request([Endpoint, "nodes", UnknownNode])
        end,
        ["monitor", "monitor_current"]
    ),
    lists:foreach(
        fun(QS) ->
            {error, {400, #{<<"code">> := <<"BAD_REQUEST">>}}} =
                request(["monitor"], QS)
        end,
        ["latest=0", "latest=-1"]
    ),
    ok.
|
|
|
|
%% Checks that subscriptions held by persistent sessions are included in the
%% figures reported by the monitor API, while the owning clients are connected
%% and after they have disconnected.
t_persistent_session_stats(Config) ->
    [N1, N2 | _] = ?config(cluster, Config),
    %% pre-condition
    true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()),
    Port1 = get_mqtt_port(N1, tcp),
    Port2 = get_mqtt_port(N2, tcp),

    NonPSClient = start_and_connect(#{
        port => Port1,
        clientid => <<"non-ps">>,
        expiry_interval => 0
    }),
    PSClient1 = start_and_connect(#{
        port => Port1,
        clientid => <<"ps1">>,
        expiry_interval => 30
    }),
    PSClient2 = start_and_connect(#{
        port => Port2,
        clientid => <<"ps2">>,
        expiry_interval => 30
    }),
    %% Four QoS 2 subscriptions for `non-ps' and four for `ps1'; `ps2' does not
    %% subscribe to anything.
    SubscribeAll = fun(Client, Topics) ->
        lists:foreach(
            fun(Topic) ->
                {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, Topic, 2)
            end,
            Topics
        )
    end,
    SubscribeAll(NonPSClient, [
        <<"non/ps/topic/+">>,
        <<"non/ps/topic">>,
        <<"common/topic/+">>,
        <<"common/topic">>
    ]),
    SubscribeAll(PSClient1, [
        <<"ps/topic/+">>,
        <<"ps/topic">>,
        <<"common/topic/+">>,
        <<"common/topic">>
    ]),
    {ok, _} =
        snabbkaffe:block_until(
            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
            infinity
        ),
    %% Retries until /monitor_current on N1 reports the fixed connection, topic
    %% and subscription counts together with the given number of disconnected
    %% durable sessions.
    AssertCurrent = fun(NDisconnected) ->
        ?retry(1_000, 10, begin
            ?assertMatch(
                {ok, #{
                    <<"connections">> := 3,
                    <<"disconnected_durable_sessions">> := NDisconnected,
                    %% N.B.: we currently don't perform any deduplication between persistent
                    %% and non-persistent routes, so we count `common/topic' twice and get 8
                    %% instead of 6 here.
                    <<"topics">> := 8,
                    <<"subscriptions">> := 8,
                    <<"subscriptions_ram">> := 4,
                    <<"subscriptions_durable">> := 4
                }},
                ?ON(N1, request(["monitor_current"]))
            )
        end)
    end,
    AssertCurrent(0),
    %% Sanity checks
    PSRouteCount = ?ON(N1, emqx_persistent_session_ds_router:stats(n_routes)),
    ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
    PSSubCount = ?ON(N1, emqx_persistent_session_bookkeeper:get_subscription_count()),
    ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),

    %% Now with disconnected but alive persistent sessions
    {ok, {ok, _}} =
        ?wait_async_action(
            emqtt:disconnect(PSClient1),
            #{?snk_kind := dashboard_monitor_flushed}
        ),
    AssertCurrent(1),
    %% Verify that historical metrics are in line with the current ones.
    ?assertMatch(
        {ok, [
            #{
                <<"time_stamp">> := _,
                <<"connections">> := 3,
                <<"disconnected_durable_sessions">> := 1,
                <<"topics">> := 8,
                <<"subscriptions">> := 8,
                <<"subscriptions_ram">> := 4,
                <<"subscriptions_durable">> := 4
            }
        ]},
        ?ON(N1, request(["monitor"], "latest=1"))
    ),
    {ok, {ok, _}} =
        ?wait_async_action(
            emqtt:disconnect(PSClient2),
            #{?snk_kind := dashboard_monitor_flushed}
        ),
    AssertCurrent(2),

    ok.
|
|
|
|
%% GET request against the API path made of the segments in Path, without a
%% query string.
request(PathSegments) ->
    request(PathSegments, "").
|
|
|
|
%% Authenticated GET request against the URL built by url/2 from the path
%% segments and the query string. Returns whatever do_request_api/2 returns.
request(PathSegments, QS) ->
    do_request_api(get, {url(PathSegments, QS), [auth_header_()]}).
|
|
|
|
%% Builds the request URL: ?SERVER followed by ?BASE_PATH joined with Parts,
%% plus "?" and QS unless QS is the empty string.
url(Parts, QS) ->
    Location = ?SERVER ++ filename:join([?BASE_PATH | Parts]),
    case QS of
        "" -> Location;
        _ -> Location ++ "?" ++ QS
    end.
|
|
|
|
%% Performs the HTTP request and decodes the JSON body into maps.
%%
%% Returns `{ok, Decoded}' for 2xx responses, `{error, {Code, Decoded}}' for
%% any other HTTP/1.1 status, and `{error, Reason}' when httpc itself fails.
do_request_api(Method, Request) ->
    ct:pal("Req ~p ~p~n", [Method, Request]),
    case httpc:request(Method, Request, [], []) of
        {ok, {{"HTTP/1.1", Status, _}, _Headers, Body}} ->
            ct:pal("Resp ~p ~p~n", [Status, Body]),
            Decoded = emqx_utils_json:decode(Body, [return_maps]),
            case Status >= 200 andalso Status =< 299 of
                true -> {ok, Decoded};
                false -> {error, {Status, Decoded}}
            end;
        {error, _Reason} = Error ->
            Error
    end.
|
|
|
|
%% Kills the process registered as `emqx_dashboard_monitor' and asserts that a
%% different pid is registered under that name within wait_new_monitor/2's
%% ten polling attempts.
restart_monitor() ->
    Previous = erlang:whereis(emqx_dashboard_monitor),
    erlang:exit(Previous, kill),
    ?assertEqual(ok, wait_new_monitor(Previous, 10)).
|
|
|
|
%% Polls the `emqx_dashboard_monitor' registration every 100 ms, at most Count
%% times. Returns `ok' as soon as a pid other than OldMonitor is registered,
%% or `timeout' once the attempts are used up.
wait_new_monitor(_OldMonitor, Count) when Count =< 0 ->
    timeout;
wait_new_monitor(OldMonitor, Count) ->
    case erlang:whereis(emqx_dashboard_monitor) of
        Pid when is_pid(Pid), Pid =/= OldMonitor ->
            ok;
        _NotYet ->
            timer:sleep(100),
            wait_new_monitor(OldMonitor, Count - 1)
    end.
|
|
|
|
%% Blocks until emqx_stats:setstat/3 is called with WaitKey as either its
%% first or second argument, then triggers a cluster rate computation in the
%% dashboard monitor.
%%
%% Raises `error(waiting_emqx_stats_update_timeout)' if no such call is
%% observed within 5 seconds.
waiting_emqx_stats_and_monitor_update(WaitKey) ->
    Self = self(),
    %% Unique tag: the mock may fire several times before it is unloaded, and
    %% a leftover notification from an earlier call must not satisfy this one.
    Tag = make_ref(),
    ok = meck:new(emqx_stats, [passthrough]),
    %% Always unload the mock, including on timeout; a leaked mock would make
    %% the next meck:new/2 on `emqx_stats' fail.
    try
        ok = meck:expect(
            emqx_stats,
            setstat,
            fun(Stat, MaxStat, Val) ->
                (Stat =:= WaitKey orelse MaxStat =:= WaitKey) andalso (Self ! {updated, Tag}),
                meck:passthrough([Stat, MaxStat, Val])
            end
        ),
        receive
            {updated, Tag} -> ok
        after 5000 ->
            error(waiting_emqx_stats_update_timeout)
        end
    after
        meck:unload([emqx_stats])
    end,
    %% manually call monitor update
    _ = emqx_dashboard_monitor:current_rate_cluster(),
    ok.
|
|
|
|
%% Starts an MQTT v5 client and connects it; returns the client pid.
%%
%% Opts must contain `clientid'. `clean_start' (default false),
%% `expiry_interval' (default 30) and `port' (default 1883) are optional.
%% A cleanup callback is registered that disconnects the client with
%% Session-Expiry-Interval 0, ignoring any failure of that disconnect.
start_and_connect(Opts) ->
    Defaults = #{
        clean_start => false,
        expiry_interval => 30,
        port => 1883
    },
    #{
        clientid := ClientId,
        clean_start := CleanStart,
        expiry_interval := ExpiryInterval,
        port := Port
    } = maps:merge(Defaults, Opts),
    ClientOpts = [
        {clientid, ClientId},
        {clean_start, CleanStart},
        {port, Port},
        {proto_ver, v5},
        {properties, #{'Session-Expiry-Interval' => ExpiryInterval}}
    ],
    {ok, Client} = emqtt:start_link(ClientOpts),
    %% Registered before connecting, so the callback exists even if the
    %% connect below fails.
    on_exit(fun() ->
        catch emqtt:disconnect(Client, ?RC_NORMAL_DISCONNECTION, #{'Session-Expiry-Interval' => 0})
    end),
    {ok, _} = emqtt:connect(Client),
    Client.
|
|
|
|
%% Reads the `bind' setting of the `default' listener of the given type on
%% Node and returns its port component.
get_mqtt_port(Node, Type) ->
    Bind = ?ON(Node, emqx_config:get([listeners, Type, default, bind])),
    {_IP, Port} = Bind,
    Port.
|