%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_dashboard_monitor).

-include("emqx_dashboard.hrl").

-include_lib("emqx/include/logger.hrl").

-behaviour(gen_server).

-boot_mnesia({mnesia, [boot]}).

-export([start_link/0]).

-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-export([mnesia/1]).

-export([
    samplers/0,
    samplers/2,
    current_rate/0,
    current_rate/1,
    granularity_adapter/1
]).

%% for rpc
-export([do_sample/2]).

-define(TAB, ?MODULE).

%% 1 hour = 60 * 60 * 1000 milliseconds
-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).

-record(state, {
    last
}).

-record(emqx_monit, {
    time :: integer(),
    data :: map()
}).

mnesia(boot) ->
    ok = mria:create_table(?TAB, [
        {type, set},
        {local_content, true},
        {storage, disc_copies},
        {record_name, emqx_monit},
        {attributes, record_info(fields, emqx_monit)}
    ]).

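%% Note: the table is created with {local_content, true}, so each node stores only its
%% own samples. Cluster-wide views are assembled by do_sample/3 below, which queries
%% every running node over RPC and merges the per-node results.
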
%% -------------------------------------------------------------------------------------------------
%% API

samplers() ->
    format(do_sample(all, infinity)).

samplers(NodeOrCluster, Latest) ->
    Time = latest2time(Latest),
    case format(do_sample(NodeOrCluster, Time)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        List when is_list(List) ->
            granularity_adapter(List)
    end.

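%% Illustrative call to samplers/2 (the timestamps and counter values are hypothetical):
%%   samplers(all, 3600).
%%   %% => [#{time_stamp => 1700000010000, received => 30, sent => 28, ...}, ...]
%% i.e. roughly the last hour of cluster-wide samples, oldest first.
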
latest2time(infinity) -> infinity;
latest2time(Latest) -> erlang:system_time(millisecond) - (Latest * 1000).

%% When the number of samples exceeds 1000, rendering in the dashboard UI slows down.
%% granularity_adapter/1 downsamples the list by merging adjacent samples,
%% trading granularity for lower data density.
%%
%% [
%%     Data1 = #{time => T1, k1 => 1, k2 => 2},
%%     Data2 = #{time => T2, k1 => 3, k2 => 4},
%%     ...
%% ]
%% After granularity_adapter, Data1 and Data2 are merged into:
%%
%% [
%%     #{time => T2, k1 => 1 + 3, k2 => 2 + 4},
%%     ...
%% ]
%%
granularity_adapter(List) when length(List) > 1000 ->
    granularity_adapter(List, []);
granularity_adapter(List) ->
    List.

%% Get the current rate. Not the current sampler data.
current_rate() ->
    Fun =
        fun
            (Node, Cluster) when is_map(Cluster) ->
                case current_rate(Node) of
                    {ok, CurrentRate} ->
                        merge_cluster_rate(CurrentRate, Cluster);
                    {badrpc, Reason} ->
                        {badrpc, {Node, Reason}}
                end;
            (_Node, Error) ->
                Error
        end,
    case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Rate ->
            {ok, Rate}
    end.

current_rate(all) ->
    current_rate();
current_rate(Node) when Node == node() ->
    try
        {ok, Rate} = do_call(current_rate),
        {ok, Rate}
    catch
        _E:R ->
            ?SLOG(warning, #{msg => "Dashboard monitor error", reason => R}),
            %% Return a rate map of all zeros so the API does not crash;
            %% the dashboard monitor restarts while a node is joining the cluster.
            Rate0 = [
                {Key, 0}
             || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
            ],
            {ok, maps:from_list(Rate0)}
    end;
current_rate(Node) ->
    case emqx_dashboard_proto_v1:current_rate(Node) of
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}};
        {ok, Rate} ->
            {ok, Rate}
    end.

%% -------------------------------------------------------------------------------------------------
%% gen_server functions

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    sample_timer(),
    clean_timer(),
    {ok, #state{last = undefined}}.

handle_call(current_rate, _From, State = #state{last = Last}) ->
    NowTime = erlang:system_time(millisecond),
    NowSamplers = sample(NowTime),
    Rate = cal_rate(NowSamplers, Last),
    {reply, {ok, Rate}, State};
handle_call(_Request, _From, State = #state{}) ->
    {reply, ok, State}.

handle_cast(_Request, State = #state{}) ->
    {noreply, State}.

handle_info({sample, Time}, State = #state{last = Last}) ->
    Now = sample(Time),
    {atomic, ok} = flush(Last, Now),
    sample_timer(),
    {noreply, State#state{last = Now}};
handle_info(clean_expired, State) ->
    clean(),
    clean_timer(),
    {noreply, State};
handle_info(_Info, State = #state{}) ->
    {noreply, State}.

terminate(_Reason, _State = #state{}) ->
    ok.

code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State}.

%% -------------------------------------------------------------------------------------------------
%% Internal functions

do_call(Request) ->
    gen_server:call(?MODULE, Request, 5000).

do_sample(all, Time) ->
    do_sample(mria_mnesia:cluster_nodes(running), Time, #{});
do_sample(Node, Time) when Node == node() ->
    MS = match_spec(Time),
    internal_format(ets:select(?TAB, MS));
do_sample(Node, Time) ->
    case emqx_dashboard_proto_v1:do_sample(Node, Time) of
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}};
        Res ->
            Res
    end.

do_sample([], _Time, Res) ->
    Res;
do_sample([Node | Nodes], Time, Res) ->
    case do_sample(Node, Time) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Samplers ->
            do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res))
    end.

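%% Build an ETS match spec over #emqx_monit{time, data} records: `infinity` selects
%% every stored sample, otherwise only records whose `time` field ('$1') is >= Time.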
match_spec(infinity) ->
    [{'$1', [], ['$1']}];
match_spec(Time) ->
    [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].

merge_cluster_samplers(Node, Cluster) ->
    maps:fold(fun merge_cluster_samplers/3, Cluster, Node).

merge_cluster_samplers(TS, NodeData, Cluster) ->
    case maps:get(TS, Cluster, undefined) of
        undefined ->
            Cluster#{TS => NodeData};
        ClusterData ->
            Cluster#{TS => merge_cluster_sampler_map(NodeData, ClusterData)}
    end.

merge_cluster_sampler_map(M1, M2) ->
    Fun =
        fun
            (topics, Map) ->
                Map#{topics => maps:get(topics, M1)};
            (Key, Map) ->
                Map#{Key => maps:get(Key, M1) + maps:get(Key, M2)}
        end,
    lists:foldl(Fun, #{}, ?SAMPLER_LIST).

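%% Note: `topics` is not summed when merging node data (here and in merge_cluster_rate/2
%% below), presumably because the topic count is already a cluster-wide statistic, so
%% summing per-node values would overcount; a single node's value is used instead.
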
merge_cluster_rate(Node, Cluster) ->
    Fun =
        fun
            (topics, Value, NCluster) ->
                NCluster#{topics => Value};
            (Key, Value, NCluster) ->
                ClusterValue = maps:get(Key, NCluster, 0),
                NCluster#{Key => Value + ClusterValue}
        end,
    maps:fold(Fun, Cluster, Node).

format({badrpc, Reason}) ->
    {badrpc, Reason};
format(Data) ->
    All = maps:fold(fun format/3, [], Data),
    Compare = fun(#{time_stamp := T1}, #{time_stamp := T2}) -> T1 =< T2 end,
    lists:sort(Compare, All).

format(TimeStamp, Data, All) ->
    [Data#{time_stamp => TimeStamp} | All].

cal_rate(_Now, undefined) ->
    AllSamples = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
    lists:foldl(fun(Key, Acc) -> Acc#{Key => 0} end, #{}, AllSamples);
cal_rate(
    #emqx_monit{data = NowData, time = NowTime},
    #emqx_monit{data = LastData, time = LastTime} = Last
) ->
    case NowTime - LastTime of
        0 ->
            %% avoid dividing by zero: wait a few milliseconds and sample again
            timer:sleep(5),
            NewSamplers = sample(erlang:system_time(millisecond)),
            cal_rate(NewSamplers, Last);
        TimeDelta ->
            Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end,
            Gauge = maps:filter(Filter, NowData),
            {_, _, _, Rate} =
                lists:foldl(
                    fun cal_rate_/2,
                    {NowData, LastData, TimeDelta, Gauge},
                    ?DELTA_SAMPLER_LIST
                ),
            Rate
    end.

cal_rate_(Key, {Now, Last, TDelta, Res}) ->
    NewValue = maps:get(Key, Now),
    LastValue = maps:get(Key, Last),
    Rate = ((NewValue - LastValue) * 1000) div TDelta,
    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
    {Now, Last, TDelta, Res#{RateKey => Rate}}.

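%% The rate is normalized to a per-second value. For example (hypothetical numbers):
%% if a delta counter grew by 300 over a TDelta of 10000 ms, the reported rate is
%% (300 * 1000) div 10000 = 30 per second.
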
granularity_adapter([], Res) ->
    lists:reverse(Res);
granularity_adapter([Sampler], Res) ->
    granularity_adapter([], [Sampler | Res]);
granularity_adapter([Sampler1, Sampler2 | Rest], Res) ->
    Fun =
        fun(Key, M) ->
            Value1 = maps:get(Key, Sampler1),
            Value2 = maps:get(Key, Sampler2),
            M#{Key => Value1 + Value2}
        end,
    granularity_adapter(Rest, [lists:foldl(Fun, Sampler2, ?DELTA_SAMPLER_LIST) | Res]).

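%% Note: only the keys in ?DELTA_SAMPLER_LIST are summed when two samples are merged;
%% since the fold starts from Sampler2, the merged entry otherwise keeps Sampler2's
%% values, including its timestamp and the gauge samplers.
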
%% -------------------------------------------------------------------------------------------------
%% timer

sample_timer() ->
    {NextTime, Remaining} = next_interval(),
    erlang:send_after(Remaining, self(), {sample, NextTime}).

clean_timer() ->
    erlang:send_after(?CLEAN_EXPIRED_INTERVAL, self(), clean_expired).

%% Sample once per `sample_interval` seconds, aligned to the interval boundary.
%% For example, with an interval of 10 seconds the monitor fires on full intervals
%% such as 00:00:00, 00:00:10, 00:00:20, ...
%% This keeps the monitor data of all nodes in the cluster aligned in time.
next_interval() ->
    Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
    Now = erlang:system_time(millisecond),
    NextTime = ((Now div Interval) + 1) * Interval,
    Remaining = NextTime - Now,
    {NextTime, Remaining}.

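%% Worked example (hypothetical clock value): with Interval = 10000 ms and
%% Now = 1700000004500, NextTime = ((Now div 10000) + 1) * 10000 = 1700000010000 and
%% Remaining = 5500 ms, so the next sample lands exactly on the 10-second boundary.
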
%% -------------------------------------------------------------------------------------------------
%% data

sample(Time) ->
    Fun =
        fun(Key, Res) ->
            maps:put(Key, getstats(Key), Res)
        end,
    Data = lists:foldl(Fun, #{}, ?SAMPLER_LIST),
    #emqx_monit{time = Time, data = Data}.

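%% Every key in ?SAMPLER_LIST is read via getstats/1; the concrete metric behind each
%% key is listed in stats/1 at the bottom of this module.
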
flush(_Last = undefined, Now) ->
    store(Now);
flush(_Last = #emqx_monit{data = LastData}, Now = #emqx_monit{data = NowData}) ->
    Store = Now#emqx_monit{data = delta(LastData, NowData)},
    store(Store).

delta(LastData, NowData) ->
    Fun =
        fun(Key, Data) ->
            Value = maps:get(Key, NowData) - maps:get(Key, LastData),
            Data#{Key => Value}
        end,
    lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST).

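%% As a result, each stored #emqx_monit row holds the per-interval increment for the
%% delta samplers and the latest absolute value for the gauge samplers.
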
store(MonitData) ->
    {atomic, ok} =
        mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).

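%% Drop samples older than ?RETENTION_TIME (7 days): the match spec selects records
%% whose age (Now - time) exceeds the retention window, and they are then deleted.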
clean() ->
    Now = erlang:system_time(millisecond),
    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
    Expired = ets:select(?TAB, ExpiredMS),
    lists:foreach(
        fun(Data) ->
            true = ets:delete_object(?TAB, Data)
        end,
        Expired
    ),
    ok.

%% Convert stored records into a #{Timestamp => Data} map to make data aggregation easier.
internal_format(List) when is_list(List) ->
    Fun =
        fun(Data, All) ->
            maps:merge(internal_format(Data), All)
        end,
    lists:foldl(Fun, #{}, List);
internal_format(#emqx_monit{time = Time, data = Data}) ->
    #{Time => Data}.

getstats(Key) ->
    %% The stats ETS table may not exist yet while ekka is joining the cluster,
    %% so fall back to 0 instead of crashing.
    try
        stats(Key)
    catch
        _:_ -> 0
    end.

stats(connections) -> emqx_stats:getstat('connections.count');
stats(topics) -> emqx_stats:getstat('topics.count');
stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
stats(received) -> emqx_metrics:val('messages.received');
stats(received_bytes) -> emqx_metrics:val('bytes.received');
stats(sent) -> emqx_metrics:val('messages.sent');
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
stats(dropped) -> emqx_metrics:val('messages.dropped').