fix(test): add api test SUITE & bug fix

This commit is contained in:
DDDHuang 2022-02-25 12:44:49 +08:00
parent c21bc9d329
commit 00b83121a4
6 changed files with 188 additions and 106 deletions

View File

@ -40,7 +40,7 @@
-define(DASHBOARD_SHARD, emqx_dashboard_shard).
-ifdef(TEST).
%% for test, 2s
%% for test
-define(DEFAULT_SAMPLE_INTERVAL, 1).
-else.
%% dashboard monitor do sample interval, default 10s

View File

@ -51,6 +51,12 @@
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
-ifdef(TEST).
-define(RPC_TIMEOUT, 50).
-else.
-define(RPC_TIMEOUT, 5000).
-endif.
-record(state, {
last
}).
@ -75,20 +81,43 @@ samplers() ->
format(do_sample(all, infinity)).
samplers(NodeOrCluster, Latest) ->
Now = erlang:system_time(millisecond),
MatchTime = Now - (Latest * 1000),
case format(do_sample(NodeOrCluster, MatchTime)) of
Time =
case Latest of
infinity ->
infinity;
Latest when is_integer(Latest) ->
Now = erlang:system_time(millisecond),
Now - (Latest * 1000)
end,
case format(do_sample(NodeOrCluster, Time)) of
{badrpc, Reason} ->
{badrpc, Reason};
List when is_list(List) ->
granularity_adapter(List)
end.
granularity_adapter(List) when length(List) > 100 ->
%% When the number of samples exceeds 1000, it affects the rendering speed of dashboard UI.
%% granularity_adapter is an oversampling of the samples.
%% Use more granular data and reduce data density.
%%
%% [
%% Data1 = #{time => T1, k1 => 1, k2 => 2},
%% Data2 = #{time => T2, k1 => 3, k2 => 4},
%% ...
%% ]
%% After granularity_adapter, Merge Data1 Data2
%%
%% [
%% #{time => T2, k1 => 1 + 3, k2 => 2 + 6},
%% ...
%% ]
%%
granularity_adapter(List) when length(List) > 1000 ->
granularity_adapter(List, []);
granularity_adapter(List) ->
List.
%% Get the current rate. Not the current sampler data.
current_rate() ->
Fun =
fun(Node, Cluster) ->
@ -99,14 +128,19 @@ current_rate() ->
{badrpc, {Node, Reason}}
end
end,
lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)).
case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running))of
{badrpc, Reason} ->
{badrpc, Reason};
Rate ->
{ok, Rate}
end.
current_rate(all) ->
current_rate();
current_rate(Node) when Node == node() ->
do_call(current_rate);
current_rate(Node) ->
case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of
case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], ?RPC_TIMEOUT) of
{badrpc, Reason} ->
{badrpc, {Node, Reason}};
{ok, Rate} ->
@ -161,27 +195,33 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
do_call(Request) ->
gen_server:call(?MODULE, Request, 5000).
do_sample(all, MatchTime) ->
Fun =
fun(Node, All) ->
case do_sample(Node, MatchTime) of
{badrpc, Reason} ->
{badrpc, {Node, Reason}};
NodeSamplers ->
merge_cluster_samplers(NodeSamplers, All)
end
end,
lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running));
do_sample(Node, MatchTime) when Node == node() ->
MS = match_spec(MatchTime),
do_sample(all, Time) ->
do_sample(mria_mnesia:cluster_nodes(running), Time, #{});
do_sample(Node, Time) when Node == node() ->
MS = match_spec(Time),
internal_format(ets:select(?TAB, MS));
do_sample(Node, MatchTime) ->
rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000).
do_sample(Node, Time) ->
case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Time], ?RPC_TIMEOUT) of
{badrpc, Reason} ->
{badrpc, {Node, Reason}};
Res ->
Res
end.
do_sample([], _Time, Res) ->
Res;
do_sample([Node | Nodes], Time, Res) ->
case do_sample(Node, Time) of
{badrpc, Reason} ->
{badrpc, Reason};
Samplers ->
do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res))
end.
match_spec(infinity) ->
[{'$1',[],['$1']}];
match_spec(MatchTime) ->
[{{'_', '$1', '_'}, [{'>=', '$1', MatchTime}], ['$_']}].
[{'$1', [], ['$1']}];
match_spec(Time) ->
[{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
merge_cluster_samplers(Node, Cluster) ->
maps:fold(fun merge_cluster_samplers/3, Cluster, Node).

View File

@ -26,17 +26,17 @@ api_spec() ->
paths() ->
[ "/monitor"
, "/monitor/nodes/:node"
, "/monitor/current"
, "/monitor_current"
, "/monitor_current/nodes/:node"
].
schema("/monitor") ->
#{
'operationId' => monitor,
get => #{
tags => [dashboard],
description => <<"List monitor data.">>,
parameters => [
{latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})}
],
parameters => [parameter_latest()],
responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
@ -48,11 +48,9 @@ schema("/monitor/nodes/:node") ->
#{
'operationId' => monitor,
get => #{
tags => [dashboard],
description => <<"List the monitor data on the node.">>,
parameters => [
{node, hoconsc:mk(binary(), #{in => path, nullable => false, example => node()})},
{latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})}
],
parameters => [parameter_node(), parameter_latest()],
responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
@ -60,25 +58,58 @@ schema("/monitor/nodes/:node") ->
}
};
schema("/monitor/current") ->
schema("/monitor_current") ->
#{
'operationId' => monitor_current,
get => #{
description => <<"Current monitor data. Gauge and rate">>,
tags => [dashboard],
description => <<"Current status. Gauge and rate.">>,
responses => #{
200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
}
}
};
schema("/monitor_current/nodes/:node") ->
#{
'operationId' => monitor_current,
get => #{
tags => [dashboard],
description => <<"Node current status. Gauge and rate.">>,
parameters => [parameter_node()],
responses => #{
200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
}
}
}.
parameter_latest() ->
Info = #{
in => query,
nullable => true,
example => 5 * 60,
description => <<"The latest N seconds data. Like 300 for 5 min.">>
},
{latest, hoconsc:mk(integer(), Info)}.
parameter_node() ->
Info = #{
in => path,
nullable => false,
example => node(),
description => <<"EMQX node name.">>
},
{node, hoconsc:mk(binary(), Info)}.
fields(sampler) ->
Samplers =
[{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})}
[{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
|| SamplerName <- ?SAMPLER_LIST],
[{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers];
fields(sampler_current) ->
[{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})}
[{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
|| SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST].
%% -------------------------------------------------------------------------------------------------
@ -95,8 +126,8 @@ monitor(get, #{query_string := QS, bindings := Bindings}) ->
{200, Samplers}
end.
monitor_current(get, #{query_string := QS}) ->
NodeOrCluster = binary_to_atom(maps:get(<<"node">>, QS, <<"all">>), utf8),
monitor_current(get, #{bindings := Bindings}) ->
NodeOrCluster = binary_to_atom(maps:get(node, Bindings, <<"all">>), utf8),
case emqx_dashboard_monitor:current_rate(NodeOrCluster) of
{ok, CurrentRate} ->
{200, CurrentRate};
@ -108,30 +139,30 @@ monitor_current(get, #{query_string := QS}) ->
%% -------------------------------------------------------------------------------------------------
%% Internal
sampler_desc(received) -> sampler_desc_format("Received messages ");
sampler_desc(received_bytes) -> sampler_desc_format("Received bytes ");
sampler_desc(sent) -> sampler_desc_format("Sent messages ");
sampler_desc(sent_bytes) -> sampler_desc_format("Sent bytes ");
sampler_desc(dropped) -> sampler_desc_format("Dropped messages ");
sampler_desc(subscriptions) ->
swagger_desc(received) -> swagger_desc_format("Received messages ");
swagger_desc(received_bytes) -> swagger_desc_format("Received bytes ");
swagger_desc(sent) -> swagger_desc_format("Sent messages ");
swagger_desc(sent_bytes) -> swagger_desc_format("Sent bytes ");
swagger_desc(dropped) -> swagger_desc_format("Dropped messages ");
swagger_desc(subscriptions) ->
<<"Subscriptions at the time of sampling."
" Can only represent the approximate state">>;
sampler_desc(routes) ->
swagger_desc(routes) ->
<<"Routes at the time of sampling."
" Can only represent the approximate state">>;
sampler_desc(connections) ->
swagger_desc(connections) ->
<<"Connections at the time of sampling."
" Can only represent the approximate state">>;
sampler_desc(received_rate) -> sampler_desc_format("Dropped messages ", per);
sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per);
sampler_desc(sent_rate) -> sampler_desc_format("Sent messages ", per);
sampler_desc(sent_bytes_rate) -> sampler_desc_format("Sent bytes ", per);
sampler_desc(dropped_rate) -> sampler_desc_format("Dropped messages ", per).
swagger_desc(received_rate) -> swagger_desc_format("Dropped messages ", per);
swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);
swagger_desc(sent_rate) -> swagger_desc_format("Sent messages ", per);
swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per);
swagger_desc(dropped_rate) -> swagger_desc_format("Dropped messages ", per).
sampler_desc_format(Format) ->
sampler_desc_format(Format, last).
swagger_desc_format(Format) ->
swagger_desc_format(Format, last).
sampler_desc_format(Format, Type) ->
swagger_desc_format(Format, Type) ->
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).

View File

@ -39,7 +39,6 @@ but use the same port.
""" })}
, {default_username, fun default_username/1}
, {default_password, fun default_password/1}
%% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s
, {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})}
, {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})}
, {cors, fun cors/1}

View File

@ -19,8 +19,7 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, samplers/1
, samplers/2
]).
-include("emqx_dashboard.hrl").
@ -29,6 +28,6 @@
introduced_in() ->
"5.0.0".
-spec samplers(node()) -> list(map()) | emqx_rpc:badrpc().
samplers(Node) ->
rpc:call(Node, emqx_dashboard_monitor, samplers, [Node]).
-spec samplers(node(), Latest:: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc().
samplers(Node, Latest) ->
rpc:call(Node, emqx_dashboard_monitor, samplers, [Node, Latest]).

View File

@ -56,7 +56,7 @@ set_special_configs(_) ->
t_monitor_samplers_all(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
Size = mnesia:table_info(emqx_dashboard_monitor,size),
All = emqx_dashboard_monitor:samplers(all),
All = emqx_dashboard_monitor:samplers(all, infinity),
All2 = emqx_dashboard_monitor:samplers(),
?assert(erlang:length(All) == Size),
?assert(erlang:length(All2) == Size),
@ -78,53 +78,66 @@ t_monitor_sampler_format(_Config) ->
[?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST],
ok.
%% TODO: api test
% t_monitor_api(_) ->
% timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
% {ok, Samplers} = request(["monitor"]),
% ?assert(erlang:length(Samplers) >= 2),
% Sample = hd(Samplers),
% Fun =
% fun(Sampler) ->
% Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
% [?assert(lists:member(SamplerName, Keys)) || SamplerName<- ?SAMPLER_LIST]
% end,
% [Fun(Sampler) || Sampler <- Samplers],
% ok.
t_monitor_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Samplers} = request(["monitor"], "latest=20"),
?assert(erlang:length(Samplers) >= 2),
Fun =
fun(Sampler) ->
Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
[?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST]
end,
[Fun(Sampler) || Sampler <- Samplers],
{ok, NodeSamplers} = request(["monitor", "nodes", node()]),
[Fun(NodeSampler) || NodeSampler <- NodeSamplers],
ok.
% t_monitor_api_error(_) ->
% {error, _Reason} = request(["monitor_a"]),
% ok.
t_monitor_current_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Rate} = request(["monitor_current"]),
[?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST],
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
[?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST],
ok.
% request(Path) ->
% request(Path, "").
t_monitor_api_error(_) ->
{error, {400, #{<<"code">> := <<"BAD_RPC">>}}} =
request(["monitor", "nodes", 'emqx@127.0.0.2']),
{error, {400, #{<<"code">> := <<"BAD_RPC">>}}} =
request(["monitor_current", "nodes", 'emqx@127.0.0.2']),
ok.
% request(Path, QS) ->
% Url = url(Path, QS),
% case do_request_api(get, {Path, auth_header_()}) of
% {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])};
% Error -> Error
% end.
request(Path) ->
request(Path, "").
% url(Parts, QS)->
% case QS of
% "" ->
% ?SERVER ++ filename:join([?BASE_PATH | Parts]);
% _ ->
% ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
% end.
request(Path, QS) ->
Url = url(Path, QS),
do_request_api(get, {Url, [auth_header_()]}).
% do_request_api(Method, Request)->
% case httpc:request(Method, Request, [], []) of
% {error, socket_closed_remotely} ->
% {error, socket_closed_remotely};
% {ok, {{"HTTP/1.1", Code, _}, _, Return} }
% when Code >= 200 andalso Code =< 299 ->
% {ok, emqx_json:decode(Return)};
% {ok, Resp} ->
% {error, Resp}
% end.
url(Parts, QS)->
case QS of
"" ->
?SERVER ++ filename:join([?BASE_PATH | Parts]);
_ ->
?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
end.
% auth_header_() ->
% Basic = binary_to_list(base64:encode(<<"admin:public">>)),
% {"Authorization", "Basic " ++ Basic}.
do_request_api(Method, Request)->
ct:pal("Req ~p ~p~n", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code >= 200 andalso Code =< 299 ->
{ok, emqx_json:decode(Return, [return_maps])};
{ok, {{"HTTP/1.1", Code, _}, _, Return} } ->
{error, {Code, emqx_json:decode(Return, [return_maps])}};
{error, Reason} ->
{error, Reason}
end.
auth_header_() ->
Basic = binary_to_list(base64:encode(<<"admin:public">>)),
{"Authorization", "Basic " ++ Basic}.