From 00b83121a4fada5278293ce97e214806e2509dd2 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 25 Feb 2022 12:44:49 +0800 Subject: [PATCH] fix(test): add api test SUITE & bug fix --- .../emqx_dashboard/include/emqx_dashboard.hrl | 2 +- .../src/emqx_dashboard_monitor.erl | 88 +++++++++++---- .../src/emqx_dashboard_monitor_api.erl | 91 +++++++++++----- .../src/emqx_dashboard_schema.erl | 1 - .../src/proto/emqx_dashboard_proto_v1.erl | 9 +- .../test/emqx_dashboard_monitor_SUITE.erl | 103 ++++++++++-------- 6 files changed, 188 insertions(+), 106 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index c8a2f8a1f..2d1d28d2f 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -40,7 +40,7 @@ -define(DASHBOARD_SHARD, emqx_dashboard_shard). -ifdef(TEST). -%% for test, 2s +%% for test -define(DEFAULT_SAMPLE_INTERVAL, 1). -else. %% dashboard monitor do sample interval, default 10s diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 1eeb5b68f..6384405c9 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -51,6 +51,12 @@ %% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds -define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000). +-ifdef(TEST). +-define(RPC_TIMEOUT, 50). +-else. +-define(RPC_TIMEOUT, 5000). +-endif. + -record(state, { last }). @@ -75,20 +81,43 @@ samplers() -> format(do_sample(all, infinity)). samplers(NodeOrCluster, Latest) -> - Now = erlang:system_time(millisecond), - MatchTime = Now - (Latest * 1000), - case format(do_sample(NodeOrCluster, MatchTime)) of + Time = + case Latest of + infinity -> + infinity; + Latest when is_integer(Latest) -> + Now = erlang:system_time(millisecond), + Now - (Latest * 1000) + end, + case format(do_sample(NodeOrCluster, Time)) of {badrpc, Reason} -> {badrpc, Reason}; List when is_list(List) -> granularity_adapter(List) end. -granularity_adapter(List) when length(List) > 100 -> +%% When the number of samples exceeds 1000, it affects the rendering speed of dashboard UI. +%% granularity_adapter is an oversampling of the samples. +%% Use more granular data and reduce data density. +%% +%% [ +%% Data1 = #{time => T1, k1 => 1, k2 => 2}, +%% Data2 = #{time => T2, k1 => 3, k2 => 4}, +%% ... +%% ] +%% After granularity_adapter, Merge Data1 Data2 +%% +%% [ +%% #{time => T2, k1 => 1 + 3, k2 => 2 + 6}, +%% ... +%% ] +%% +granularity_adapter(List) when length(List) > 1000 -> granularity_adapter(List, []); granularity_adapter(List) -> List. +%% Get the current rate. Not the current sampler data. current_rate() -> Fun = fun(Node, Cluster) -> @@ -99,14 +128,19 @@ current_rate() -> {badrpc, {Node, Reason}} end end, - lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)). + case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running))of + {badrpc, Reason} -> + {badrpc, Reason}; + Rate -> + {ok, Rate} + end. current_rate(all) -> current_rate(); current_rate(Node) when Node == node() -> do_call(current_rate); current_rate(Node) -> - case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], ?RPC_TIMEOUT) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; {ok, Rate} -> @@ -161,27 +195,33 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_call(Request) -> gen_server:call(?MODULE, Request, 5000). -do_sample(all, MatchTime) -> - Fun = - fun(Node, All) -> - case do_sample(Node, MatchTime) of - {badrpc, Reason} -> - {badrpc, {Node, Reason}}; - NodeSamplers -> - merge_cluster_samplers(NodeSamplers, All) - end - end, - lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); -do_sample(Node, MatchTime) when Node == node() -> - MS = match_spec(MatchTime), +do_sample(all, Time) -> + do_sample(mria_mnesia:cluster_nodes(running), Time, #{}); +do_sample(Node, Time) when Node == node() -> + MS = match_spec(Time), internal_format(ets:select(?TAB, MS)); -do_sample(Node, MatchTime) -> - rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000). +do_sample(Node, Time) -> + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Time], ?RPC_TIMEOUT) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + Res -> + Res + end. + +do_sample([], _Time, Res) -> + Res; +do_sample([Node | Nodes], Time, Res) -> + case do_sample(Node, Time) of + {badrpc, Reason} -> + {badrpc, Reason}; + Samplers -> + do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res)) + end. match_spec(infinity) -> - [{'$1',[],['$1']}]; -match_spec(MatchTime) -> - [{{'_', '$1', '_'}, [{'>=', '$1', MatchTime}], ['$_']}]. + [{'$1', [], ['$1']}]; +match_spec(Time) -> + [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}]. merge_cluster_samplers(Node, Cluster) -> maps:fold(fun merge_cluster_samplers/3, Cluster, Node). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index e0662c972..ee761fa9c 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -26,17 +26,17 @@ api_spec() -> paths() -> [ "/monitor" , "/monitor/nodes/:node" - , "/monitor/current" + , "/monitor_current" + , "/monitor_current/nodes/:node" ]. schema("/monitor") -> #{ 'operationId' => monitor, get => #{ + tags => [dashboard], description => <<"List monitor data.">>, - parameters => [ - {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} - ], + parameters => [parameter_latest()], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) @@ -48,11 +48,9 @@ schema("/monitor/nodes/:node") -> #{ 'operationId' => monitor, get => #{ + tags => [dashboard], description => <<"List the monitor data on the node.">>, - parameters => [ - {node, hoconsc:mk(binary(), #{in => path, nullable => false, example => node()})}, - {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} - ], + parameters => [parameter_node(), parameter_latest()], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) @@ -60,25 +58,58 @@ schema("/monitor/nodes/:node") -> } }; -schema("/monitor/current") -> +schema("/monitor_current") -> #{ 'operationId' => monitor_current, get => #{ - description => <<"Current monitor data. Gauge and rate">>, + tags => [dashboard], + description => <<"Current status. Gauge and rate.">>, + responses => #{ + 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) + } + } + }; + +schema("/monitor_current/nodes/:node") -> + #{ + 'operationId' => monitor_current, + get => #{ + tags => [dashboard], + description => <<"Node current status. Gauge and rate.">>, + parameters => [parameter_node()], responses => #{ 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) } } }. +parameter_latest() -> + Info = #{ + in => query, + nullable => true, + example => 5 * 60, + description => <<"The latest N seconds data. Like 300 for 5 min.">> + }, + {latest, hoconsc:mk(integer(), Info)}. + +parameter_node() -> + Info = #{ + in => path, + nullable => false, + example => node(), + description => <<"EMQX node name.">> + }, + {node, hoconsc:mk(binary(), Info)}. + + fields(sampler) -> Samplers = - [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} || SamplerName <- ?SAMPLER_LIST], [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]; fields(sampler_current) -> - [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST]. %% ------------------------------------------------------------------------------------------------- @@ -95,8 +126,8 @@ monitor(get, #{query_string := QS, bindings := Bindings}) -> {200, Samplers} end. -monitor_current(get, #{query_string := QS}) -> - NodeOrCluster = binary_to_atom(maps:get(<<"node">>, QS, <<"all">>), utf8), +monitor_current(get, #{bindings := Bindings}) -> + NodeOrCluster = binary_to_atom(maps:get(node, Bindings, <<"all">>), utf8), case emqx_dashboard_monitor:current_rate(NodeOrCluster) of {ok, CurrentRate} -> {200, CurrentRate}; @@ -108,30 +139,30 @@ monitor_current(get, #{query_string := QS}) -> %% ------------------------------------------------------------------------------------------------- %% Internal -sampler_desc(received) -> sampler_desc_format("Received messages "); -sampler_desc(received_bytes) -> sampler_desc_format("Received bytes "); -sampler_desc(sent) -> sampler_desc_format("Sent messages "); -sampler_desc(sent_bytes) -> sampler_desc_format("Sent bytes "); -sampler_desc(dropped) -> sampler_desc_format("Dropped messages "); -sampler_desc(subscriptions) -> +swagger_desc(received) -> swagger_desc_format("Received messages "); +swagger_desc(received_bytes) -> swagger_desc_format("Received bytes "); +swagger_desc(sent) -> swagger_desc_format("Sent messages "); +swagger_desc(sent_bytes) -> swagger_desc_format("Sent bytes "); +swagger_desc(dropped) -> swagger_desc_format("Dropped messages "); +swagger_desc(subscriptions) -> <<"Subscriptions at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(routes) -> +swagger_desc(routes) -> <<"Routes at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(connections) -> +swagger_desc(connections) -> <<"Connections at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(received_rate) -> sampler_desc_format("Dropped messages ", per); -sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per); -sampler_desc(sent_rate) -> sampler_desc_format("Sent messages ", per); -sampler_desc(sent_bytes_rate) -> sampler_desc_format("Sent bytes ", per); -sampler_desc(dropped_rate) -> sampler_desc_format("Dropped messages ", per). +swagger_desc(received_rate) -> swagger_desc_format("Dropped messages ", per); +swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per); +swagger_desc(sent_rate) -> swagger_desc_format("Sent messages ", per); +swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per); +swagger_desc(dropped_rate) -> swagger_desc_format("Dropped messages ", per). -sampler_desc_format(Format) -> - sampler_desc_format(Format, last). +swagger_desc_format(Format) -> + swagger_desc_format(Format, last). -sampler_desc_format(Format, Type) -> +swagger_desc_format(Format, Type) -> Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index efd890bec..f112fc852 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -39,7 +39,6 @@ but use the same port. """ })} , {default_username, fun default_username/1} , {default_password, fun default_password/1} - %% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})} , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})} , {cors, fun cors/1} diff --git a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl index 09c3ce66c..6a4f3fb3a 100644 --- a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl +++ b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl @@ -19,8 +19,7 @@ -behaviour(emqx_bpapi). -export([ introduced_in/0 - - , samplers/1 + , samplers/2 ]). -include("emqx_dashboard.hrl"). @@ -29,6 +28,6 @@ introduced_in() -> "5.0.0". --spec samplers(node()) -> list(map()) | emqx_rpc:badrpc(). -samplers(Node) -> - rpc:call(Node, emqx_dashboard_monitor, samplers, [Node]). +-spec samplers(node(), Latest:: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc(). +samplers(Node, Latest) -> + rpc:call(Node, emqx_dashboard_monitor, samplers, [Node, Latest]). diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 43d8d0712..88da67245 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -56,7 +56,7 @@ set_special_configs(_) -> t_monitor_samplers_all(_Config) -> timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), Size = mnesia:table_info(emqx_dashboard_monitor,size), - All = emqx_dashboard_monitor:samplers(all), + All = emqx_dashboard_monitor:samplers(all, infinity), All2 = emqx_dashboard_monitor:samplers(), ?assert(erlang:length(All) == Size), ?assert(erlang:length(All2) == Size), @@ -78,53 +78,66 @@ t_monitor_sampler_format(_Config) -> [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST], ok. -%% TODO: api test -% t_monitor_api(_) -> -% timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), -% {ok, Samplers} = request(["monitor"]), -% ?assert(erlang:length(Samplers) >= 2), -% Sample = hd(Samplers), -% Fun = -% fun(Sampler) -> -% Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], -% [?assert(lists:member(SamplerName, Keys)) || SamplerName<- ?SAMPLER_LIST] -% end, -% [Fun(Sampler) || Sampler <- Samplers], -% ok. +t_monitor_api(_) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, Samplers} = request(["monitor"], "latest=20"), + ?assert(erlang:length(Samplers) >= 2), + Fun = + fun(Sampler) -> + Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], + [?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST] + end, + [Fun(Sampler) || Sampler <- Samplers], + {ok, NodeSamplers} = request(["monitor", "nodes", node()]), + [Fun(NodeSampler) || NodeSampler <- NodeSamplers], + ok. -% t_monitor_api_error(_) -> -% {error, _Reason} = request(["monitor_a"]), -% ok. +t_monitor_current_api(_) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, Rate} = request(["monitor_current"]), + [?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST], + {ok, NodeRate} = request(["monitor_current", "nodes", node()]), + [?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate)) + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST], + ok. -% request(Path) -> -% request(Path, ""). +t_monitor_api_error(_) -> + {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} = + request(["monitor", "nodes", 'emqx@127.0.0.2']), + {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} = + request(["monitor_current", "nodes", 'emqx@127.0.0.2']), + ok. -% request(Path, QS) -> -% Url = url(Path, QS), -% case do_request_api(get, {Path, auth_header_()}) of -% {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; -% Error -> Error -% end. +request(Path) -> + request(Path, ""). -% url(Parts, QS)-> -% case QS of -% "" -> -% ?SERVER ++ filename:join([?BASE_PATH | Parts]); -% _ -> -% ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS -% end. +request(Path, QS) -> + Url = url(Path, QS), + do_request_api(get, {Url, [auth_header_()]}). -% do_request_api(Method, Request)-> -% case httpc:request(Method, Request, [], []) of -% {error, socket_closed_remotely} -> -% {error, socket_closed_remotely}; -% {ok, {{"HTTP/1.1", Code, _}, _, Return} } -% when Code >= 200 andalso Code =< 299 -> -% {ok, emqx_json:decode(Return)}; -% {ok, Resp} -> -% {error, Resp} -% end. +url(Parts, QS)-> + case QS of + "" -> + ?SERVER ++ filename:join([?BASE_PATH | Parts]); + _ -> + ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS + end. -% auth_header_() -> -% Basic = binary_to_list(base64:encode(<<"admin:public">>)), -% {"Authorization", "Basic " ++ Basic}. +do_request_api(Method, Request)-> + ct:pal("Req ~p ~p~n", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } + when Code >= 200 andalso Code =< 299 -> + {ok, emqx_json:decode(Return, [return_maps])}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } -> + {error, {Code, emqx_json:decode(Return, [return_maps])}}; + {error, Reason} -> + {error, Reason} + end. + +auth_header_() -> + Basic = binary_to_list(base64:encode(<<"admin:public">>)), + {"Authorization", "Basic " ++ Basic}.