feat: monitor api, TODO: test suite

This commit is contained in:
DDDHuang 2022-02-23 17:02:26 +08:00
parent 21b9943df9
commit fedfa6c653
7 changed files with 179 additions and 626 deletions

View File

@ -39,7 +39,19 @@
-define(DASHBOARD_SHARD, emqx_dashboard_shard). -define(DASHBOARD_SHARD, emqx_dashboard_shard).
-record(mqtt_collect, { %% 10 seconds
timestamp :: integer(), -define(DEFAULT_SAMPLE_INTERVAL, 10).
collect
}). -define(DELTA_SAMPLER_LIST,
[ received
, received_bytes
, sent
, sent_bytes
, dropped
]).
-define(SAMPLER_LIST,
[ subscriptions
, routes
, connections
] ++ ?DELTA_SAMPLER_LIST).

View File

@ -92,11 +92,12 @@ stop_listeners() ->
%% internal %% internal
apps() -> apps() ->
[App || {App, _, _} <- application:loaded_applications(), [emqx_dashboard].
case re:run(atom_to_list(App), "^emqx") of % [App || {App, _, _} <- application:loaded_applications(),
{match,[{0,4}]} -> true; % case re:run(atom_to_list(App), "^emqx") of
_ -> false % {match,[{0,4}]} -> true;
end]. % _ -> false
% end].
listeners() -> listeners() ->
[begin [begin

View File

@ -1,191 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_dashboard_collection).
-behaviour(gen_server).
-include("emqx_dashboard.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-export([ start_link/0
]).
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([get_collect/0, select_data/0]).
-export([get_universal_epoch/0]).
-boot_mnesia({mnesia, [boot]}).
%% Mnesia bootstrap
-export([mnesia/1]).
-define(APP, emqx_dashboard).
-define(DEFAULT_INTERVAL, 10). %% seconds
-define(COLLECT, {[],[],[]}).
-define(CLEAR_INTERVAL, 86400000).
-define(EXPIRE_INTERVAL, 86400000 * 7).
mnesia(boot) ->
ok = mria:create_table(?TAB_COLLECT, [
{type, set},
{local_content, true},
{storage, disc_copies},
{record_name, mqtt_collect},
{attributes, record_info(fields, mqtt_collect)}]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
-spec select_data() -> [#mqtt_collect{}].
select_data() ->
Time = emqx_dashboard_collection:get_universal_epoch() - 7200000,
ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]).
init([]) ->
timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{
count => count(),
expire_interval => ExpireInterval,
collect => ?COLLECT,
temp_collect => {0, 0, 0, 0},
last_collects => {0, 0, 0}
},
{ok, State}.
%% @doc every whole interval seconds;
%% example:
%% interval is 10s
%% now 15:01:07 (or 15:07:01 ~ 15:07:10)
%% next will be 15:01:10, 15:01:20, 15:01:30 ...
%% ensure all counters in cluster have sync time
next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() ->
emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL).
count() ->
60 div interval().
handle_call(get_collect, _From, State = #{temp_collect := {Received, Sent, _, _}}) ->
{reply, {Received, Sent, collect(subscriptions), collect(connections)}, State, hibernate};
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast(_Req, State) ->
{noreply, State}.
handle_info(collect, State = #{ collect := Collect
, count := 1
, temp_collect := TempCollect
, last_collects := LastCollect}) ->
timer(next_interval(), collect),
NewLastCollect = flush(collect_all(Collect), LastCollect),
TempCollect1 = temp_collect(TempCollect),
{noreply, State#{count => count(),
collect => ?COLLECT,
temp_collect => TempCollect1,
last_collects => NewLastCollect}};
handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
timer(next_interval(), collect),
TempCollect1 = temp_collect(TempCollect),
{noreply, State#{count => Count - 1,
collect => collect_all(Collect),
temp_collect => TempCollect1}, hibernate};
handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) ->
timer(?CLEAR_INTERVAL, clear_expire_data),
T1 = get_universal_epoch(),
Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end),
Collects = ets:select(?TAB_COLLECT, Spec),
lists:foreach(fun(Collect) ->
true = ets:delete_object(?TAB_COLLECT, Collect)
end, Collects),
{noreply, State, hibernate};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
temp_collect({_, _, Received, Sent}) ->
Received1 = collect(received),
Sent1 = collect(sent),
{(Received1 - Received) div interval(),
(Sent1 - Sent) div interval(),
Received1,
Sent1}.
collect_all({Connection, Route, Subscription}) ->
{[collect(connections) | Connection],
[collect(routes) | Route],
[collect(subscriptions) | Subscription]}.
collect(connections) ->
emqx_stats:getstat('connections.count');
collect(routes) ->
emqx_stats:getstat('routes.count');
collect(subscriptions) ->
emqx_stats:getstat('subscriptions.count');
collect(received) ->
emqx_metrics:val('messages.received');
collect(sent) ->
emqx_metrics:val('messages.sent');
collect(dropped) ->
emqx_metrics:val('messages.dropped').
flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
Received = collect(received),
Sent = collect(sent),
Dropped = collect(dropped),
Collect = {avg(Connection),
avg(Route),
avg(Subscription),
diff(Received, Received0),
diff(Sent, Sent0),
diff(Dropped, Dropped0)},
Ts = get_universal_epoch(),
{atomic, ok} = mria:transaction(mria:local_content_shard(),
fun mnesia:write/3,
[ ?TAB_COLLECT
, #mqtt_collect{timestamp = Ts, collect = Collect}
, write]),
{Received, Sent, Dropped}.
avg(Items) ->
lists:sum(Items) div count().
diff(Item0, Item1) ->
Item0 - Item1.
timer(Secs, Msg) ->
erlang:send_after(Secs, self(), Msg).
get_today_remaining_seconds() ->
?CLEAR_INTERVAL - (get_universal_epoch() rem ?CLEAR_INTERVAL).
get_universal_epoch() ->
(calendar:datetime_to_gregorian_seconds(calendar:universal_time()) -
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

View File

@ -16,7 +16,7 @@
-module(emqx_dashboard_monitor). -module(emqx_dashboard_monitor).
-include_lib("stdlib/include/ms_transform.hrl"). -include("emqx_dashboard.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@ -34,15 +34,15 @@
-export([ mnesia/1]). -export([ mnesia/1]).
-export([ samples/0 -export([ samplers/0
, samples/1 , samplers/1
, aggregate_samplers/0 , samplers/2
]). ]).
-define(TAB, ?MODULE). %% for rpc
-export([ do_samples/1]).
%% 10 seconds -define(TAB, ?MODULE).
-define(DEFAULT_INTERVAL, 10).
-ifdef(TEST). -ifdef(TEST).
%% for test %% for test
@ -70,21 +70,6 @@
data :: map() data :: map()
}). }).
-define(DELTA_LIST,
[ received
, received_bytes
, sent
, sent_bytes
, dropped
]).
-define(SAMPLER_LIST,
[ subscriptions
, routes
, connections
] ++ ?DELTA_LIST).
mnesia(boot) -> mnesia(boot) ->
ok = mria:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, set}, {type, set},
@ -93,17 +78,26 @@ mnesia(boot) ->
{record_name, emqx_monit}, {record_name, emqx_monit},
{attributes, record_info(fields, emqx_monit)}]). {attributes, record_info(fields, emqx_monit)}]).
aggregate_samplers() -> samplers() ->
[#{node => Node, data => samples(Node)} || Node <- mria_mnesia:cluster_nodes(running)]. samplers(all).
samples() -> samplers(NodeOrCluster) ->
All = [samples(Node) || Node <- mria_mnesia:cluster_nodes(running)], format(do_samples(NodeOrCluster)).
lists:foldl(fun merge_cluster_samplers/2, #{}, All).
samples(Node) when Node == node() -> samplers(NodeOrCluster, 0) ->
get_data(?DEFAULT_GET_DATA_TIME); samplers(NodeOrCluster);
samples(Node) -> samplers(NodeOrCluster, Latest) ->
rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node]). case samplers(NodeOrCluster) of
{badrpc, Reason} ->
{badrpc, Reason};
List when is_list(List) ->
case erlang:length(List) - Latest of
Start when Start > 0 ->
lists:sublist(List, Start, Latest);
_ ->
List
end
end.
%%%=================================================================== %%%===================================================================
%%% gen_server functions %%% gen_server functions
@ -147,6 +141,43 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
do_samples(all) ->
Fun =
fun(Node, All) ->
case do_samples(Node) of
{badrpc, Reason} ->
{badrpc, {Node, Reason}};
NodeSamplers ->
merge_cluster_samplers(NodeSamplers, All)
end
end,
lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running));
do_samples(Node) when Node == node() ->
get_data(?DEFAULT_GET_DATA_TIME);
do_samples(Node) ->
rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000).
merge_cluster_samplers(Node, Cluster) ->
maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
merge_cluster_samplers(TS, NodeData, Cluster) ->
case maps:get(TS, Cluster, undefined) of
undefined ->
Cluster#{TS => NodeData};
ClusterData ->
Cluster#{TS => count_map(NodeData, ClusterData)}
end.
format({badrpc, Reason}) ->
{badrpc, Reason};
format(Data) ->
All = maps:fold(fun format/3, [], Data),
Compare = fun(#{time_stamp := T1}, #{time_stamp := T2}) -> T1 =< T2 end,
lists:sort(Compare, All).
format(TimeStamp, Data, All) ->
[Data#{time_stamp => TimeStamp} | All].
sample_timer() -> sample_timer() ->
{NextTime, Remaining} = next_interval(), {NextTime, Remaining} = next_interval(),
erlang:send_after(Remaining, self(), {sample, NextTime}). erlang:send_after(Remaining, self(), {sample, NextTime}).
@ -160,7 +191,7 @@ clean_timer() ->
%% The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ... %% The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ...
%% Ensure that the monitor data of all nodes in the cluster are aligned in time %% Ensure that the monitor data of all nodes in the cluster are aligned in time
next_interval() -> next_interval() ->
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_INTERVAL) * 1000, Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
NextTime = ((Now div Interval) + 1) * Interval, NextTime = ((Now div Interval) + 1) * Interval,
Remaining = NextTime - Now, Remaining = NextTime - Now,
@ -186,7 +217,7 @@ delta(LastData, NowData) ->
Value = maps:get(Key, NowData) - maps:get(Key, LastData), Value = maps:get(Key, NowData) - maps:get(Key, LastData),
Data#{Key => Value} Data#{Key => Value}
end, end,
lists:foldl(Fun, NowData, ?DELTA_LIST). lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST).
store(MonitData) -> store(MonitData) ->
{atomic, ok} = {atomic, ok} =
@ -204,28 +235,18 @@ clean() ->
get_data(PastTime) -> get_data(PastTime) ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}], ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}],
format(ets:select(?TAB, ExpiredMS)). internal_format(ets:select(?TAB, ExpiredMS)).
format(List) when is_list(List) -> %% To make it easier to do data aggregation
internal_format(List) when is_list(List) ->
Fun = Fun =
fun(Data, All) -> fun(Data, All) ->
maps:merge(format(Data), All) maps:merge(internal_format(Data), All)
end, end,
lists:foldl(Fun, #{}, List); lists:foldl(Fun, #{}, List);
format(#emqx_monit{time = Time, data = Data}) -> internal_format(#emqx_monit{time = Time, data = Data}) ->
#{Time => Data}. #{Time => Data}.
merge_cluster_samplers(Node, Cluster) ->
maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
merge_cluster_samplers(TS, NodeData, Cluster) ->
case maps:get(TS, Cluster, undefined) of
undefined ->
Cluster#{TS => NodeData};
ClusterData ->
Cluster#{TS => count_map(NodeData, ClusterData)}
end.
count_map(M1, M2) -> count_map(M1, M2) ->
Fun = Fun =
fun(Key, Map) -> fun(Key, Map) ->

View File

@ -5,308 +5,105 @@
-module(emqx_dashboard_monitor_api). -module(emqx_dashboard_monitor_api).
-include("emqx_dashboard.hrl"). -include("emqx_dashboard.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(minirest_api). -behaviour(minirest_api).
-import(emqx_mgmt_util, [schema/2]). -export([ api_spec/0]).
-export([api_spec/0]).
-export([ monitor/2 -export([ paths/0
, counters/2 , schema/1
, monitor_nodes/2 , fields/1
, monitor_nodes_counters/2
, current_counters/2
]). ]).
-export([ sampling/1 -export([ monitor/2]).
, sampling/2
]).
-define(COUNTERS, [ connection -define(SAMPLERS,
, route [ connection
, subscriptions , route
, received , subscriptions
, sent , received
, dropped]). , sent
, dropped
-define(EMPTY_COLLECTION, {0, 0, 0, 0}). ]).
api_spec() -> api_spec() ->
{[ monitor_api() emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
, monitor_nodes_api()
, monitor_nodes_counters_api()
, monitor_counters_api()
, monitor_current_api()
],
[]}.
monitor_api() -> paths() ->
Metadata = #{ [ "/monitor"
, "/monitor/nodes/:node"
].
schema("/monitor") ->
#{
'operationId' => monitor,
get => #{ get => #{
description => <<"List monitor data">>, description => <<"List monitor data.">>,
parameters => [ parameters => [
#{ {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})}
name => aggregate,
in => query,
required => false,
schema => #{type => boolean}
}
], ],
responses => #{ responses => #{
<<"200">> => schema(counters_schema(), <<"Monitor count data">>)}}}, 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
{"/monitor", Metadata, monitor}. 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
}
monitor_nodes_api() -> }
Metadata = #{
get => #{
description => <<"List monitor data">>,
parameters => [path_param_node()],
responses => #{
<<"200">> => schema(counters_schema(), <<"Monitor count data in node">>)}}},
{"/monitor/nodes/:node", Metadata, monitor_nodes}.
monitor_nodes_counters_api() ->
Metadata = #{
get => #{
description => <<"List monitor data">>,
parameters => [
path_param_node(),
path_param_counter()
],
responses => #{
<<"200">> => schema(counter_schema(), <<"Monitor single count data in node">>)}}},
{"/monitor/nodes/:node/counters/:counter", Metadata, monitor_nodes_counters}.
monitor_counters_api() ->
Metadata = #{
get => #{
description => <<"List monitor data">>,
parameters => [
path_param_counter()
],
responses => #{
<<"200">> =>
schema(counter_schema(), <<"Monitor single count data">>)}}},
{"/monitor/counters/:counter", Metadata, counters}.
monitor_current_api() ->
Metadata = #{
get => #{
description => <<"Current monitor data">>,
responses => #{
<<"200">> => schema(current_counters_schema(), <<"Current monitor data">>)}}},
{"/monitor/current", Metadata, current_counters}.
path_param_node() ->
#{
name => node,
in => path,
required => true,
schema => #{type => string},
example => node()
}.
path_param_counter() ->
#{
name => counter,
in => path,
required => true,
schema => #{type => string, enum => ?COUNTERS},
example => hd(?COUNTERS)
}.
current_counters_schema() ->
#{
type => object,
properties => #{
connection => #{type => integer},
sent => #{type => integer},
received => #{type => integer},
subscription => #{type => integer}}
}.
counters_schema() ->
Fun =
fun(K, M) ->
maps:merge(M, counters_schema(K))
end,
Properties = lists:foldl(Fun, #{}, ?COUNTERS),
#{
type => object,
properties => Properties
}.
counters_schema(Name) ->
#{Name => counter_schema()}.
counter_schema() ->
#{
type => array,
items => #{
type => object,
properties => #{
timestamp => #{
type => integer,
description => <<"Millisecond">>},
count => #{
type => integer}}}}.
%%%==============================================================================================
%% parameters trans
monitor(get, #{query_string := Qs}) ->
Aggregate = maps:get(<<"aggregate">>, Qs, <<"false">>),
{200, list_collect(Aggregate)}.
monitor_nodes(get, #{bindings := #{node := Node}}) ->
lookup([{<<"node">>, Node}]).
monitor_nodes_counters(get, #{bindings := #{node := Node, counter := Counter}}) ->
lookup([{<<"node">>, Node}, {<<"counter">>, Counter}]).
counters(get, #{bindings := #{counter := Counter}}) ->
lookup([{<<"counter">>, Counter}]).
current_counters(get, _Params) ->
Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()],
Nodes = length(mria_mnesia:running_nodes()),
{Received, Sent, Sub, Conn} = format_current_metrics(Data),
Response = #{
nodes => Nodes,
received => Received,
sent => Sent,
subscription => Sub,
connection => Conn
},
{200, Response}.
format_current_metrics(Collects) ->
format_current_metrics(Collects, ?EMPTY_COLLECTION).
format_current_metrics([], Acc) ->
Acc;
format_current_metrics([{Received, Sent, Sub, Conn} | Collects],
{Received1, Sent1, Sub1, Conn1}) ->
format_current_metrics(Collects,
{Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
%%%==============================================================================================
%% api apply
lookup(Params) ->
Fun =
fun({K,V}, M) ->
maps:put(binary_to_atom(K, utf8), binary_to_atom(V, utf8), M)
end,
lookup_(lists:foldl(Fun, #{}, Params)).
lookup_(#{node := Node, counter := Counter}) ->
Data = hd(maps:values(sampling(Node, Counter))),
{200, Data};
lookup_(#{node := Node}) ->
{200, sampling(Node)};
lookup_(#{counter := Counter}) ->
CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]),
Data = hd(maps:values(CounterData)),
{200, Data}.
list_collect(Aggregate) ->
case Aggregate of
<<"true">> ->
[maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()];
_ ->
Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()],
merger_counters(Counters)
end.
get_collect(Node) ->
case emqx_dashboard_proto_v1:get_collect(Node) of
{badrpc, _Reason} -> ?EMPTY_COLLECTION;
Res -> Res
end.
merger_counters(ClusterCounters) ->
lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters).
merger_node_counters(NodeCounters, Counters) ->
maps:fold(fun merger_counter/3, Counters, NodeCounters).
merger_counter(Key, Counters, Res) ->
case maps:get(Key, Res, undefined) of
undefined ->
Res#{Key => Counters};
OldCounters ->
NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters),
Res#{Key => NCounters}
end.
merger_counter(#{timestamp := Timestamp, count := Value}, Counters) ->
Comparison =
fun(Counter) ->
case maps:get(timestamp, Counter) =:= Timestamp of
true ->
Count = maps:get(count, Counter),
{ok, Counter#{count => Count + Value}};
false ->
ignore
end
end,
key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}).
key_replace(List, Comparison, Default) ->
key_replace(List, List, Comparison, Default).
key_replace([], All, _Comparison, Default) ->
[Default | All];
key_replace([Term | List], All, Comparison, Default) ->
case Comparison(Term) of
{ok, NTerm} ->
Tail = [NTerm | List],
Header = lists:sublist(All, length(All) - length(Tail)),
lists:append(Header, Tail);
_ ->
key_replace(List, All, Comparison, Default)
end.
sampling(Node) ->
Data = emqx_dashboard_proto_v1:select_data(Node),
format(lists:sort(Data)).
sampling(Node, Counter) ->
Data = emqx_dashboard_proto_v1:select_data(Node),
format_single(lists:sort(Data), Counter).
format(Collects) ->
format(Collects, {[],[],[],[],[],[]}).
format([], {Connection, Route, Subscription, Received, Sent, Dropped}) ->
#{
connection => add_key(Connection),
route => add_key(Route),
subscriptions => add_key(Subscription),
received => add_key(Received),
sent => add_key(Sent),
dropped => add_key(Dropped)
}; };
format([#mqtt_collect{timestamp = Ts, collect = {C, R, S, Re, S1, D}} | Collects], schema("/monitor/nodes/:node") ->
{Connection, Route, Subscription, Received, Sent, Dropped}) -> #{
format(Collects, {[[Ts, C] | Connection], 'operationId' => monitor,
[[Ts, R] | Route], get => #{
[[Ts, S] | Subscription], description => <<"List the monitor data on the node.">>,
[[Ts, Re] | Received], parameters => [
[[Ts, S1] | Sent], {node, hoconsc:mk(binary(), #{in => path, required => true, example => node()})},
[[Ts, D] | Dropped]}). {latest, hoconsc:mk(integer(), #{in => query, required => false, example => 1000})}
add_key(Collects) -> ],
lists:reverse([#{timestamp => Ts * 1000, count => C} || [Ts, C] <- Collects]). responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
}
}
}.
format_single(Collects, Counter) -> fields(sampler) ->
#{Counter => format_single(Collects, counter_index(Counter), [])}. Samplers =
format_single([], _Index, Acc) -> [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})}
lists:reverse(Acc); || SamplerName <- ?SAMPLER_LIST],
format_single([#mqtt_collect{timestamp = Ts, collect = Collect} | Collects], Index, Acc) -> [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers].
format_single(Collects, Index,
[#{timestamp => Ts * 1000, count => erlang:element(Index, Collect)} | Acc]).
counter_index(connection) -> 1; %% -------------------------------------------------------------------------------------------------
counter_index(route) -> 2; %% API
counter_index(subscriptions) -> 3;
counter_index(received) -> 4; monitor(get, #{query_string := QS, bindings := Bindings}) ->
counter_index(sent) -> 5; Latest = maps:get(<<"latest">>, QS, 0),
counter_index(dropped) -> 6. Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)),
case emqx_dashboard_monitor:samplers(Node, Latest) of
{badrpc, {Node, Reason}} ->
Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])),
{400, 'BAD_RPC', Message};
Samplers ->
{200, Samplers}
end.
%% -------------------------------------------------------------------------------------------------
%% Internal
sampler_desc(received) -> sampler_desc_format("Received messages ");
sampler_desc(received_bytes) -> sampler_desc_format("Received bytes ");
sampler_desc(sent) -> sampler_desc_format("Sent messages ");
sampler_desc(sent_bytes) -> sampler_desc_format("Sent bytes ");
sampler_desc(dropped) -> sampler_desc_format("Dropped messages ");
sampler_desc(subscriptions) ->
<<"Subscriptions at the time of sampling."
" Can only represent the approximate state">>;
sampler_desc(routes) ->
<<"Routes at the time of sampling."
" Can only represent the approximate state">>;
sampler_desc(connections) ->
<<"Connections at the time of sampling."
" Can only represent the approximate state">>.
sampler_desc_format(Format) ->
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
list_to_binary(io_lib:format(Format ++ "last ~p seconds", [Interval])).

View File

@ -20,8 +20,7 @@
-export([ introduced_in/0 -export([ introduced_in/0
, get_collect/1 , samplers/1
, select_data/1
]). ]).
-include("emqx_dashboard.hrl"). -include("emqx_dashboard.hrl").
@ -30,11 +29,6 @@
introduced_in() -> introduced_in() ->
"5.0.0". "5.0.0".
-spec get_collect(node()) -> _. -spec samplers(node()) -> list(map()) | emqx_rpc:badrpc().
get_collect(Node) -> samplers(Node) ->
rpc:call(Node, emqx_dashboard_collection, get_collect, []). rpc:call(Node, emqx_dashboard_monitor, samplers, [Node]).
-spec select_data(node()) -> [#mqtt_collect{}]
| emqx_rpc:badrpc().
select_data(Node) ->
rpc:call(Node, emqx_dashboard_collection, select_data, []).

View File

@ -27,95 +27,14 @@
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_testcase(t_badrpc_collect, Config) ->
Cluster = cluster_specs(2),
Apps = [emqx_modules, emqx_dashboard],
Nodes = [N1, N2] = lists:map(fun(Spec) -> start_slave(Spec, Apps) end, Cluster),
%% form the cluster
ok = rpc:call(N2, mria, join, [N1]),
%% Wait until all nodes are healthy:
[rpc:call(Node, mria_rlog, wait_for_shards, [[?DASHBOARD_SHARD], 5000])
|| Node <- Nodes],
[ {nodes, Nodes}
, {apps, Apps}
| Config];
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
Config. Config.
end_per_testcase(t_badrpc_collect, Config) ->
Apps = ?config(apps, Config),
Nodes = ?config(nodes, Config),
lists:foreach(fun(Node) -> stop_slave(Node, Apps) end, Nodes),
ok;
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
ok. ok.
t_badrpc_collect(Config) ->
[N1, N2] = ?config(nodes, Config),
%% simulate badrpc on one node
ok = rpc:call(N2, meck, new, [emqx_dashboard_collection, [no_history, no_link]]),
%% we don't mock the `emqx_dashboard_collection:get_collect/0' to
%% provoke the `badrpc' error.
?assertMatch(
{200, #{nodes := 2}},
rpc:call(N1, emqx_dashboard_monitor_api, current_counters, [get, #{}])),
ok = rpc:call(N2, meck, unload, [emqx_dashboard_collection]),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cluster_specs(NumNodes) ->
BaseGenRpcPort = 9000,
Specs0 = [#{ name => node_name(N)
, num => N
}
|| N <- lists:seq(1, NumNodes)],
GenRpcPorts = maps:from_list([{node_id(Name), {tcp, BaseGenRpcPort + N}}
|| #{name := Name, num := N} <- Specs0]),
[ Spec#{env => [ {gen_rpc, tcp_server_port, BaseGenRpcPort + N}
, {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
]}
|| Spec = #{num := N} <- Specs0].
node_name(N) ->
list_to_atom("n" ++ integer_to_list(N)).
node_id(Name) ->
list_to_atom(lists:concat([Name, "@", host()])).
start_slave(Spec = #{ name := Name}, Apps) ->
CommonBeamOpts = "+S 1:1 ", % We want VMs to only occupy a single core
{ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()),
setup_node(Node, Spec, Apps),
Node.
stop_slave(Node, Apps) ->
ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps]),
slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.
setenv(Node, Env) ->
[rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env].
setup_node(Node, _Spec = #{env := Env}, Apps) ->
%% load these before starting ekka and such
[rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_conf, emqx]],
setenv(Node, Env),
EnvHandler =
fun(emqx) ->
application:set_env(emqx, boot_modules, [router, broker]);
(_) ->
ok
end,
ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandler]),
ok.