From f4a669b3bcb4cb0a913b0bfd6bf791cfdbb3b6b7 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 17 Feb 2022 15:41:06 +0800 Subject: [PATCH 1/8] fix(doc): reflactor monitor --- .../src/emqx_dashboard_monitor.erl | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 apps/emqx_dashboard/src/emqx_dashboard_monitor.erl diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl new file mode 100644 index 000000000..53f9bb5f8 --- /dev/null +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -0,0 +1,87 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_dashboard_monitor). +-behaviour(gen_server). + +-export([start_link/0]). + +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([ mnesia/1]). + +-define(TAB, ?MODULE). + +-record(state, {}). +-define(INIT_DATA, + #{ + sent => 0, + received => 0, + sent_bytes => 0, + received_bytes => 0, + dropped => 0, + subscriptions => 0, + routes => 0, + connection => 0 + }). 
+ +-record(monitor_data) + +mnesia(boot) -> + ok = mria:create_table(?TAB, [ + {type, set}, + {local_content, true}, + {storage, disc_copies}, + {record_name, monitor_data}, + {attributes, record_info(fields, monitor_data)}]). + +start_link() -> + gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #state{}}. + +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +terminate(_Reason, _State = #state{}) -> + ok. + +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +next_interval() -> + ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL), + ok. + +monit() -> + ok. From 21b9943df9ee3fd12544a0f59c5604b638fe0cb6 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 18 Feb 2022 19:34:33 +0800 Subject: [PATCH 2/8] feat: new monitor TODO: API --- apps/emqx_dashboard/etc/emqx_dashboard.conf | 1 + .../src/emqx_dashboard_monitor.erl | 196 ++++++++++++++++-- .../src/emqx_dashboard_schema.erl | 1 + .../emqx_dashboard/src/emqx_dashboard_sup.erl | 2 +- 4 files changed, 179 insertions(+), 21 deletions(-) diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index 1c5a757d7..e5a15b515 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -6,6 +6,7 @@ dashboard { default_username = "admin" default_password = "public" ## notice: sample_interval should be divisible by 60. + ## as like 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s sample_interval = 10s ## api jwt timeout. 
default is 30 minute token_expired_time = 60m diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 53f9bb5f8..c588f5a25 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -15,9 +15,14 @@ %%-------------------------------------------------------------------- -module(emqx_dashboard_monitor). + +-include_lib("stdlib/include/ms_transform.hrl"). + -behaviour(gen_server). --export([start_link/0]). +-boot_mnesia({mnesia, [boot]}). + +-export([ start_link/0]). -export([ init/1 , handle_call/3 @@ -29,36 +34,88 @@ -export([ mnesia/1]). +-export([ samples/0 + , samples/1 + , aggregate_samplers/0 + ]). + -define(TAB, ?MODULE). --record(state, {}). --define(INIT_DATA, - #{ - sent => 0, - received => 0, - sent_bytes => 0, - received_bytes => 0, - dropped => 0, - subscriptions => 0, - routes => 0, - connection => 0 +%% 10 seconds +-define(DEFAULT_INTERVAL, 10). + +-ifdef(TEST). +%% for test +-define(CLEAN_EXPIRED_INTERVAL, 2 * 1000). +-define(RETENTION_TIME, 3 * 1000). +-define(DEFAULT_GET_DATA_TIME, 5* 1000). + +-else. + +%% 1 hour = 60 * 60 * 1000 milliseconds +-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000). +%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds +-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000). +%% 1 day = 60 * 60 * 1000 milliseconds +-define(DEFAULT_GET_DATA_TIME, 60 * 60 * 1000). + +-endif. + +-record(state, { + last }). --record(monitor_data) +-record(emqx_monit, { + time :: integer(), + data :: map() + }). + + +-define(DELTA_LIST, + [ received + , received_bytes + , sent + , sent_bytes + , dropped + ]). + +-define(SAMPLER_LIST, + [ subscriptions + , routes + , connections + ] ++ ?DELTA_LIST). mnesia(boot) -> ok = mria:create_table(?TAB, [ {type, set}, {local_content, true}, {storage, disc_copies}, - {record_name, monitor_data}, - {attributes, record_info(fields, monitor_data)}]). 
+ {record_name, emqx_monit}, + {attributes, record_info(fields, emqx_monit)}]). + +aggregate_samplers() -> + [#{node => Node, data => samples(Node)} || Node <- mria_mnesia:cluster_nodes(running)]. + +samples() -> + All = [samples(Node) || Node <- mria_mnesia:cluster_nodes(running)], + lists:foldl(fun merge_cluster_samplers/2, #{}, All). + +samples(Node) when Node == node() -> + get_data(?DEFAULT_GET_DATA_TIME); +samples(Node) -> + rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node]). + +%%%=================================================================== +%%% gen_server functions +%%%=================================================================== start_link() -> - gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> - {ok, #state{}}. + sample_timer(), + clean_timer(), + {ok, #state{last = undefined}}. handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -66,6 +123,17 @@ handle_call(_Request, _From, State = #state{}) -> handle_cast(_Request, State = #state{}) -> {noreply, State}. +handle_info({sample, Time}, State = #state{last = Last}) -> + Now = sample(Time), + {atomic, ok} = flush(Last, Now), + sample_timer(), + {noreply, State#state{last = Now}}; + +handle_info(clean_expired, State) -> + clean(), + clean_timer(), + {noreply, State}; + handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -79,9 +147,97 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +sample_timer() -> + {NextTime, Remaining} = next_interval(), + erlang:send_after(Remaining, self(), {sample, NextTime}). + +clean_timer() -> + erlang:send_after(?CLEAN_EXPIRED_INTERVAL, self(), clean_expired). + +%% Per interval seconds. +%% As an example: +%% Interval = 10 +%% The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ... 
+%% Ensure that the monitor data of all nodes in the cluster are aligned in time next_interval() -> - ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL), + Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_INTERVAL) * 1000, + Now = erlang:system_time(millisecond), + NextTime = ((Now div Interval) + 1) * Interval, + Remaining = NextTime - Now, + {NextTime, Remaining}. + +sample(Time) -> + Fun = + fun(Key, Res) -> + maps:put(Key, value(Key), Res) + end, + Data = lists:foldl(Fun, #{}, ?SAMPLER_LIST), + #emqx_monit{time = Time, data = Data}. + +flush(_Last = undefined, Now) -> + store(Now); +flush(_Last = #emqx_monit{data = LastData}, Now = #emqx_monit{data = NowData}) -> + Store = Now#emqx_monit{data = delta(LastData, NowData)}, + store(Store). + +delta(LastData, NowData) -> + Fun = + fun(Key, Data) -> + Value = maps:get(Key, NowData) - maps:get(Key, LastData), + Data#{Key => Value} + end, + lists:foldl(Fun, NowData, ?DELTA_LIST). + +store(MonitData) -> + {atomic, ok} = + mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]). + +clean() -> + Now = erlang:system_time(millisecond), + ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}], + Expired = ets:select(?TAB, ExpiredMS), + lists:foreach(fun(Data) -> + true = ets:delete_object(?TAB, Data) + end, Expired), ok. -monit() -> - ok. +get_data(PastTime) -> + Now = erlang:system_time(millisecond), + ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}], + format(ets:select(?TAB, ExpiredMS)). + +format(List) when is_list(List) -> + Fun = + fun(Data, All) -> + maps:merge(format(Data), All) + end, + lists:foldl(Fun, #{}, List); +format(#emqx_monit{time = Time, data = Data}) -> + #{Time => Data}. + +merge_cluster_samplers(Node, Cluster) -> + maps:fold(fun merge_cluster_samplers/3, Cluster, Node). 
+ +merge_cluster_samplers(TS, NodeData, Cluster) -> + case maps:get(TS, Cluster, undefined) of + undefined -> + Cluster#{TS => NodeData}; + ClusterData -> + Cluster#{TS => count_map(NodeData, ClusterData)} + end. + +count_map(M1, M2) -> + Fun = + fun(Key, Map) -> + Map#{key => maps:get(Key, M1) + maps:get(Key, M2)} + end, + lists:foldl(Fun, #{}, ?SAMPLER_LIST). + +value(connections) -> emqx_stats:getstat('connections.count'); +value(routes) -> emqx_stats:getstat('routes.count'); +value(subscriptions) -> emqx_stats:getstat('subscriptions.count'); +value(received) -> emqx_metrics:val('messages.received'); +value(received_bytes) -> emqx_metrics:val('bytes.received'); +value(sent) -> emqx_metrics:val('messages.sent'); +value(sent_bytes) -> emqx_metrics:val('bytes.sent'); +value(dropped) -> emqx_metrics:val('messages.dropped'). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index f112fc852..efd890bec 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -39,6 +39,7 @@ but use the same port. """ })} , {default_username, fun default_username/1} , {default_password, fun default_password/1} + %% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})} , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})} , {cors, fun cors/1} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl index a4ce812b6..7e13221bd 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl @@ -29,4 +29,4 @@ start_link() -> init([]) -> {ok, {{one_for_all, 10, 100}, - [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_collection)]}}. + [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_monitor)]}}. 
From fedfa6c653e1e9044c033416e49b04d61b1fdd2c Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 23 Feb 2022 17:02:26 +0800 Subject: [PATCH 3/8] feat: monitor api, TODO: test suite --- .../emqx_dashboard/include/emqx_dashboard.hrl | 20 +- apps/emqx_dashboard/src/emqx_dashboard.erl | 11 +- .../src/emqx_dashboard_collection.erl | 191 --------- .../src/emqx_dashboard_monitor.erl | 117 +++--- .../src/emqx_dashboard_monitor_api.erl | 371 ++++-------------- .../src/proto/emqx_dashboard_proto_v1.erl | 14 +- .../test/emqx_dashboard_monitor_api_SUITE.erl | 81 ---- 7 files changed, 179 insertions(+), 626 deletions(-) delete mode 100644 apps/emqx_dashboard/src/emqx_dashboard_collection.erl diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 7e021370a..8deae8187 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -39,7 +39,19 @@ -define(DASHBOARD_SHARD, emqx_dashboard_shard). --record(mqtt_collect, { - timestamp :: integer(), - collect - }). +%% 10 seconds +-define(DEFAULT_SAMPLE_INTERVAL, 10). + +-define(DELTA_SAMPLER_LIST, + [ received + , received_bytes + , sent + , sent_bytes + , dropped + ]). + +-define(SAMPLER_LIST, + [ subscriptions + , routes + , connections + ] ++ ?DELTA_SAMPLER_LIST). diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index 8e618cb0e..a6fadc014 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -92,11 +92,12 @@ stop_listeners() -> %% internal apps() -> - [App || {App, _, _} <- application:loaded_applications(), - case re:run(atom_to_list(App), "^emqx") of - {match,[{0,4}]} -> true; - _ -> false - end]. + [emqx_dashboard]. 
+ % [App || {App, _, _} <- application:loaded_applications(), + % case re:run(atom_to_list(App), "^emqx") of + % {match,[{0,4}]} -> true; + % _ -> false + % end]. listeners() -> [begin diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl deleted file mode 100644 index e0b57432c..000000000 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ /dev/null @@ -1,191 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_dashboard_collection). - --behaviour(gen_server). - --include("emqx_dashboard.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). - --export([ start_link/0 - ]). - --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --export([get_collect/0, select_data/0]). - --export([get_universal_epoch/0]). - --boot_mnesia({mnesia, [boot]}). - -%% Mnesia bootstrap --export([mnesia/1]). - --define(APP, emqx_dashboard). - --define(DEFAULT_INTERVAL, 10). %% seconds - --define(COLLECT, {[],[],[]}). - --define(CLEAR_INTERVAL, 86400000). - --define(EXPIRE_INTERVAL, 86400000 * 7). - -mnesia(boot) -> - ok = mria:create_table(?TAB_COLLECT, [ - {type, set}, - {local_content, true}, - {storage, disc_copies}, - {record_name, mqtt_collect}, - {attributes, record_info(fields, mqtt_collect)}]). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_collect() -> gen_server:call(whereis(?MODULE), get_collect). - --spec select_data() -> [#mqtt_collect{}]. -select_data() -> - Time = emqx_dashboard_collection:get_universal_epoch() - 7200000, - ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]). 
- -init([]) -> - timer(next_interval(), collect), - timer(get_today_remaining_seconds(), clear_expire_data), - ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL), - State = #{ - count => count(), - expire_interval => ExpireInterval, - collect => ?COLLECT, - temp_collect => {0, 0, 0, 0}, - last_collects => {0, 0, 0} - }, - {ok, State}. - -%% @doc every whole interval seconds; -%% example: -%% interval is 10s -%% now 15:01:07 (or 15:07:01 ~ 15:07:10) -%% next will be 15:01:10, 15:01:20, 15:01:30 ... -%% ensure all counters in cluster have sync time -next_interval() -> - (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. - -interval() -> - emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL). - -count() -> - 60 div interval(). - -handle_call(get_collect, _From, State = #{temp_collect := {Received, Sent, _, _}}) -> - {reply, {Received, Sent, collect(subscriptions), collect(connections)}, State, hibernate}; -handle_call(_Req, _From, State) -> - {reply, ok, State}. -handle_cast(_Req, State) -> - {noreply, State}. 
- -handle_info(collect, State = #{ collect := Collect - , count := 1 - , temp_collect := TempCollect - , last_collects := LastCollect}) -> - timer(next_interval(), collect), - NewLastCollect = flush(collect_all(Collect), LastCollect), - TempCollect1 = temp_collect(TempCollect), - {noreply, State#{count => count(), - collect => ?COLLECT, - temp_collect => TempCollect1, - last_collects => NewLastCollect}}; - -handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> - timer(next_interval(), collect), - TempCollect1 = temp_collect(TempCollect), - {noreply, State#{count => Count - 1, - collect => collect_all(Collect), - temp_collect => TempCollect1}, hibernate}; - -handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) -> - timer(?CLEAR_INTERVAL, clear_expire_data), - T1 = get_universal_epoch(), - Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end), - Collects = ets:select(?TAB_COLLECT, Spec), - lists:foreach(fun(Collect) -> - true = ets:delete_object(?TAB_COLLECT, Collect) - end, Collects), - {noreply, State, hibernate}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -temp_collect({_, _, Received, Sent}) -> - Received1 = collect(received), - Sent1 = collect(sent), - {(Received1 - Received) div interval(), - (Sent1 - Sent) div interval(), - Received1, - Sent1}. - -collect_all({Connection, Route, Subscription}) -> - {[collect(connections) | Connection], - [collect(routes) | Route], - [collect(subscriptions) | Subscription]}. 
- -collect(connections) -> - emqx_stats:getstat('connections.count'); -collect(routes) -> - emqx_stats:getstat('routes.count'); -collect(subscriptions) -> - emqx_stats:getstat('subscriptions.count'); -collect(received) -> - emqx_metrics:val('messages.received'); -collect(sent) -> - emqx_metrics:val('messages.sent'); -collect(dropped) -> - emqx_metrics:val('messages.dropped'). - -flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> - Received = collect(received), - Sent = collect(sent), - Dropped = collect(dropped), - Collect = {avg(Connection), - avg(Route), - avg(Subscription), - diff(Received, Received0), - diff(Sent, Sent0), - diff(Dropped, Dropped0)}, - Ts = get_universal_epoch(), - {atomic, ok} = mria:transaction(mria:local_content_shard(), - fun mnesia:write/3, - [ ?TAB_COLLECT - , #mqtt_collect{timestamp = Ts, collect = Collect} - , write]), - {Received, Sent, Dropped}. - -avg(Items) -> - lists:sum(Items) div count(). - -diff(Item0, Item1) -> - Item0 - Item1. - -timer(Secs, Msg) -> - erlang:send_after(Secs, self(), Msg). - -get_today_remaining_seconds() -> - ?CLEAR_INTERVAL - (get_universal_epoch() rem ?CLEAR_INTERVAL). - -get_universal_epoch() -> - (calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - - calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index c588f5a25..99c636fe1 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -16,7 +16,7 @@ -module(emqx_dashboard_monitor). --include_lib("stdlib/include/ms_transform.hrl"). +-include("emqx_dashboard.hrl"). -behaviour(gen_server). @@ -34,15 +34,15 @@ -export([ mnesia/1]). --export([ samples/0 - , samples/1 - , aggregate_samplers/0 +-export([ samplers/0 + , samplers/1 + , samplers/2 ]). --define(TAB, ?MODULE). +%% for rpc +-export([ do_samples/1]). 
-%% 10 seconds --define(DEFAULT_INTERVAL, 10). +-define(TAB, ?MODULE). -ifdef(TEST). %% for test @@ -70,21 +70,6 @@ data :: map() }). - --define(DELTA_LIST, - [ received - , received_bytes - , sent - , sent_bytes - , dropped - ]). - --define(SAMPLER_LIST, - [ subscriptions - , routes - , connections - ] ++ ?DELTA_LIST). - mnesia(boot) -> ok = mria:create_table(?TAB, [ {type, set}, @@ -93,17 +78,26 @@ mnesia(boot) -> {record_name, emqx_monit}, {attributes, record_info(fields, emqx_monit)}]). -aggregate_samplers() -> - [#{node => Node, data => samples(Node)} || Node <- mria_mnesia:cluster_nodes(running)]. +samplers() -> + samplers(all). -samples() -> - All = [samples(Node) || Node <- mria_mnesia:cluster_nodes(running)], - lists:foldl(fun merge_cluster_samplers/2, #{}, All). +samplers(NodeOrCluster) -> + format(do_samples(NodeOrCluster)). -samples(Node) when Node == node() -> - get_data(?DEFAULT_GET_DATA_TIME); -samples(Node) -> - rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node]). +samplers(NodeOrCluster, 0) -> + samplers(NodeOrCluster); +samplers(NodeOrCluster, Latest) -> + case samplers(NodeOrCluster) of + {badrpc, Reason} -> + {badrpc, Reason}; + List when is_list(List) -> + case erlang:length(List) - Latest of + Start when Start > 0 -> + lists:sublist(List, Start, Latest); + _ -> + List + end + end. %%%=================================================================== %%% gen_server functions @@ -147,6 +141,43 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +do_samples(all) -> + Fun = + fun(Node, All) -> + case do_samples(Node) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + NodeSamplers -> + merge_cluster_samplers(NodeSamplers, All) + end + end, + lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); +do_samples(Node) when Node == node() -> + get_data(?DEFAULT_GET_DATA_TIME); +do_samples(Node) -> + rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000). 
+ +merge_cluster_samplers(Node, Cluster) -> + maps:fold(fun merge_cluster_samplers/3, Cluster, Node). + +merge_cluster_samplers(TS, NodeData, Cluster) -> + case maps:get(TS, Cluster, undefined) of + undefined -> + Cluster#{TS => NodeData}; + ClusterData -> + Cluster#{TS => count_map(NodeData, ClusterData)} + end. + +format({badrpc, Reason}) -> + {badrpc, Reason}; +format(Data) -> + All = maps:fold(fun format/3, [], Data), + Compare = fun(#{time_stamp := T1}, #{time_stamp := T2}) -> T1 =< T2 end, + lists:sort(Compare, All). + +format(TimeStamp, Data, All) -> + [Data#{time_stamp => TimeStamp} | All]. + sample_timer() -> {NextTime, Remaining} = next_interval(), erlang:send_after(Remaining, self(), {sample, NextTime}). @@ -160,7 +191,7 @@ clean_timer() -> %% The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ... %% Ensure that the monitor data of all nodes in the cluster are aligned in time next_interval() -> - Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_INTERVAL) * 1000, + Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000, Now = erlang:system_time(millisecond), NextTime = ((Now div Interval) + 1) * Interval, Remaining = NextTime - Now, @@ -186,7 +217,7 @@ delta(LastData, NowData) -> Value = maps:get(Key, NowData) - maps:get(Key, LastData), Data#{Key => Value} end, - lists:foldl(Fun, NowData, ?DELTA_LIST). + lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST). store(MonitData) -> {atomic, ok} = @@ -204,28 +235,18 @@ clean() -> get_data(PastTime) -> Now = erlang:system_time(millisecond), ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}], - format(ets:select(?TAB, ExpiredMS)). + internal_format(ets:select(?TAB, ExpiredMS)). 
-format(List) when is_list(List) -> +%% To make it easier to do data aggregation +internal_format(List) when is_list(List) -> Fun = fun(Data, All) -> - maps:merge(format(Data), All) + maps:merge(internal_format(Data), All) end, lists:foldl(Fun, #{}, List); -format(#emqx_monit{time = Time, data = Data}) -> +internal_format(#emqx_monit{time = Time, data = Data}) -> #{Time => Data}. -merge_cluster_samplers(Node, Cluster) -> - maps:fold(fun merge_cluster_samplers/3, Cluster, Node). - -merge_cluster_samplers(TS, NodeData, Cluster) -> - case maps:get(TS, Cluster, undefined) of - undefined -> - Cluster#{TS => NodeData}; - ClusterData -> - Cluster#{TS => count_map(NodeData, ClusterData)} - end. - count_map(M1, M2) -> Fun = fun(Key, Map) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index e9bc62521..d3cf5523c 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -5,308 +5,105 @@ -module(emqx_dashboard_monitor_api). -include("emqx_dashboard.hrl"). +-include_lib("typerefl/include/types.hrl"). -behaviour(minirest_api). --import(emqx_mgmt_util, [schema/2]). --export([api_spec/0]). +-export([ api_spec/0]). --export([ monitor/2 - , counters/2 - , monitor_nodes/2 - , monitor_nodes_counters/2 - , current_counters/2 +-export([ paths/0 + , schema/1 + , fields/1 ]). --export([ sampling/1 - , sampling/2 - ]). +-export([ monitor/2]). --define(COUNTERS, [ connection - , route - , subscriptions - , received - , sent - , dropped]). - --define(EMPTY_COLLECTION, {0, 0, 0, 0}). +-define(SAMPLERS, + [ connection + , route + , subscriptions + , received + , sent + , dropped + ]). api_spec() -> - {[ monitor_api() - , monitor_nodes_api() - , monitor_nodes_counters_api() - , monitor_counters_api() - , monitor_current_api() - ], - []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). 
-monitor_api() -> - Metadata = #{ +paths() -> + [ "/monitor" + , "/monitor/nodes/:node" + ]. + +schema("/monitor") -> + #{ + 'operationId' => monitor, get => #{ - description => <<"List monitor data">>, + description => <<"List monitor data.">>, parameters => [ - #{ - name => aggregate, - in => query, - required => false, - schema => #{type => boolean} - } + {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})} ], responses => #{ - <<"200">> => schema(counters_schema(), <<"Monitor count data">>)}}}, - {"/monitor", Metadata, monitor}. - -monitor_nodes_api() -> - Metadata = #{ - get => #{ - description => <<"List monitor data">>, - parameters => [path_param_node()], - responses => #{ - <<"200">> => schema(counters_schema(), <<"Monitor count data in node">>)}}}, - {"/monitor/nodes/:node", Metadata, monitor_nodes}. - -monitor_nodes_counters_api() -> - Metadata = #{ - get => #{ - description => <<"List monitor data">>, - parameters => [ - path_param_node(), - path_param_counter() - ], - responses => #{ - <<"200">> => schema(counter_schema(), <<"Monitor single count data in node">>)}}}, - {"/monitor/nodes/:node/counters/:counter", Metadata, monitor_nodes_counters}. - -monitor_counters_api() -> - Metadata = #{ - get => #{ - description => <<"List monitor data">>, - parameters => [ - path_param_counter() - ], - responses => #{ - <<"200">> => - schema(counter_schema(), <<"Monitor single count data">>)}}}, - {"/monitor/counters/:counter", Metadata, counters}. -monitor_current_api() -> - Metadata = #{ - get => #{ - description => <<"Current monitor data">>, - responses => #{ - <<"200">> => schema(current_counters_schema(), <<"Current monitor data">>)}}}, - {"/monitor/current", Metadata, current_counters}. - -path_param_node() -> - #{ - name => node, - in => path, - required => true, - schema => #{type => string}, - example => node() - }. 
- -path_param_counter() -> - #{ - name => counter, - in => path, - required => true, - schema => #{type => string, enum => ?COUNTERS}, - example => hd(?COUNTERS) - }. - -current_counters_schema() -> - #{ - type => object, - properties => #{ - connection => #{type => integer}, - sent => #{type => integer}, - received => #{type => integer}, - subscription => #{type => integer}} - }. - -counters_schema() -> - Fun = - fun(K, M) -> - maps:merge(M, counters_schema(K)) - end, - Properties = lists:foldl(Fun, #{}, ?COUNTERS), - #{ - type => object, - properties => Properties - }. - -counters_schema(Name) -> - #{Name => counter_schema()}. -counter_schema() -> - #{ - type => array, - items => #{ - type => object, - properties => #{ - timestamp => #{ - type => integer, - description => <<"Millisecond">>}, - count => #{ - type => integer}}}}. -%%%============================================================================================== -%% parameters trans -monitor(get, #{query_string := Qs}) -> - Aggregate = maps:get(<<"aggregate">>, Qs, <<"false">>), - {200, list_collect(Aggregate)}. - -monitor_nodes(get, #{bindings := #{node := Node}}) -> - lookup([{<<"node">>, Node}]). - -monitor_nodes_counters(get, #{bindings := #{node := Node, counter := Counter}}) -> - lookup([{<<"node">>, Node}, {<<"counter">>, Counter}]). - -counters(get, #{bindings := #{counter := Counter}}) -> - lookup([{<<"counter">>, Counter}]). - -current_counters(get, _Params) -> - Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()], - Nodes = length(mria_mnesia:running_nodes()), - {Received, Sent, Sub, Conn} = format_current_metrics(Data), - Response = #{ - nodes => Nodes, - received => Received, - sent => Sent, - subscription => Sub, - connection => Conn - }, - {200, Response}. - -format_current_metrics(Collects) -> - format_current_metrics(Collects, ?EMPTY_COLLECTION). 
-format_current_metrics([], Acc) -> - Acc; -format_current_metrics([{Received, Sent, Sub, Conn} | Collects], - {Received1, Sent1, Sub1, Conn1}) -> - format_current_metrics(Collects, - {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). - - -%%%============================================================================================== -%% api apply - -lookup(Params) -> - Fun = - fun({K,V}, M) -> - maps:put(binary_to_atom(K, utf8), binary_to_atom(V, utf8), M) - end, - lookup_(lists:foldl(Fun, #{}, Params)). - -lookup_(#{node := Node, counter := Counter}) -> - Data = hd(maps:values(sampling(Node, Counter))), - {200, Data}; -lookup_(#{node := Node}) -> - {200, sampling(Node)}; -lookup_(#{counter := Counter}) -> - CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]), - Data = hd(maps:values(CounterData)), - {200, Data}. - -list_collect(Aggregate) -> - case Aggregate of - <<"true">> -> - [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()]; - _ -> - Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()], - merger_counters(Counters) - end. - -get_collect(Node) -> - case emqx_dashboard_proto_v1:get_collect(Node) of - {badrpc, _Reason} -> ?EMPTY_COLLECTION; - Res -> Res - end. - -merger_counters(ClusterCounters) -> - lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters). - -merger_node_counters(NodeCounters, Counters) -> - maps:fold(fun merger_counter/3, Counters, NodeCounters). - -merger_counter(Key, Counters, Res) -> - case maps:get(Key, Res, undefined) of - undefined -> - Res#{Key => Counters}; - OldCounters -> - NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters), - Res#{Key => NCounters} - end. 
- -merger_counter(#{timestamp := Timestamp, count := Value}, Counters) -> - Comparison = - fun(Counter) -> - case maps:get(timestamp, Counter) =:= Timestamp of - true -> - Count = maps:get(count, Counter), - {ok, Counter#{count => Count + Value}}; - false -> - ignore - end - end, - key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}). - -key_replace(List, Comparison, Default) -> - key_replace(List, List, Comparison, Default). - -key_replace([], All, _Comparison, Default) -> - [Default | All]; - -key_replace([Term | List], All, Comparison, Default) -> - case Comparison(Term) of - {ok, NTerm} -> - Tail = [NTerm | List], - Header = lists:sublist(All, length(All) - length(Tail)), - lists:append(Header, Tail); - _ -> - key_replace(List, All, Comparison, Default) - end. - -sampling(Node) -> - Data = emqx_dashboard_proto_v1:select_data(Node), - format(lists:sort(Data)). - -sampling(Node, Counter) -> - Data = emqx_dashboard_proto_v1:select_data(Node), - format_single(lists:sort(Data), Counter). - -format(Collects) -> - format(Collects, {[],[],[],[],[],[]}). -format([], {Connection, Route, Subscription, Received, Sent, Dropped}) -> - #{ - connection => add_key(Connection), - route => add_key(Route), - subscriptions => add_key(Subscription), - received => add_key(Received), - sent => add_key(Sent), - dropped => add_key(Dropped) + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), + 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) + } + } }; -format([#mqtt_collect{timestamp = Ts, collect = {C, R, S, Re, S1, D}} | Collects], - {Connection, Route, Subscription, Received, Sent, Dropped}) -> - format(Collects, {[[Ts, C] | Connection], - [[Ts, R] | Route], - [[Ts, S] | Subscription], - [[Ts, Re] | Received], - [[Ts, S1] | Sent], - [[Ts, D] | Dropped]}). -add_key(Collects) -> - lists:reverse([#{timestamp => Ts * 1000, count => C} || [Ts, C] <- Collects]). 
+schema("/monitor/nodes/:node") -> + #{ + 'operationId' => monitor, + get => #{ + description => <<"List the monitor data on the node.">>, + parameters => [ + {node, hoconsc:mk(binary(), #{in => path, required => true, example => node()})}, + {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})} + ], + responses => #{ + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), + 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) + } + } + }. -format_single(Collects, Counter) -> - #{Counter => format_single(Collects, counter_index(Counter), [])}. -format_single([], _Index, Acc) -> - lists:reverse(Acc); -format_single([#mqtt_collect{timestamp = Ts, collect = Collect} | Collects], Index, Acc) -> - format_single(Collects, Index, - [#{timestamp => Ts * 1000, count => erlang:element(Index, Collect)} | Acc]). +fields(sampler) -> + Samplers = + [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + || SamplerName <- ?SAMPLER_LIST], + [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]. -counter_index(connection) -> 1; -counter_index(route) -> 2; -counter_index(subscriptions) -> 3; -counter_index(received) -> 4; -counter_index(sent) -> 5; -counter_index(dropped) -> 6. +%% ------------------------------------------------------------------------------------------------- +%% API + +monitor(get, #{query_string := QS, bindings := Bindings}) -> + Latest = maps:get(<<"latest">>, QS, 0), + Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)), + case emqx_dashboard_monitor:samplers(Node, Latest) of + {badrpc, {Node, Reason}} -> + Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])), + {400, 'BAD_RPC', Message}; + Samplers -> + {200, Samplers} + end. 
+ +%% ------------------------------------------------------------------------------------------------- +%% Internal + +sampler_desc(received) -> sampler_desc_format("Received messages "); +sampler_desc(received_bytes) -> sampler_desc_format("Received bytes "); +sampler_desc(sent) -> sampler_desc_format("Sent messages "); +sampler_desc(sent_bytes) -> sampler_desc_format("Sent bytes "); +sampler_desc(dropped) -> sampler_desc_format("Dropped messages "); +sampler_desc(subscriptions) -> + <<"Subscriptions at the time of sampling." + " Can only represent the approximate state">>; +sampler_desc(routes) -> + <<"Routes at the time of sampling." + " Can only represent the approximate state">>; +sampler_desc(connections) -> + <<"Connections at the time of sampling." + " Can only represent the approximate state">>. + +sampler_desc_format(Format) -> + Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), + list_to_binary(io_lib:format(Format ++ "last ~p seconds", [Interval])). diff --git a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl index 6cf562983..09c3ce66c 100644 --- a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl +++ b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl @@ -20,8 +20,7 @@ -export([ introduced_in/0 - , get_collect/1 - , select_data/1 + , samplers/1 ]). -include("emqx_dashboard.hrl"). @@ -30,11 +29,6 @@ introduced_in() -> "5.0.0". --spec get_collect(node()) -> _. -get_collect(Node) -> - rpc:call(Node, emqx_dashboard_collection, get_collect, []). - --spec select_data(node()) -> [#mqtt_collect{}] - | emqx_rpc:badrpc(). -select_data(Node) -> - rpc:call(Node, emqx_dashboard_collection, select_data, []). +-spec samplers(node()) -> list(map()) | emqx_rpc:badrpc(). +samplers(Node) -> + rpc:call(Node, emqx_dashboard_monitor, samplers, [Node]). 
diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl index 717091375..aae7a4b69 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl @@ -27,95 +27,14 @@ all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(t_badrpc_collect, Config) -> - Cluster = cluster_specs(2), - Apps = [emqx_modules, emqx_dashboard], - Nodes = [N1, N2] = lists:map(fun(Spec) -> start_slave(Spec, Apps) end, Cluster), - %% form the cluster - ok = rpc:call(N2, mria, join, [N1]), - %% Wait until all nodes are healthy: - [rpc:call(Node, mria_rlog, wait_for_shards, [[?DASHBOARD_SHARD], 5000]) - || Node <- Nodes], - [ {nodes, Nodes} - , {apps, Apps} - | Config]; init_per_testcase(_, Config) -> Config. -end_per_testcase(t_badrpc_collect, Config) -> - Apps = ?config(apps, Config), - Nodes = ?config(nodes, Config), - lists:foreach(fun(Node) -> stop_slave(Node, Apps) end, Nodes), - ok; end_per_testcase(_, _Config) -> ok. -t_badrpc_collect(Config) -> - [N1, N2] = ?config(nodes, Config), - %% simulate badrpc on one node - ok = rpc:call(N2, meck, new, [emqx_dashboard_collection, [no_history, no_link]]), - %% we don't mock the `emqx_dashboard_collection:get_collect/0' to - %% provoke the `badrpc' error. - ?assertMatch( - {200, #{nodes := 2}}, - rpc:call(N1, emqx_dashboard_monitor_api, current_counters, [get, #{}])), - ok = rpc:call(N2, meck, unload, [emqx_dashboard_collection]), - ok. 
%%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -cluster_specs(NumNodes) -> - BaseGenRpcPort = 9000, - Specs0 = [#{ name => node_name(N) - , num => N - } - || N <- lists:seq(1, NumNodes)], - GenRpcPorts = maps:from_list([{node_id(Name), {tcp, BaseGenRpcPort + N}} - || #{name := Name, num := N} <- Specs0]), - [ Spec#{env => [ {gen_rpc, tcp_server_port, BaseGenRpcPort + N} - , {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} - ]} - || Spec = #{num := N} <- Specs0]. - -node_name(N) -> - list_to_atom("n" ++ integer_to_list(N)). - -node_id(Name) -> - list_to_atom(lists:concat([Name, "@", host()])). - -start_slave(Spec = #{ name := Name}, Apps) -> - CommonBeamOpts = "+S 1:1 ", % We want VMs to only occupy a single core - {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()), - setup_node(Node, Spec, Apps), - Node. - -stop_slave(Node, Apps) -> - ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps]), - slave:stop(Node). - -host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. - -ebin_path() -> - string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). - -is_lib(Path) -> - string:prefix(Path, code:lib_dir()) =:= nomatch. - -setenv(Node, Env) -> - [rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env]. - -setup_node(Node, _Spec = #{env := Env}, Apps) -> - %% load these before starting ekka and such - [rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_conf, emqx]], - setenv(Node, Env), - EnvHandler = - fun(emqx) -> - application:set_env(emqx, boot_modules, [router, broker]); - (_) -> - ok - end, - ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandler]), - ok. 
From 8c1c87a8d6b810548184bcc4d39759483a6abb5d Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 24 Feb 2022 14:39:30 +0800 Subject: [PATCH 4/8] fix(test): add monitor SUITE, TODO: API Test SUITE --- .../emqx_dashboard/include/emqx_dashboard.hrl | 7 +- .../src/emqx_dashboard_monitor.erl | 34 ++--- .../src/emqx_dashboard_monitor_api.erl | 15 +- .../test/emqx_dashboard_monitor_SUITE.erl | 130 ++++++++++++++++++ .../test/emqx_dashboard_monitor_api_SUITE.erl | 40 ------ 5 files changed, 148 insertions(+), 78 deletions(-) create mode 100644 apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl delete mode 100644 apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 8deae8187..a32affef0 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -39,8 +39,13 @@ -define(DASHBOARD_SHARD, emqx_dashboard_shard). -%% 10 seconds +-ifdef(TEST). +%% for test, 2s +-define(DEFAULT_SAMPLE_INTERVAL, 1). +-else. +%% dashboard monitor do sample interval, default 10s -define(DEFAULT_SAMPLE_INTERVAL, 10). +-endif. -define(DELTA_SAMPLER_LIST, [ received diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 99c636fe1..40008f23e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -40,26 +40,14 @@ ]). %% for rpc --export([ do_samples/1]). +-export([ do_sample/1]). -define(TAB, ?MODULE). --ifdef(TEST). -%% for test --define(CLEAN_EXPIRED_INTERVAL, 2 * 1000). --define(RETENTION_TIME, 3 * 1000). --define(DEFAULT_GET_DATA_TIME, 5* 1000). - --else. - %% 1 hour = 60 * 60 * 1000 milliseconds -define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000). 
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds -define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000). -%% 1 day = 60 * 60 * 1000 milliseconds --define(DEFAULT_GET_DATA_TIME, 60 * 60 * 1000). - --endif. -record(state, { last @@ -82,7 +70,7 @@ samplers() -> samplers(all). samplers(NodeOrCluster) -> - format(do_samples(NodeOrCluster)). + format(do_sample(NodeOrCluster)). samplers(NodeOrCluster, 0) -> samplers(NodeOrCluster); @@ -141,10 +129,10 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -do_samples(all) -> +do_sample(all) -> Fun = fun(Node, All) -> - case do_samples(Node) of + case do_sample(Node) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; NodeSamplers -> @@ -152,9 +140,10 @@ do_samples(all) -> end end, lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); -do_samples(Node) when Node == node() -> - get_data(?DEFAULT_GET_DATA_TIME); -do_samples(Node) -> +do_sample(Node) when Node == node() -> + ExpiredMS = [{'$1',[],['$1']}], + internal_format(ets:select(?TAB, ExpiredMS)); +do_sample(Node) -> rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000). merge_cluster_samplers(Node, Cluster) -> @@ -191,7 +180,7 @@ clean_timer() -> %% The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ... %% Ensure that the monitor data of all nodes in the cluster are aligned in time next_interval() -> - Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000, + Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000, Now = erlang:system_time(millisecond), NextTime = ((Now div Interval) + 1) * Interval, Remaining = NextTime - Now, @@ -232,11 +221,6 @@ clean() -> end, Expired), ok. -get_data(PastTime) -> - Now = erlang:system_time(millisecond), - ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}], - internal_format(ets:select(?TAB, ExpiredMS)). 
- %% To make it easier to do data aggregation internal_format(List) when is_list(List) -> Fun = diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index d3cf5523c..65fb3a153 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -18,15 +18,6 @@ -export([ monitor/2]). --define(SAMPLERS, - [ connection - , route - , subscriptions - , received - , sent - , dropped - ]). - api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). @@ -41,7 +32,7 @@ schema("/monitor") -> get => #{ description => <<"List monitor data.">>, parameters => [ - {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})} + {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} ], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), @@ -56,8 +47,8 @@ schema("/monitor/nodes/:node") -> get => #{ description => <<"List the monitor data on the node.">>, parameters => [ - {node, hoconsc:mk(binary(), #{in => path, required => true, example => node()})}, - {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})} + {node, hoconsc:mk(binary(), #{in => path, nullable => false, example => node()})}, + {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} ], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl new file mode 100644 index 000000000..43d8d0712 --- /dev/null +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -0,0 +1,130 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_dashboard_monitor_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include("emqx_dashboard.hrl"). + +-define(SERVER, "http://127.0.0.1:18083"). +-define(BASE_PATH, "/api/v5"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + mria:start(), + emqx_common_test_helpers:start_apps([emqx_dashboard], fun set_special_configs/1), + Config. + +end_per_suite(Config) -> + emqx_common_test_helpers:stop_apps([emqx_dashboard]), + Config. + +set_special_configs(emqx_dashboard) -> + Config = #{ + default_username => <<"admin">>, + default_password => <<"public">>, + listeners => [#{ + protocol => http, + port => 18083 + }] + }, + emqx_config:put([dashboard], Config), + ok; +set_special_configs(_) -> + ok. + +t_monitor_samplers_all(_Config) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + Size = mnesia:table_info(emqx_dashboard_monitor,size), + All = emqx_dashboard_monitor:samplers(all), + All2 = emqx_dashboard_monitor:samplers(), + ?assert(erlang:length(All) == Size), + ?assert(erlang:length(All2) == Size), + ok. 
+ +t_monitor_samplers_latest(_Config) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + Samplers = emqx_dashboard_monitor:samplers(node(), 2), + Latest = emqx_dashboard_monitor:samplers(node(), 1), + ?assert(erlang:length(Samplers) == 2), + ?assert(erlang:length(Latest) == 1), + ?assert(hd(Latest) == lists:nth(2, Samplers)), + ok. + +t_monitor_sampler_format(_Config) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)), + SamplerKeys = maps:keys(Latest), + [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST], + ok. + +%% TODO: api test +% t_monitor_api(_) -> +% timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), +% {ok, Samplers} = request(["monitor"]), +% ?assert(erlang:length(Samplers) >= 2), +% Sample = hd(Samplers), +% Fun = +% fun(Sampler) -> +% Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], +% [?assert(lists:member(SamplerName, Keys)) || SamplerName<- ?SAMPLER_LIST] +% end, +% [Fun(Sampler) || Sampler <- Samplers], +% ok. + +% t_monitor_api_error(_) -> +% {error, _Reason} = request(["monitor_a"]), +% ok. + +% request(Path) -> +% request(Path, ""). + +% request(Path, QS) -> +% Url = url(Path, QS), +% case do_request_api(get, {Path, auth_header_()}) of +% {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; +% Error -> Error +% end. + +% url(Parts, QS)-> +% case QS of +% "" -> +% ?SERVER ++ filename:join([?BASE_PATH | Parts]); +% _ -> +% ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS +% end. + +% do_request_api(Method, Request)-> +% case httpc:request(Method, Request, [], []) of +% {error, socket_closed_remotely} -> +% {error, socket_closed_remotely}; +% {ok, {{"HTTP/1.1", Code, _}, _, Return} } +% when Code >= 200 andalso Code =< 299 -> +% {ok, emqx_json:decode(Return)}; +% {ok, Resp} -> +% {error, Resp} +% end. 
+ +% auth_header_() -> +% Basic = binary_to_list(base64:encode(<<"admin:public">>)), +% {"Authorization", "Basic " ++ Basic}. diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl deleted file mode 100644 index aae7a4b69..000000000 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_api_SUITE.erl +++ /dev/null @@ -1,40 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_dashboard_monitor_api_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("emqx/include/emqx.hrl"). --include("emqx_dashboard.hrl"). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -init_per_testcase(_, Config) -> - Config. - -end_per_testcase(_, _Config) -> - ok. 
- - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - From a88e8b0c9ea10674eebbbf70a78ccee1e69592c6 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 24 Feb 2022 14:51:41 +0800 Subject: [PATCH 5/8] fix(doc): remove annotation --- apps/emqx_dashboard/etc/emqx_dashboard.conf | 2 +- apps/emqx_dashboard/src/emqx_dashboard.erl | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index e5a15b515..0fb0184fe 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -6,7 +6,7 @@ dashboard { default_username = "admin" default_password = "public" ## notice: sample_interval should be divisible by 60. - ## as like 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s + ## like 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s sample_interval = 10s ## api jwt timeout. default is 30 minute token_expired_time = 60m diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index a6fadc014..8e618cb0e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -92,12 +92,11 @@ stop_listeners() -> %% internal apps() -> - [emqx_dashboard]. - % [App || {App, _, _} <- application:loaded_applications(), - % case re:run(atom_to_list(App), "^emqx") of - % {match,[{0,4}]} -> true; - % _ -> false - % end]. + [App || {App, _, _} <- application:loaded_applications(), + case re:run(atom_to_list(App), "^emqx") of + {match,[{0,4}]} -> true; + _ -> false + end]. 
listeners() -> [begin From c21bc9d329ef904f3a7a80aa3a8ae4752fc6c749 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 24 Feb 2022 21:25:39 +0800 Subject: [PATCH 6/8] feat: dashboard monitor granularity adapter --- .../emqx_dashboard/include/emqx_dashboard.hrl | 14 +- .../src/emqx_dashboard_monitor.erl | 136 ++++++++++++++---- .../src/emqx_dashboard_monitor_api.erl | 47 +++++- 3 files changed, 162 insertions(+), 35 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index a32affef0..c8a2f8a1f 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -55,8 +55,18 @@ , dropped ]). --define(SAMPLER_LIST, +-define(GAUGE_SAMPLER_LIST, [ subscriptions , routes , connections - ] ++ ?DELTA_SAMPLER_LIST). + ]). + +-define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST). + +-define(DELTA_SAMPLER_RATE_MAP, #{ + received => received_rate, + received_bytes => received_bytes_rate, + sent => sent_rate, + sent_bytes => sent_bytes_rate, + dropped => dropped_rate + }). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 40008f23e..1eeb5b68f 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -35,12 +35,14 @@ -export([ mnesia/1]). -export([ samplers/0 - , samplers/1 , samplers/2 + , current_rate/0 + , current_rate/1 + , granularity_adapter/1 ]). %% for rpc --export([ do_sample/1]). +-export([ do_sample/2]). -define(TAB, ?MODULE). @@ -66,30 +68,53 @@ mnesia(boot) -> {record_name, emqx_monit}, {attributes, record_info(fields, emqx_monit)}]). +%% ------------------------------------------------------------------------------------------------- +%% API + samplers() -> - samplers(all). + format(do_sample(all, infinity)). 
-samplers(NodeOrCluster) -> - format(do_sample(NodeOrCluster)). - -samplers(NodeOrCluster, 0) -> - samplers(NodeOrCluster); samplers(NodeOrCluster, Latest) -> - case samplers(NodeOrCluster) of + Now = erlang:system_time(millisecond), + MatchTime = Now - (Latest * 1000), + case format(do_sample(NodeOrCluster, MatchTime)) of {badrpc, Reason} -> {badrpc, Reason}; List when is_list(List) -> - case erlang:length(List) - Latest of - Start when Start > 0 -> - lists:sublist(List, Start, Latest); - _ -> - List - end + granularity_adapter(List) end. -%%%=================================================================== -%%% gen_server functions -%%%=================================================================== +granularity_adapter(List) when length(List) > 100 -> + granularity_adapter(List, []); +granularity_adapter(List) -> + List. + +current_rate() -> + Fun = + fun(Node, Cluster) -> + case current_rate(Node) of + {ok, CurrentRate} -> + merge_cluster_rate(CurrentRate, Cluster); + {badrpc, Reason} -> + {badrpc, {Node, Reason}} + end + end, + lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)). + +current_rate(all) -> + current_rate(); +current_rate(Node) when Node == node() -> + do_call(current_rate); +current_rate(Node) -> + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + {ok, Rate} -> + {ok, Rate} + end. + +%% ------------------------------------------------------------------------------------------------- +%% gen_server functions start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -99,6 +124,11 @@ init([]) -> clean_timer(), {ok, #state{last = undefined}}. +handle_call(current_rate, _From, State = #state{last = Last}) -> + NowTime = erlang:system_time(millisecond), + NowSamplers = sample(NowTime), + Rate = cal_rate(NowSamplers, Last), + {reply, {ok, Rate}, State}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. 
@@ -125,14 +155,16 @@ terminate(_Reason, _State = #state{}) -> code_change(_OldVsn, State = #state{}, _Extra) -> {ok, State}. -%%%=================================================================== -%%% Internal functions -%%%=================================================================== +%% ------------------------------------------------------------------------------------------------- +%% Internal functions -do_sample(all) -> +do_call(Request) -> + gen_server:call(?MODULE, Request, 5000). + +do_sample(all, MatchTime) -> Fun = fun(Node, All) -> - case do_sample(Node) of + case do_sample(Node, MatchTime) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; NodeSamplers -> @@ -140,11 +172,16 @@ do_sample(all) -> end end, lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); -do_sample(Node) when Node == node() -> - ExpiredMS = [{'$1',[],['$1']}], - internal_format(ets:select(?TAB, ExpiredMS)); -do_sample(Node) -> - rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000). +do_sample(Node, MatchTime) when Node == node() -> + MS = match_spec(MatchTime), + internal_format(ets:select(?TAB, MS)); +do_sample(Node, MatchTime) -> + rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000). + +match_spec(infinity) -> + [{'$1',[],['$1']}]; +match_spec(MatchTime) -> + [{{'_', '$1', '_'}, [{'>=', '$1', MatchTime}], ['$_']}]. merge_cluster_samplers(Node, Cluster) -> maps:fold(fun merge_cluster_samplers/3, Cluster, Node). @@ -157,6 +194,14 @@ merge_cluster_samplers(TS, NodeData, Cluster) -> Cluster#{TS => count_map(NodeData, ClusterData)} end. +merge_cluster_rate(Node, Cluster) -> + Fun = + fun(Key, Value, NCluster) -> + ClusterValue = maps:get(Key, NCluster, 0), + NCluster#{Key => Value + ClusterValue} + end, + maps:fold(Fun, Cluster, Node). + format({badrpc, Reason}) -> {badrpc, Reason}; format(Data) -> @@ -167,6 +212,38 @@ format(Data) -> format(TimeStamp, Data, All) -> [Data#{time_stamp => TimeStamp} | All]. 
+cal_rate( #emqx_monit{data = NowData, time = NowTime} + , #emqx_monit{data = LastData, time = LastTime}) -> + TimeDelta = NowTime - LastTime, + Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end, + Gauge = maps:filter(Filter, NowData), + {_, _, _, Rate} = + lists:foldl(fun cal_rate_/2, {NowData, LastData, TimeDelta, Gauge}, ?DELTA_SAMPLER_LIST), + Rate. + +cal_rate_(Key, {Now, Last, TDelta, Res}) -> + NewValue = maps:get(Key, Now), + LastValue = maps:get(Key, Last), + Rate = ((NewValue - LastValue) * 1000) div TDelta, + RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP), + {Now, Last, TDelta, Res#{RateKey => Rate}}. + +granularity_adapter([], Res) -> + lists:reverse(Res); +granularity_adapter([Sampler], Res) -> + granularity_adapter([], [Sampler | Res]); +granularity_adapter([Sampler1, Sampler2 | Rest], Res) -> + Fun = + fun(Key, M) -> + Value1 = maps:get(Key, Sampler1), + Value2 = maps:get(Key, Sampler2), + M#{Key => Value1 + Value2} + end, + granularity_adapter(Rest, [lists:foldl(Fun, Sampler2, ?DELTA_SAMPLER_LIST) | Res]). + +%% ------------------------------------------------------------------------------------------------- +%% timer + sample_timer() -> {NextTime, Remaining} = next_interval(), erlang:send_after(Remaining, self(), {sample, NextTime}). @@ -186,6 +263,9 @@ next_interval() -> Remaining = NextTime - Now, {NextTime, Remaining}. +%% ------------------------------------------------------------------------------------------------- +%% data + sample(Time) -> Fun = fun(Key, Res) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 65fb3a153..e0662c972 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -16,7 +16,9 @@ , fields/1 ]). --export([ monitor/2]). +-export([ monitor/2 + , monitor_current/2 + ]). 
api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). @@ -24,6 +26,7 @@ api_spec() -> paths() -> [ "/monitor" , "/monitor/nodes/:node" + , "/monitor/current" ]. schema("/monitor") -> @@ -55,19 +58,34 @@ schema("/monitor/nodes/:node") -> 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) } } + }; + +schema("/monitor/current") -> + #{ + 'operationId' => monitor_current, + get => #{ + description => <<"Current monitor data. Gauge and rate">>, + responses => #{ + 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) + } + } }. fields(sampler) -> Samplers = [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} || SamplerName <- ?SAMPLER_LIST], - [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]. + [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]; + +fields(sampler_current) -> + [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST]. %% ------------------------------------------------------------------------------------------------- %% API monitor(get, #{query_string := QS, bindings := Bindings}) -> - Latest = maps:get(<<"latest">>, QS, 0), + Latest = maps:get(<<"latest">>, QS, 1000), Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)), case emqx_dashboard_monitor:samplers(Node, Latest) of {badrpc, {Node, Reason}} -> @@ -77,6 +95,16 @@ monitor(get, #{query_string := QS, bindings := Bindings}) -> {200, Samplers} end. +monitor_current(get, #{query_string := QS}) -> + NodeOrCluster = binary_to_atom(maps:get(<<"node">>, QS, <<"all">>), utf8), + case emqx_dashboard_monitor:current_rate(NodeOrCluster) of + {ok, CurrentRate} -> + {200, CurrentRate}; + {badrpc, {Node, Reason}} -> + Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])), + {400, 'BAD_RPC', Message} + end. 
+ %% ------------------------------------------------------------------------------------------------- %% Internal @@ -93,8 +121,17 @@ sampler_desc(routes) -> " Can only represent the approximate state">>; sampler_desc(connections) -> <<"Connections at the time of sampling." - " Can only represent the approximate state">>. + " Can only represent the approximate state">>; + +sampler_desc(received_rate) -> sampler_desc_format("Dropped messages ", per); +sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per); +sampler_desc(sent_rate) -> sampler_desc_format("Sent messages ", per); +sampler_desc(sent_bytes_rate) -> sampler_desc_format("Sent bytes ", per); +sampler_desc(dropped_rate) -> sampler_desc_format("Dropped messages ", per). sampler_desc_format(Format) -> + sampler_desc_format(Format, last). + +sampler_desc_format(Format, Type) -> Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), - list_to_binary(io_lib:format(Format ++ "last ~p seconds", [Interval])). + list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])). 
From 00b83121a4fada5278293ce97e214806e2509dd2 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 25 Feb 2022 12:44:49 +0800 Subject: [PATCH 7/8] fix(test): add api test SUITE & bug fix --- .../emqx_dashboard/include/emqx_dashboard.hrl | 2 +- .../src/emqx_dashboard_monitor.erl | 88 +++++++++++---- .../src/emqx_dashboard_monitor_api.erl | 91 +++++++++++----- .../src/emqx_dashboard_schema.erl | 1 - .../src/proto/emqx_dashboard_proto_v1.erl | 9 +- .../test/emqx_dashboard_monitor_SUITE.erl | 103 ++++++++++-------- 6 files changed, 188 insertions(+), 106 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index c8a2f8a1f..2d1d28d2f 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -40,7 +40,7 @@ -define(DASHBOARD_SHARD, emqx_dashboard_shard). -ifdef(TEST). -%% for test, 2s +%% for test -define(DEFAULT_SAMPLE_INTERVAL, 1). -else. %% dashboard monitor do sample interval, default 10s diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 1eeb5b68f..6384405c9 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -51,6 +51,12 @@ %% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds -define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000). +-ifdef(TEST). +-define(RPC_TIMEOUT, 50). +-else. +-define(RPC_TIMEOUT, 5000). +-endif. + -record(state, { last }). @@ -75,20 +81,43 @@ samplers() -> format(do_sample(all, infinity)). 
samplers(NodeOrCluster, Latest) -> - Now = erlang:system_time(millisecond), - MatchTime = Now - (Latest * 1000), - case format(do_sample(NodeOrCluster, MatchTime)) of + Time = + case Latest of + infinity -> + infinity; + Latest when is_integer(Latest) -> + Now = erlang:system_time(millisecond), + Now - (Latest * 1000) + end, + case format(do_sample(NodeOrCluster, Time)) of {badrpc, Reason} -> {badrpc, Reason}; List when is_list(List) -> granularity_adapter(List) end. -granularity_adapter(List) when length(List) > 100 -> +%% When the number of samples exceeds 1000, it affects the rendering speed of the dashboard UI. +%% granularity_adapter downsamples the samples by merging adjacent ones. +%% Use coarser-grained data to reduce data density. +%% +%% [ +%% Data1 = #{time => T1, k1 => 1, k2 => 2}, +%% Data2 = #{time => T2, k1 => 3, k2 => 4}, +%% ... +%% ] +%% After granularity_adapter, Data1 and Data2 are merged: +%% +%% [ +%% #{time => T2, k1 => 1 + 3, k2 => 2 + 4}, +%% ... +%% ] +%% +granularity_adapter(List) when length(List) > 1000 -> granularity_adapter(List, []); granularity_adapter(List) -> List. +%% Get the current rate. Not the current sampler data. current_rate() -> Fun = fun(Node, Cluster) -> @@ -99,14 +128,19 @@ current_rate() -> {badrpc, {Node, Reason}} end end, - lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)). + case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running))of + {badrpc, Reason} -> + {badrpc, Reason}; + Rate -> + {ok, Rate} + end. current_rate(all) -> current_rate(); current_rate(Node) when Node == node() -> do_call(current_rate); current_rate(Node) -> - case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], ?RPC_TIMEOUT) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; {ok, Rate} -> @@ -161,27 +195,33 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_call(Request) -> gen_server:call(?MODULE, Request, 5000). 
-do_sample(all, MatchTime) -> - Fun = - fun(Node, All) -> - case do_sample(Node, MatchTime) of - {badrpc, Reason} -> - {badrpc, {Node, Reason}}; - NodeSamplers -> - merge_cluster_samplers(NodeSamplers, All) - end - end, - lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); -do_sample(Node, MatchTime) when Node == node() -> - MS = match_spec(MatchTime), +do_sample(all, Time) -> + do_sample(mria_mnesia:cluster_nodes(running), Time, #{}); +do_sample(Node, Time) when Node == node() -> + MS = match_spec(Time), internal_format(ets:select(?TAB, MS)); -do_sample(Node, MatchTime) -> - rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000). +do_sample(Node, Time) -> + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Time], ?RPC_TIMEOUT) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + Res -> + Res + end. + +do_sample([], _Time, Res) -> + Res; +do_sample([Node | Nodes], Time, Res) -> + case do_sample(Node, Time) of + {badrpc, Reason} -> + {badrpc, Reason}; + Samplers -> + do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res)) + end. match_spec(infinity) -> - [{'$1',[],['$1']}]; -match_spec(MatchTime) -> - [{{'_', '$1', '_'}, [{'>=', '$1', MatchTime}], ['$_']}]. + [{'$1', [], ['$1']}]; +match_spec(Time) -> + [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}]. merge_cluster_samplers(Node, Cluster) -> maps:fold(fun merge_cluster_samplers/3, Cluster, Node). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index e0662c972..ee761fa9c 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -26,17 +26,17 @@ api_spec() -> paths() -> [ "/monitor" , "/monitor/nodes/:node" - , "/monitor/current" + , "/monitor_current" + , "/monitor_current/nodes/:node" ]. 
schema("/monitor") -> #{ 'operationId' => monitor, get => #{ + tags => [dashboard], description => <<"List monitor data.">>, - parameters => [ - {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} - ], + parameters => [parameter_latest()], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) @@ -48,11 +48,9 @@ schema("/monitor/nodes/:node") -> #{ 'operationId' => monitor, get => #{ + tags => [dashboard], description => <<"List the monitor data on the node.">>, - parameters => [ - {node, hoconsc:mk(binary(), #{in => path, nullable => false, example => node()})}, - {latest, hoconsc:mk(integer(), #{in => query, nullable => true, example => 1000})} - ], + parameters => [parameter_node(), parameter_latest()], responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}), 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) @@ -60,25 +58,58 @@ schema("/monitor/nodes/:node") -> } }; -schema("/monitor/current") -> +schema("/monitor_current") -> #{ 'operationId' => monitor_current, get => #{ - description => <<"Current monitor data. Gauge and rate">>, + tags => [dashboard], + description => <<"Current status. Gauge and rate.">>, + responses => #{ + 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) + } + } + }; + +schema("/monitor_current/nodes/:node") -> + #{ + 'operationId' => monitor_current, + get => #{ + tags => [dashboard], + description => <<"Node current status. Gauge and rate.">>, + parameters => [parameter_node()], responses => #{ 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) } } }. +parameter_latest() -> + Info = #{ + in => query, + nullable => true, + example => 5 * 60, + description => <<"The latest N seconds data. Like 300 for 5 min.">> + }, + {latest, hoconsc:mk(integer(), Info)}. 
+ +parameter_node() -> + Info = #{ + in => path, + nullable => false, + example => node(), + description => <<"EMQX node name.">> + }, + {node, hoconsc:mk(binary(), Info)}. + + fields(sampler) -> Samplers = - [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} || SamplerName <- ?SAMPLER_LIST], [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]; fields(sampler_current) -> - [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST]. %% ------------------------------------------------------------------------------------------------- @@ -95,8 +126,8 @@ monitor(get, #{query_string := QS, bindings := Bindings}) -> {200, Samplers} end. -monitor_current(get, #{query_string := QS}) -> - NodeOrCluster = binary_to_atom(maps:get(<<"node">>, QS, <<"all">>), utf8), +monitor_current(get, #{bindings := Bindings}) -> + NodeOrCluster = binary_to_atom(maps:get(node, Bindings, <<"all">>), utf8), case emqx_dashboard_monitor:current_rate(NodeOrCluster) of {ok, CurrentRate} -> {200, CurrentRate}; @@ -108,30 +139,30 @@ monitor_current(get, #{query_string := QS}) -> %% ------------------------------------------------------------------------------------------------- %% Internal -sampler_desc(received) -> sampler_desc_format("Received messages "); -sampler_desc(received_bytes) -> sampler_desc_format("Received bytes "); -sampler_desc(sent) -> sampler_desc_format("Sent messages "); -sampler_desc(sent_bytes) -> sampler_desc_format("Sent bytes "); -sampler_desc(dropped) -> sampler_desc_format("Dropped messages "); -sampler_desc(subscriptions) -> +swagger_desc(received) -> swagger_desc_format("Received messages "); +swagger_desc(received_bytes) -> swagger_desc_format("Received bytes "); 
+swagger_desc(sent) -> swagger_desc_format("Sent messages "); +swagger_desc(sent_bytes) -> swagger_desc_format("Sent bytes "); +swagger_desc(dropped) -> swagger_desc_format("Dropped messages "); +swagger_desc(subscriptions) -> <<"Subscriptions at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(routes) -> +swagger_desc(routes) -> <<"Routes at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(connections) -> +swagger_desc(connections) -> <<"Connections at the time of sampling." " Can only represent the approximate state">>; -sampler_desc(received_rate) -> sampler_desc_format("Dropped messages ", per); -sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per); -sampler_desc(sent_rate) -> sampler_desc_format("Sent messages ", per); -sampler_desc(sent_bytes_rate) -> sampler_desc_format("Sent bytes ", per); -sampler_desc(dropped_rate) -> sampler_desc_format("Dropped messages ", per). +swagger_desc(received_rate) -> swagger_desc_format("Dropped messages ", per); +swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per); +swagger_desc(sent_rate) -> swagger_desc_format("Sent messages ", per); +swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per); +swagger_desc(dropped_rate) -> swagger_desc_format("Dropped messages ", per). -sampler_desc_format(Format) -> - sampler_desc_format(Format, last). +swagger_desc_format(Format) -> + swagger_desc_format(Format, last). -sampler_desc_format(Format, Type) -> +swagger_desc_format(Format, Type) -> Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])). 
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index efd890bec..f112fc852 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -39,7 +39,6 @@ but use the same port. """ })} , {default_username, fun default_username/1} , {default_password, fun default_password/1} - %% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})} , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})} , {cors, fun cors/1} diff --git a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl index 09c3ce66c..6a4f3fb3a 100644 --- a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl +++ b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl @@ -19,8 +19,7 @@ -behaviour(emqx_bpapi). -export([ introduced_in/0 - - , samplers/1 + , samplers/2 ]). -include("emqx_dashboard.hrl"). @@ -29,6 +28,6 @@ introduced_in() -> "5.0.0". --spec samplers(node()) -> list(map()) | emqx_rpc:badrpc(). -samplers(Node) -> - rpc:call(Node, emqx_dashboard_monitor, samplers, [Node]). +-spec samplers(node(), Latest:: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc(). +samplers(Node, Latest) -> + rpc:call(Node, emqx_dashboard_monitor, samplers, [Node, Latest]). 
diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 43d8d0712..88da67245 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -56,7 +56,7 @@ set_special_configs(_) -> t_monitor_samplers_all(_Config) -> timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), Size = mnesia:table_info(emqx_dashboard_monitor,size), - All = emqx_dashboard_monitor:samplers(all), + All = emqx_dashboard_monitor:samplers(all, infinity), All2 = emqx_dashboard_monitor:samplers(), ?assert(erlang:length(All) == Size), ?assert(erlang:length(All2) == Size), @@ -78,53 +78,66 @@ t_monitor_sampler_format(_Config) -> [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST], ok. -%% TODO: api test -% t_monitor_api(_) -> -% timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), -% {ok, Samplers} = request(["monitor"]), -% ?assert(erlang:length(Samplers) >= 2), -% Sample = hd(Samplers), -% Fun = -% fun(Sampler) -> -% Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], -% [?assert(lists:member(SamplerName, Keys)) || SamplerName<- ?SAMPLER_LIST] -% end, -% [Fun(Sampler) || Sampler <- Samplers], -% ok. +t_monitor_api(_) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, Samplers} = request(["monitor"], "latest=20"), + ?assert(erlang:length(Samplers) >= 2), + Fun = + fun(Sampler) -> + Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)], + [?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST] + end, + [Fun(Sampler) || Sampler <- Samplers], + {ok, NodeSamplers} = request(["monitor", "nodes", node()]), + [Fun(NodeSampler) || NodeSampler <- NodeSamplers], + ok. -% t_monitor_api_error(_) -> -% {error, _Reason} = request(["monitor_a"]), -% ok. 
+t_monitor_current_api(_) -> + timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), + {ok, Rate} = request(["monitor_current"]), + [?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST], + {ok, NodeRate} = request(["monitor_current", "nodes", node()]), + [?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate)) + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST], + ok. -% request(Path) -> -% request(Path, ""). +t_monitor_api_error(_) -> + {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} = + request(["monitor", "nodes", 'emqx@127.0.0.2']), + {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} = + request(["monitor_current", "nodes", 'emqx@127.0.0.2']), + ok. -% request(Path, QS) -> -% Url = url(Path, QS), -% case do_request_api(get, {Path, auth_header_()}) of -% {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; -% Error -> Error -% end. +request(Path) -> + request(Path, ""). -% url(Parts, QS)-> -% case QS of -% "" -> -% ?SERVER ++ filename:join([?BASE_PATH | Parts]); -% _ -> -% ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS -% end. +request(Path, QS) -> + Url = url(Path, QS), + do_request_api(get, {Url, [auth_header_()]}). -% do_request_api(Method, Request)-> -% case httpc:request(Method, Request, [], []) of -% {error, socket_closed_remotely} -> -% {error, socket_closed_remotely}; -% {ok, {{"HTTP/1.1", Code, _}, _, Return} } -% when Code >= 200 andalso Code =< 299 -> -% {ok, emqx_json:decode(Return)}; -% {ok, Resp} -> -% {error, Resp} -% end. +url(Parts, QS)-> + case QS of + "" -> + ?SERVER ++ filename:join([?BASE_PATH | Parts]); + _ -> + ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS + end. -% auth_header_() -> -% Basic = binary_to_list(base64:encode(<<"admin:public">>)), -% {"Authorization", "Basic " ++ Basic}. 
+do_request_api(Method, Request)-> + ct:pal("Req ~p ~p~n", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } + when Code >= 200 andalso Code =< 299 -> + {ok, emqx_json:decode(Return, [return_maps])}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } -> + {error, {Code, emqx_json:decode(Return, [return_maps])}}; + {error, Reason} -> + {error, Reason} + end. + +auth_header_() -> + Basic = binary_to_list(base64:encode(<<"admin:public">>)), + {"Authorization", "Basic " ++ Basic}. From 103df6f06bb7ab88ed4738cb9e930bc2c9b77f96 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 25 Feb 2022 14:19:40 +0800 Subject: [PATCH 8/8] fix: dashboard monitor bpapi --- apps/emqx_dashboard/include/emqx_dashboard.hrl | 2 ++ apps/emqx_dashboard/src/emqx_dashboard_monitor.erl | 10 ++-------- .../src/emqx_dashboard_monitor_api.erl | 7 ++++--- .../src/proto/emqx_dashboard_proto_v1.erl | 13 +++++++++---- .../test/emqx_dashboard_monitor_SUITE.erl | 2 ++ 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 2d1d28d2f..de0682039 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -42,9 +42,11 @@ -ifdef(TEST). %% for test -define(DEFAULT_SAMPLE_INTERVAL, 1). +-define(RPC_TIMEOUT, 50). -else. %% dashboard monitor do sample interval, default 10s -define(DEFAULT_SAMPLE_INTERVAL, 10). +-define(RPC_TIMEOUT, 5000). -endif. 
-define(DELTA_SAMPLER_LIST, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 6384405c9..6fa972724 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -51,12 +51,6 @@ %% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds -define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000). --ifdef(TEST). --define(RPC_TIMEOUT, 50). --else. --define(RPC_TIMEOUT, 5000). --endif. - -record(state, { last }). @@ -140,7 +134,7 @@ current_rate(all) -> current_rate(Node) when Node == node() -> do_call(current_rate); current_rate(Node) -> - case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], ?RPC_TIMEOUT) of + case emqx_dashboard_proto_v1:current_rate(Node) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; {ok, Rate} -> @@ -201,7 +195,7 @@ do_sample(Node, Time) when Node == node() -> MS = match_spec(Time), internal_format(ets:select(?TAB, MS)); do_sample(Node, Time) -> - case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Time], ?RPC_TIMEOUT) of + case emqx_dashboard_proto_v1:do_sample(Node, Time) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; Res -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index ee761fa9c..196cf8a16 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -78,7 +78,8 @@ schema("/monitor_current/nodes/:node") -> description => <<"Node current status. Gauge and rate.">>, parameters => [parameter_node()], responses => #{ - 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) + 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}), + 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) } } }. 
@@ -86,7 +87,7 @@ schema("/monitor_current/nodes/:node") -> parameter_latest() -> Info = #{ in => query, - nullable => true, + required => false, example => 5 * 60, description => <<"The latest N seconds data. Like 300 for 5 min.">> }, @@ -95,7 +96,7 @@ parameter_latest() -> parameter_node() -> Info = #{ in => path, - nullable => false, + required => true, example => node(), description => <<"EMQX node name.">> }, diff --git a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl index 6a4f3fb3a..bcfa9f0d2 100644 --- a/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl +++ b/apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl @@ -19,7 +19,8 @@ -behaviour(emqx_bpapi). -export([ introduced_in/0 - , samplers/2 + , do_sample/2 + , current_rate/1 ]). -include("emqx_dashboard.hrl"). @@ -28,6 +29,10 @@ introduced_in() -> "5.0.0". --spec samplers(node(), Latest:: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc(). -samplers(Node, Latest) -> - rpc:call(Node, emqx_dashboard_monitor, samplers, [Node, Latest]). +-spec do_sample(node(), Latest:: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc(). +do_sample(Node, Latest) -> + rpc:call(Node, emqx_dashboard_monitor, do_sample, [Node, Latest], ?RPC_TIMEOUT). + +-spec current_rate(node()) -> {ok, map()} | emqx_rpc:badrpc(). +current_rate(Node) -> + rpc:call(Node, emqx_dashboard_monitor, current_rate, [Node], ?RPC_TIMEOUT). 
diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 88da67245..2c44f3041 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -131,8 +131,10 @@ do_request_api(Method, Request)-> {error, socket_closed_remotely}; {ok, {{"HTTP/1.1", Code, _}, _, Return} } when Code >= 200 andalso Code =< 299 -> + ct:pal("Resp ~p ~p~n", [Code, Return]), {ok, emqx_json:decode(Return, [return_maps])}; {ok, {{"HTTP/1.1", Code, _}, _, Return} } -> + ct:pal("Resp ~p ~p~n", [Code, Return]), {error, {Code, emqx_json:decode(Return, [return_maps])}}; {error, Reason} -> {error, Reason}