commit e8f92b422c

@@ -6,6 +6,7 @@ dashboard {
    default_username = "admin"
    default_password = "public"
    ## notice: sample_interval must divide 60 evenly (i.e. be a divisor of 60),
    ## e.g. 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s
    sample_interval = 10s
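    ## e.g. with sample_interval = 10s the dashboard stores one monitor point every 10 seconds
    ## (6 points per minute), aligned to whole multiples of the interval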
    ## api jwt timeout. default is 30 minutes
    token_expired_time = 60m

@@ -39,7 +39,36 @@
-define(DASHBOARD_SHARD, emqx_dashboard_shard).

-record(mqtt_collect, {
    timestamp :: integer(),
    collect
}).

-ifdef(TEST).
%% for test
-define(DEFAULT_SAMPLE_INTERVAL, 1).
-define(RPC_TIMEOUT, 50).
-else.
%% dashboard monitor sample interval, default 10s
-define(DEFAULT_SAMPLE_INTERVAL, 10).
-define(RPC_TIMEOUT, 5000).
-endif.
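
%% ?DELTA_SAMPLER_LIST below holds cumulative counters (message/byte totals); the monitor stores
%% their per-interval increments. ?GAUGE_SAMPLER_LIST metrics are absolute values sampled as-is.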

-define(DELTA_SAMPLER_LIST,
    [ received
    , received_bytes
    , sent
    , sent_bytes
    , dropped
    ]).

-define(GAUGE_SAMPLER_LIST,
    [ subscriptions
    , routes
    , connections
    ]).

-define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST).

-define(DELTA_SAMPLER_RATE_MAP, #{
    received => received_rate,
    received_bytes => received_bytes_rate,
    sent => sent_rate,
    sent_bytes => sent_bytes_rate,
    dropped => dropped_rate
}).
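
%% Each key in ?DELTA_SAMPLER_LIST maps to a *_rate key above; these rate keys, together with
%% the gauges, are what the /monitor_current API reports.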

@@ -1,191 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_dashboard_collection).

-behaviour(gen_server).

-include("emqx_dashboard.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

-export([ start_link/0
        ]).

-export([ init/1
        , handle_call/3
        , handle_cast/2
        , handle_info/2
        , terminate/2
        , code_change/3
        ]).

-export([get_collect/0, select_data/0]).

-export([get_universal_epoch/0]).

-boot_mnesia({mnesia, [boot]}).

%% Mnesia bootstrap
-export([mnesia/1]).

-define(APP, emqx_dashboard).

-define(DEFAULT_INTERVAL, 10). %% seconds

-define(COLLECT, {[],[],[]}).

-define(CLEAR_INTERVAL, 86400000).

-define(EXPIRE_INTERVAL, 86400000 * 7).

mnesia(boot) ->
    ok = mria:create_table(?TAB_COLLECT, [
        {type, set},
        {local_content, true},
        {storage, disc_copies},
        {record_name, mqtt_collect},
        {attributes, record_info(fields, mqtt_collect)}]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

get_collect() -> gen_server:call(whereis(?MODULE), get_collect).

-spec select_data() -> [#mqtt_collect{}].
select_data() ->
    Time = emqx_dashboard_collection:get_universal_epoch() - 7200000,
    ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]).

init([]) ->
    timer(next_interval(), collect),
    timer(get_today_remaining_seconds(), clear_expire_data),
    ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL),
    State = #{
        count => count(),
        expire_interval => ExpireInterval,
        collect => ?COLLECT,
        temp_collect => {0, 0, 0, 0},
        last_collects => {0, 0, 0}
    },
    {ok, State}.

%% @doc fires on every whole interval;
%% example:
%%   interval is 10s
%%   now 15:01:07 (or anywhere in 15:01:01 ~ 15:01:10)
%%   next will be 15:01:10, 15:01:20, 15:01:30 ...
%% ensures the counters of all nodes in the cluster are sampled at synchronized times
next_interval() ->
    (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
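%% e.g. with interval() = 10 the period is 10000 ms; if the current time is 7300 ms into the
%% period, the result is 10000 - 7300 - 1 = 2699 ms, i.e. the collect fires just before the
%% next whole 10-second boundary.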

interval() ->
    emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL).

count() ->
    60 div interval().

handle_call(get_collect, _From, State = #{temp_collect := {Received, Sent, _, _}}) ->
    {reply, {Received, Sent, collect(subscriptions), collect(connections)}, State, hibernate};
handle_call(_Req, _From, State) ->
    {reply, ok, State}.
handle_cast(_Req, State) ->
    {noreply, State}.

handle_info(collect, State = #{ collect := Collect
                              , count := 1
                              , temp_collect := TempCollect
                              , last_collects := LastCollect}) ->
    timer(next_interval(), collect),
    NewLastCollect = flush(collect_all(Collect), LastCollect),
    TempCollect1 = temp_collect(TempCollect),
    {noreply, State#{count => count(),
                     collect => ?COLLECT,
                     temp_collect => TempCollect1,
                     last_collects => NewLastCollect}};

handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
    timer(next_interval(), collect),
    TempCollect1 = temp_collect(TempCollect),
    {noreply, State#{count => Count - 1,
                     collect => collect_all(Collect),
                     temp_collect => TempCollect1}, hibernate};

handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) ->
    timer(?CLEAR_INTERVAL, clear_expire_data),
    T1 = get_universal_epoch(),
    Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end),
    Collects = ets:select(?TAB_COLLECT, Spec),
    lists:foreach(fun(Collect) ->
                      true = ets:delete_object(?TAB_COLLECT, Collect)
                  end, Collects),
    {noreply, State, hibernate};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

temp_collect({_, _, Received, Sent}) ->
    Received1 = collect(received),
    Sent1 = collect(sent),
    {(Received1 - Received) div interval(),
     (Sent1 - Sent) div interval(),
     Received1,
     Sent1}.

collect_all({Connection, Route, Subscription}) ->
    {[collect(connections) | Connection],
     [collect(routes) | Route],
     [collect(subscriptions) | Subscription]}.

collect(connections) ->
    emqx_stats:getstat('connections.count');
collect(routes) ->
    emqx_stats:getstat('routes.count');
collect(subscriptions) ->
    emqx_stats:getstat('subscriptions.count');
collect(received) ->
    emqx_metrics:val('messages.received');
collect(sent) ->
    emqx_metrics:val('messages.sent');
collect(dropped) ->
    emqx_metrics:val('messages.dropped').

flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
    Received = collect(received),
    Sent = collect(sent),
    Dropped = collect(dropped),
    Collect = {avg(Connection),
               avg(Route),
               avg(Subscription),
               diff(Received, Received0),
               diff(Sent, Sent0),
               diff(Dropped, Dropped0)},
    Ts = get_universal_epoch(),
    {atomic, ok} = mria:transaction(mria:local_content_shard(),
                                    fun mnesia:write/3,
                                    [ ?TAB_COLLECT
                                    , #mqtt_collect{timestamp = Ts, collect = Collect}
                                    , write]),
    {Received, Sent, Dropped}.

avg(Items) ->
    lists:sum(Items) div count().

diff(Item0, Item1) ->
    Item0 - Item1.

timer(Secs, Msg) ->
    erlang:send_after(Secs, self(), Msg).

get_today_remaining_seconds() ->
    ?CLEAR_INTERVAL - (get_universal_epoch() rem ?CLEAR_INTERVAL).

get_universal_epoch() ->
    (calendar:datetime_to_gregorian_seconds(calendar:universal_time()) -
     calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

@@ -0,0 +1,362 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_dashboard_monitor).

-include("emqx_dashboard.hrl").

-behaviour(gen_server).

-boot_mnesia({mnesia, [boot]}).

-export([ start_link/0]).

-export([ init/1
        , handle_call/3
        , handle_cast/2
        , handle_info/2
        , terminate/2
        , code_change/3
        ]).

-export([ mnesia/1]).

-export([ samplers/0
        , samplers/2
        , current_rate/0
        , current_rate/1
        , granularity_adapter/1
        ]).

%% for rpc
-export([ do_sample/2]).

-define(TAB, ?MODULE).

%% 1 hour = 60 * 60 * 1000 milliseconds
-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).

-record(state, {
    last
}).

-record(emqx_monit, {
    time :: integer(),
    data :: map()
}).

mnesia(boot) ->
    ok = mria:create_table(?TAB, [
        {type, set},
        {local_content, true},
        {storage, disc_copies},
        {record_name, emqx_monit},
        {attributes, record_info(fields, emqx_monit)}]).

%% -------------------------------------------------------------------------------------------------
%% API

samplers() ->
    format(do_sample(all, infinity)).

samplers(NodeOrCluster, Latest) ->
    Time =
        case Latest of
            infinity ->
                infinity;
            Latest when is_integer(Latest) ->
                Now = erlang:system_time(millisecond),
                Now - (Latest * 1000)
        end,
    case format(do_sample(NodeOrCluster, Time)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        List when is_list(List) ->
            granularity_adapter(List)
    end.

%% When the number of samples exceeds 1000, it affects the rendering speed of the dashboard UI.
%% granularity_adapter downsamples the data: adjacent samples are merged pairwise, which makes
%% the granularity coarser and reduces the data density.
%%
%% [
%%   Data1 = #{time => T1, k1 => 1, k2 => 2},
%%   Data2 = #{time => T2, k1 => 3, k2 => 4},
%%   ...
%% ]
%% After granularity_adapter, Data1 and Data2 are merged into one point:
%%
%% [
%%   #{time => T2, k1 => 1 + 3, k2 => 2 + 4},
%%   ...
%% ]
%%
granularity_adapter(List) when length(List) > 1000 ->
    granularity_adapter(List, []);
granularity_adapter(List) ->
    List.
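
%% Note: when two samples are merged (see granularity_adapter/2 below), only the keys in
%% ?DELTA_SAMPLER_LIST are summed; gauge values and time_stamp are taken from the later sample.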

%% Get the current rate. Not the current sampler data.
current_rate() ->
    Fun =
        fun(Node, Cluster) ->
            case current_rate(Node) of
                {ok, CurrentRate} ->
                    merge_cluster_rate(CurrentRate, Cluster);
                {badrpc, Reason} ->
                    {badrpc, {Node, Reason}}
            end
        end,
    case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Rate ->
            {ok, Rate}
    end.

current_rate(all) ->
    current_rate();
current_rate(Node) when Node == node() ->
    do_call(current_rate);
current_rate(Node) ->
    case emqx_dashboard_proto_v1:current_rate(Node) of
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}};
        {ok, Rate} ->
            {ok, Rate}
    end.

%% -------------------------------------------------------------------------------------------------
%% gen_server functions

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    sample_timer(),
    clean_timer(),
    {ok, #state{last = undefined}}.

handle_call(current_rate, _From, State = #state{last = Last}) ->
    NowTime = erlang:system_time(millisecond),
    NowSamplers = sample(NowTime),
    Rate = cal_rate(NowSamplers, Last),
    {reply, {ok, Rate}, State};
handle_call(_Request, _From, State = #state{}) ->
    {reply, ok, State}.

handle_cast(_Request, State = #state{}) ->
    {noreply, State}.

handle_info({sample, Time}, State = #state{last = Last}) ->
    Now = sample(Time),
    {atomic, ok} = flush(Last, Now),
    sample_timer(),
    {noreply, State#state{last = Now}};

handle_info(clean_expired, State) ->
    clean(),
    clean_timer(),
    {noreply, State};

handle_info(_Info, State = #state{}) ->
    {noreply, State}.

terminate(_Reason, _State = #state{}) ->
    ok.

code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State}.

%% -------------------------------------------------------------------------------------------------
%% Internal functions

do_call(Request) ->
    gen_server:call(?MODULE, Request, 5000).

do_sample(all, Time) ->
    do_sample(mria_mnesia:cluster_nodes(running), Time, #{});
do_sample(Node, Time) when Node == node() ->
    MS = match_spec(Time),
    internal_format(ets:select(?TAB, MS));
do_sample(Node, Time) ->
    case emqx_dashboard_proto_v1:do_sample(Node, Time) of
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}};
        Res ->
            Res
    end.

do_sample([], _Time, Res) ->
    Res;
do_sample([Node | Nodes], Time, Res) ->
    case do_sample(Node, Time) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Samplers ->
            do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res))
    end.

match_spec(infinity) ->
    [{'$1', [], ['$1']}];
match_spec(Time) ->
    [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].

merge_cluster_samplers(Node, Cluster) ->
    maps:fold(fun merge_cluster_samplers/3, Cluster, Node).

merge_cluster_samplers(TS, NodeData, Cluster) ->
    case maps:get(TS, Cluster, undefined) of
        undefined ->
            Cluster#{TS => NodeData};
        ClusterData ->
            Cluster#{TS => count_map(NodeData, ClusterData)}
    end.

merge_cluster_rate(Node, Cluster) ->
    Fun =
        fun(Key, Value, NCluster) ->
            ClusterValue = maps:get(Key, NCluster, 0),
            NCluster#{Key => Value + ClusterValue}
        end,
    maps:fold(Fun, Cluster, Node).

format({badrpc, Reason}) ->
    {badrpc, Reason};
format(Data) ->
    All = maps:fold(fun format/3, [], Data),
    Compare = fun(#{time_stamp := T1}, #{time_stamp := T2}) -> T1 =< T2 end,
    lists:sort(Compare, All).

format(TimeStamp, Data, All) ->
    [Data#{time_stamp => TimeStamp} | All].

cal_rate( #emqx_monit{data = NowData, time = NowTime}
        , #emqx_monit{data = LastData, time = LastTime}) ->
    TimeDelta = NowTime - LastTime,
    Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end,
    Gauge = maps:filter(Filter, NowData),
    {_, _, _, Rate} =
        lists:foldl(fun cal_rate_/2, {NowData, LastData, TimeDelta, Gauge}, ?DELTA_SAMPLER_LIST),
    Rate.
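
%% cal_rate_/2 turns a counter delta into a per-second rate: TimeDelta is in milliseconds,
%% so ((NewValue - LastValue) * 1000) div TimeDelta is the number of events per second.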
cal_rate_(Key, {Now, Last, TDelta, Res}) ->
    NewValue = maps:get(Key, Now),
    LastValue = maps:get(Key, Last),
    Rate = ((NewValue - LastValue) * 1000) div TDelta,
    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
    {Now, Last, TDelta, Res#{RateKey => Rate}}.

granularity_adapter([], Res) ->
    lists:reverse(Res);
granularity_adapter([Sampler], Res) ->
    granularity_adapter([], [Sampler | Res]);
granularity_adapter([Sampler1, Sampler2 | Rest], Res) ->
    Fun =
        fun(Key, M) ->
            Value1 = maps:get(Key, Sampler1),
            Value2 = maps:get(Key, Sampler2),
            M#{Key => Value1 + Value2}
        end,
    granularity_adapter(Rest, [lists:foldl(Fun, Sampler2, ?DELTA_SAMPLER_LIST) | Res]).

%% -------------------------------------------------------------------------------------------------
%% timer

sample_timer() ->
    {NextTime, Remaining} = next_interval(),
    erlang:send_after(Remaining, self(), {sample, NextTime}).

clean_timer() ->
    erlang:send_after(?CLEAN_EXPIRED_INTERVAL, self(), clean_expired).

%% Sample on whole multiples of the interval.
%% As an example:
%%   Interval = 10
%%   The monitor starts working at whole seconds, like 00:00:00, 00:00:10, 00:00:20 ...
%% This ensures that the monitor data of all nodes in the cluster are aligned in time.
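%% e.g. Interval = 10s (10000 ms) and Now = 63400 ms -> NextTime = 70000 ms, Remaining = 6600 ms.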
next_interval() ->
    Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
    Now = erlang:system_time(millisecond),
    NextTime = ((Now div Interval) + 1) * Interval,
    Remaining = NextTime - Now,
    {NextTime, Remaining}.

%% -------------------------------------------------------------------------------------------------
%% data

sample(Time) ->
    Fun =
        fun(Key, Res) ->
            maps:put(Key, value(Key), Res)
        end,
    Data = lists:foldl(Fun, #{}, ?SAMPLER_LIST),
    #emqx_monit{time = Time, data = Data}.

flush(_Last = undefined, Now) ->
    store(Now);
flush(_Last = #emqx_monit{data = LastData}, Now = #emqx_monit{data = NowData}) ->
    Store = Now#emqx_monit{data = delta(LastData, NowData)},
    store(Store).

delta(LastData, NowData) ->
    Fun =
        fun(Key, Data) ->
            Value = maps:get(Key, NowData) - maps:get(Key, LastData),
            Data#{Key => Value}
        end,
    lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST).

store(MonitData) ->
    {atomic, ok} =
        mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).

clean() ->
    Now = erlang:system_time(millisecond),
    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
    Expired = ets:select(?TAB, ExpiredMS),
    lists:foreach(fun(Data) ->
                      true = ets:delete_object(?TAB, Data)
                  end, Expired),
    ok.

%% Convert records to a map keyed by timestamp, to make data aggregation easier
internal_format(List) when is_list(List) ->
    Fun =
        fun(Data, All) ->
            maps:merge(internal_format(Data), All)
        end,
    lists:foldl(Fun, #{}, List);
internal_format(#emqx_monit{time = Time, data = Data}) ->
    #{Time => Data}.
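
%% count_map/2 adds up the two data maps key by key; merge_cluster_samplers/3 uses it to combine
%% samples from different nodes that share the same timestamp.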
count_map(M1, M2) ->
    Fun =
        fun(Key, Map) ->
            Map#{Key => maps:get(Key, M1) + maps:get(Key, M2)}
        end,
    lists:foldl(Fun, #{}, ?SAMPLER_LIST).

value(connections) -> emqx_stats:getstat('connections.count');
value(routes) -> emqx_stats:getstat('routes.count');
value(subscriptions) -> emqx_stats:getstat('subscriptions.count');
value(received) -> emqx_metrics:val('messages.received');
value(received_bytes) -> emqx_metrics:val('bytes.received');
value(sent) -> emqx_metrics:val('messages.sent');
value(sent_bytes) -> emqx_metrics:val('bytes.sent');
value(dropped) -> emqx_metrics:val('messages.dropped').

@@ -5,308 +5,165 @@
-module(emqx_dashboard_monitor_api).

-include("emqx_dashboard.hrl").
-include_lib("typerefl/include/types.hrl").

-behaviour(minirest_api).

-import(emqx_mgmt_util, [schema/2]).
-export([api_spec/0]).
-export([ api_spec/0]).

-export([ paths/0
        , schema/1
        , fields/1
        ]).

-export([ monitor/2
        , counters/2
        , monitor_nodes/2
        , monitor_nodes_counters/2
        , current_counters/2
        , monitor_current/2
        ]).

-export([ sampling/1
        , sampling/2
        ]).

-define(COUNTERS, [ connection
                  , route
                  , subscriptions
                  , received
                  , sent
                  , dropped]).

-define(EMPTY_COLLECTION, {0, 0, 0, 0}).

api_spec() ->
    {[ monitor_api()
     , monitor_nodes_api()
     , monitor_nodes_counters_api()
     , monitor_counters_api()
     , monitor_current_api()
     ],
    []}.
    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).

monitor_api() ->
    Metadata = #{
        get => #{
            description => <<"List monitor data">>,
            parameters => [
paths() ->
    [ "/monitor"
    , "/monitor/nodes/:node"
    , "/monitor_current"
    , "/monitor_current/nodes/:node"
    ].

schema("/monitor") ->
    #{
                name => aggregate,
                in => query,
                required => false,
                schema => #{type => boolean}
        'operationId' => monitor,
        get => #{
            tags => [dashboard],
            description => <<"List monitor data.">>,
            parameters => [parameter_latest()],
            responses => #{
                200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
                400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
            }
        }
            ],
            responses => #{
                <<"200">> => schema(counters_schema(), <<"Monitor count data">>)}}},
    {"/monitor", Metadata, monitor}.

monitor_nodes_api() ->
    Metadata = #{
        get => #{
            description => <<"List monitor data">>,
            parameters => [path_param_node()],
            responses => #{
                <<"200">> => schema(counters_schema(), <<"Monitor count data in node">>)}}},
    {"/monitor/nodes/:node", Metadata, monitor_nodes}.

monitor_nodes_counters_api() ->
    Metadata = #{
        get => #{
            description => <<"List monitor data">>,
            parameters => [
                path_param_node(),
                path_param_counter()
            ],
            responses => #{
                <<"200">> => schema(counter_schema(), <<"Monitor single count data in node">>)}}},
    {"/monitor/nodes/:node/counters/:counter", Metadata, monitor_nodes_counters}.

monitor_counters_api() ->
    Metadata = #{
        get => #{
            description => <<"List monitor data">>,
            parameters => [
                path_param_counter()
            ],
            responses => #{
                <<"200">> =>
                    schema(counter_schema(), <<"Monitor single count data">>)}}},
    {"/monitor/counters/:counter", Metadata, counters}.
monitor_current_api() ->
    Metadata = #{
        get => #{
            description => <<"Current monitor data">>,
            responses => #{
                <<"200">> => schema(current_counters_schema(), <<"Current monitor data">>)}}},
    {"/monitor/current", Metadata, current_counters}.

path_param_node() ->
    #{
        name => node,
        in => path,
        required => true,
        schema => #{type => string},
        example => node()
    }.

path_param_counter() ->
    #{
        name => counter,
        in => path,
        required => true,
        schema => #{type => string, enum => ?COUNTERS},
        example => hd(?COUNTERS)
    }.

current_counters_schema() ->
    #{
        type => object,
        properties => #{
            connection => #{type => integer},
            sent => #{type => integer},
            received => #{type => integer},
            subscription => #{type => integer}}
    }.

counters_schema() ->
    Fun =
        fun(K, M) ->
            maps:merge(M, counters_schema(K))
        end,
    Properties = lists:foldl(Fun, #{}, ?COUNTERS),
    #{
        type => object,
        properties => Properties
    }.

counters_schema(Name) ->
    #{Name => counter_schema()}.
counter_schema() ->
    #{
        type => array,
        items => #{
            type => object,
            properties => #{
                timestamp => #{
                    type => integer,
                    description => <<"Millisecond">>},
                count => #{
                    type => integer}}}}.
%%%==============================================================================================
%% parameters trans
monitor(get, #{query_string := Qs}) ->
    Aggregate = maps:get(<<"aggregate">>, Qs, <<"false">>),
    {200, list_collect(Aggregate)}.

monitor_nodes(get, #{bindings := #{node := Node}}) ->
    lookup([{<<"node">>, Node}]).

monitor_nodes_counters(get, #{bindings := #{node := Node, counter := Counter}}) ->
    lookup([{<<"node">>, Node}, {<<"counter">>, Counter}]).

counters(get, #{bindings := #{counter := Counter}}) ->
    lookup([{<<"counter">>, Counter}]).

current_counters(get, _Params) ->
    Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()],
    Nodes = length(mria_mnesia:running_nodes()),
    {Received, Sent, Sub, Conn} = format_current_metrics(Data),
    Response = #{
        nodes => Nodes,
        received => Received,
        sent => Sent,
        subscription => Sub,
        connection => Conn
    },
    {200, Response}.

format_current_metrics(Collects) ->
    format_current_metrics(Collects, ?EMPTY_COLLECTION).
format_current_metrics([], Acc) ->
    Acc;
format_current_metrics([{Received, Sent, Sub, Conn} | Collects],
                       {Received1, Sent1, Sub1, Conn1}) ->
    format_current_metrics(Collects,
                           {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).


%%%==============================================================================================
%% api apply

lookup(Params) ->
    Fun =
        fun({K, V}, M) ->
            maps:put(binary_to_atom(K, utf8), binary_to_atom(V, utf8), M)
        end,
    lookup_(lists:foldl(Fun, #{}, Params)).

lookup_(#{node := Node, counter := Counter}) ->
    Data = hd(maps:values(sampling(Node, Counter))),
    {200, Data};
lookup_(#{node := Node}) ->
    {200, sampling(Node)};
lookup_(#{counter := Counter}) ->
    CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]),
    Data = hd(maps:values(CounterData)),
    {200, Data}.

list_collect(Aggregate) ->
    case Aggregate of
        <<"true">> ->
            [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()];
        _ ->
            Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()],
            merger_counters(Counters)
    end.

get_collect(Node) ->
    case emqx_dashboard_proto_v1:get_collect(Node) of
        {badrpc, _Reason} -> ?EMPTY_COLLECTION;
        Res -> Res
    end.

merger_counters(ClusterCounters) ->
    lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters).

merger_node_counters(NodeCounters, Counters) ->
    maps:fold(fun merger_counter/3, Counters, NodeCounters).

merger_counter(Key, Counters, Res) ->
    case maps:get(Key, Res, undefined) of
        undefined ->
            Res#{Key => Counters};
        OldCounters ->
            NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters),
            Res#{Key => NCounters}
    end.

merger_counter(#{timestamp := Timestamp, count := Value}, Counters) ->
    Comparison =
        fun(Counter) ->
            case maps:get(timestamp, Counter) =:= Timestamp of
                true ->
                    Count = maps:get(count, Counter),
                    {ok, Counter#{count => Count + Value}};
                false ->
                    ignore
            end
        end,
    key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}).

key_replace(List, Comparison, Default) ->
    key_replace(List, List, Comparison, Default).

key_replace([], All, _Comparison, Default) ->
    [Default | All];

key_replace([Term | List], All, Comparison, Default) ->
    case Comparison(Term) of
        {ok, NTerm} ->
            Tail = [NTerm | List],
            Header = lists:sublist(All, length(All) - length(Tail)),
            lists:append(Header, Tail);
        _ ->
            key_replace(List, All, Comparison, Default)
    end.

sampling(Node) ->
    Data = emqx_dashboard_proto_v1:select_data(Node),
    format(lists:sort(Data)).

sampling(Node, Counter) ->
    Data = emqx_dashboard_proto_v1:select_data(Node),
    format_single(lists:sort(Data), Counter).

format(Collects) ->
    format(Collects, {[],[],[],[],[],[]}).
format([], {Connection, Route, Subscription, Received, Sent, Dropped}) ->
    #{
        connection => add_key(Connection),
        route => add_key(Route),
        subscriptions => add_key(Subscription),
        received => add_key(Received),
        sent => add_key(Sent),
        dropped => add_key(Dropped)
    };

format([#mqtt_collect{timestamp = Ts, collect = {C, R, S, Re, S1, D}} | Collects],
       {Connection, Route, Subscription, Received, Sent, Dropped}) ->
    format(Collects, {[[Ts, C] | Connection],
                      [[Ts, R] | Route],
                      [[Ts, S] | Subscription],
                      [[Ts, Re] | Received],
                      [[Ts, S1] | Sent],
                      [[Ts, D] | Dropped]}).
add_key(Collects) ->
    lists:reverse([#{timestamp => Ts * 1000, count => C} || [Ts, C] <- Collects]).
schema("/monitor/nodes/:node") ->
    #{
        'operationId' => monitor,
        get => #{
            tags => [dashboard],
            description => <<"List the monitor data on the node.">>,
            parameters => [parameter_node(), parameter_latest()],
            responses => #{
                200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
                400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
            }
        }
    };

format_single(Collects, Counter) ->
    #{Counter => format_single(Collects, counter_index(Counter), [])}.
format_single([], _Index, Acc) ->
    lists:reverse(Acc);
format_single([#mqtt_collect{timestamp = Ts, collect = Collect} | Collects], Index, Acc) ->
    format_single(Collects, Index,
                  [#{timestamp => Ts * 1000, count => erlang:element(Index, Collect)} | Acc]).
schema("/monitor_current") ->
    #{
        'operationId' => monitor_current,
        get => #{
            tags => [dashboard],
            description => <<"Current status. Gauge and rate.">>,
            responses => #{
                200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
            }
        }
    };

counter_index(connection) -> 1;
counter_index(route) -> 2;
counter_index(subscriptions) -> 3;
counter_index(received) -> 4;
counter_index(sent) -> 5;
counter_index(dropped) -> 6.
schema("/monitor_current/nodes/:node") ->
    #{
        'operationId' => monitor_current,
        get => #{
            tags => [dashboard],
            description => <<"Node current status. Gauge and rate.">>,
            parameters => [parameter_node()],
            responses => #{
                200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}),
                400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
            }
        }
    }.

parameter_latest() ->
    Info = #{
        in => query,
        required => false,
        example => 5 * 60,
        description => <<"The latest N seconds of data, e.g. 300 for the last 5 minutes.">>
    },
    {latest, hoconsc:mk(integer(), Info)}.

parameter_node() ->
    Info = #{
        in => path,
        required => true,
        example => node(),
        description => <<"EMQX node name.">>
    },
    {node, hoconsc:mk(binary(), Info)}.


fields(sampler) ->
    Samplers =
        [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
         || SamplerName <- ?SAMPLER_LIST],
    [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers];

fields(sampler_current) ->
    [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
     || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST].

%% -------------------------------------------------------------------------------------------------
%% API

monitor(get, #{query_string := QS, bindings := Bindings}) ->
    Latest = maps:get(<<"latest">>, QS, 1000),
    Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)),
    case emqx_dashboard_monitor:samplers(Node, Latest) of
        {badrpc, {Node, Reason}} ->
            Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])),
            {400, 'BAD_RPC', Message};
        Samplers ->
            {200, Samplers}
    end.

monitor_current(get, #{bindings := Bindings}) ->
    NodeOrCluster = binary_to_atom(maps:get(node, Bindings, <<"all">>), utf8),
    case emqx_dashboard_monitor:current_rate(NodeOrCluster) of
        {ok, CurrentRate} ->
            {200, CurrentRate};
        {badrpc, {Node, Reason}} ->
            Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])),
            {400, 'BAD_RPC', Message}
    end.

%% -------------------------------------------------------------------------------------------------
%% Internal

swagger_desc(received) -> swagger_desc_format("Received messages ");
swagger_desc(received_bytes) -> swagger_desc_format("Received bytes ");
swagger_desc(sent) -> swagger_desc_format("Sent messages ");
swagger_desc(sent_bytes) -> swagger_desc_format("Sent bytes ");
swagger_desc(dropped) -> swagger_desc_format("Dropped messages ");
swagger_desc(subscriptions) ->
    <<"Subscriptions at the time of sampling."
      " Can only represent the approximate state">>;
swagger_desc(routes) ->
    <<"Routes at the time of sampling."
      " Can only represent the approximate state">>;
swagger_desc(connections) ->
    <<"Connections at the time of sampling."
      " Can only represent the approximate state">>;

swagger_desc(received_rate) -> swagger_desc_format("Received messages ", per);
swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);
swagger_desc(sent_rate) -> swagger_desc_format("Sent messages ", per);
swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per);
swagger_desc(dropped_rate) -> swagger_desc_format("Dropped messages ", per).

swagger_desc_format(Format) ->
    swagger_desc_format(Format, last).

swagger_desc_format(Format, Type) ->
    Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
    list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).

@@ -29,4 +29,4 @@ start_link() ->

init([]) ->
    {ok, {{one_for_all, 10, 100},
          [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_collection)]}}.
          [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_monitor)]}}.

@@ -19,9 +19,8 @@
-behaviour(emqx_bpapi).

-export([ introduced_in/0
        , get_collect/1
        , select_data/1
        , do_sample/2
        , current_rate/1
        ]).

-include("emqx_dashboard.hrl").

@@ -30,11 +29,10 @@
introduced_in() ->
    "5.0.0".

-spec get_collect(node()) -> _.
get_collect(Node) ->
    rpc:call(Node, emqx_dashboard_collection, get_collect, []).
-spec do_sample(node(), Latest :: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc().
do_sample(Node, Latest) ->
    rpc:call(Node, emqx_dashboard_monitor, do_sample, [Node, Latest], ?RPC_TIMEOUT).

-spec select_data(node()) -> [#mqtt_collect{}]
                           | emqx_rpc:badrpc().
select_data(Node) ->
    rpc:call(Node, emqx_dashboard_collection, select_data, []).
-spec current_rate(node()) -> {ok, map()} | emqx_rpc:badrpc().
current_rate(Node) ->
    rpc:call(Node, emqx_dashboard_monitor, current_rate, [Node], ?RPC_TIMEOUT).

@@ -0,0 +1,145 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_dashboard_monitor_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-include("emqx_dashboard.hrl").

-define(SERVER, "http://127.0.0.1:18083").
-define(BASE_PATH, "/api/v5").

all() ->
    emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
    mria:start(),
    emqx_common_test_helpers:start_apps([emqx_dashboard], fun set_special_configs/1),
    Config.

end_per_suite(Config) ->
    emqx_common_test_helpers:stop_apps([emqx_dashboard]),
    Config.

set_special_configs(emqx_dashboard) ->
    Config = #{
        default_username => <<"admin">>,
        default_password => <<"public">>,
        listeners => [#{
            protocol => http,
            port => 18083
        }]
    },
    emqx_config:put([dashboard], Config),
    ok;
set_special_configs(_) ->
    ok.

t_monitor_samplers_all(_Config) ->
    timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
    Size = mnesia:table_info(emqx_dashboard_monitor, size),
    All = emqx_dashboard_monitor:samplers(all, infinity),
    All2 = emqx_dashboard_monitor:samplers(),
    ?assert(erlang:length(All) == Size),
    ?assert(erlang:length(All2) == Size),
    ok.

t_monitor_samplers_latest(_Config) ->
    timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
    Samplers = emqx_dashboard_monitor:samplers(node(), 2),
    Latest = emqx_dashboard_monitor:samplers(node(), 1),
    ?assert(erlang:length(Samplers) == 2),
    ?assert(erlang:length(Latest) == 1),
    ?assert(hd(Latest) == lists:nth(2, Samplers)),
    ok.

t_monitor_sampler_format(_Config) ->
    timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
    Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)),
    SamplerKeys = maps:keys(Latest),
    [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST],
    ok.

t_monitor_api(_) ->
    timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
    {ok, Samplers} = request(["monitor"], "latest=20"),
    ?assert(erlang:length(Samplers) >= 2),
    Fun =
        fun(Sampler) ->
            Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
            [?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST]
        end,
    [Fun(Sampler) || Sampler <- Samplers],
    {ok, NodeSamplers} = request(["monitor", "nodes", node()]),
    [Fun(NodeSampler) || NodeSampler <- NodeSamplers],
    ok.

t_monitor_current_api(_) ->
    timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
    {ok, Rate} = request(["monitor_current"]),
    [?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
     || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST],
    {ok, NodeRate} = request(["monitor_current", "nodes", node()]),
    [?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
     || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST],
    ok.

t_monitor_api_error(_) ->
    {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} =
        request(["monitor", "nodes", 'emqx@127.0.0.2']),
    {error, {400, #{<<"code">> := <<"BAD_RPC">>}}} =
        request(["monitor_current", "nodes", 'emqx@127.0.0.2']),
    ok.

request(Path) ->
    request(Path, "").

request(Path, QS) ->
    Url = url(Path, QS),
    do_request_api(get, {Url, [auth_header_()]}).

url(Parts, QS) ->
    case QS of
        "" ->
            ?SERVER ++ filename:join([?BASE_PATH | Parts]);
        _ ->
            ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
    end.

do_request_api(Method, Request) ->
    ct:pal("Req ~p ~p~n", [Method, Request]),
    case httpc:request(Method, Request, [], []) of
        {error, socket_closed_remotely} ->
            {error, socket_closed_remotely};
        {ok, {{"HTTP/1.1", Code, _}, _, Return}}
          when Code >= 200 andalso Code =< 299 ->
            ct:pal("Resp ~p ~p~n", [Code, Return]),
            {ok, emqx_json:decode(Return, [return_maps])};
        {ok, {{"HTTP/1.1", Code, _}, _, Return}} ->
            ct:pal("Resp ~p ~p~n", [Code, Return]),
            {error, {Code, emqx_json:decode(Return, [return_maps])}};
        {error, Reason} ->
            {error, Reason}
    end.

auth_header_() ->
    Basic = binary_to_list(base64:encode(<<"admin:public">>)),
    {"Authorization", "Basic " ++ Basic}.

@@ -1,121 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_dashboard_monitor_api_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-include("emqx_dashboard.hrl").

all() ->
    emqx_common_test_helpers:all(?MODULE).

init_per_testcase(t_badrpc_collect, Config) ->
    Cluster = cluster_specs(2),
    Apps = [emqx_modules, emqx_dashboard],
    Nodes = [N1, N2] = lists:map(fun(Spec) -> start_slave(Spec, Apps) end, Cluster),
    %% form the cluster
    ok = rpc:call(N2, mria, join, [N1]),
    %% Wait until all nodes are healthy:
    [rpc:call(Node, mria_rlog, wait_for_shards, [[?DASHBOARD_SHARD], 5000])
     || Node <- Nodes],
    [ {nodes, Nodes}
    , {apps, Apps}
    | Config];
init_per_testcase(_, Config) ->
    Config.

end_per_testcase(t_badrpc_collect, Config) ->
    Apps = ?config(apps, Config),
    Nodes = ?config(nodes, Config),
    lists:foreach(fun(Node) -> stop_slave(Node, Apps) end, Nodes),
    ok;
end_per_testcase(_, _Config) ->
    ok.

t_badrpc_collect(Config) ->
    [N1, N2] = ?config(nodes, Config),
    %% simulate badrpc on one node
    ok = rpc:call(N2, meck, new, [emqx_dashboard_collection, [no_history, no_link]]),
    %% we don't mock the `emqx_dashboard_collection:get_collect/0' to
    %% provoke the `badrpc' error.
    ?assertMatch(
       {200, #{nodes := 2}},
       rpc:call(N1, emqx_dashboard_monitor_api, current_counters, [get, #{}])),
    ok = rpc:call(N2, meck, unload, [emqx_dashboard_collection]),
    ok.

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

cluster_specs(NumNodes) ->
    BaseGenRpcPort = 9000,
    Specs0 = [#{ name => node_name(N)
               , num => N
               }
              || N <- lists:seq(1, NumNodes)],
    GenRpcPorts = maps:from_list([{node_id(Name), {tcp, BaseGenRpcPort + N}}
                                  || #{name := Name, num := N} <- Specs0]),
    [ Spec#{env => [ {gen_rpc, tcp_server_port, BaseGenRpcPort + N}
                   , {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
                   ]}
      || Spec = #{num := N} <- Specs0].

node_name(N) ->
    list_to_atom("n" ++ integer_to_list(N)).

node_id(Name) ->
    list_to_atom(lists:concat([Name, "@", host()])).

start_slave(Spec = #{ name := Name}, Apps) ->
    CommonBeamOpts = "+S 1:1 ", % We want VMs to only occupy a single core
    {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()),
    setup_node(Node, Spec, Apps),
    Node.

stop_slave(Node, Apps) ->
    ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [Apps]),
    slave:stop(Node).

host() ->
    [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.

ebin_path() ->
    string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").

is_lib(Path) ->
    string:prefix(Path, code:lib_dir()) =:= nomatch.

setenv(Node, Env) ->
    [rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env].

setup_node(Node, _Spec = #{env := Env}, Apps) ->
    %% load these before starting ekka and such
    [rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_conf, emqx]],
    setenv(Node, Env),
    EnvHandler =
        fun(emqx) ->
            application:set_env(emqx, boot_modules, [router, broker]);
           (_) ->
            ok
        end,
    ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandler]),
    ok.