emqx/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

411 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_dashboard_monitor).
-include("emqx_dashboard.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(gen_server).
-boot_mnesia({mnesia, [boot]}).
-export([start_link/0]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([mnesia/1]).
-export([
samplers/0,
samplers/2,
current_rate/0,
current_rate/1,
granularity_adapter/1
]).
%% for rpc
-export([do_sample/2]).
%% Name of the table that stores the samples; same atom as this module.
-define(TAB, ?MODULE).
%% 1 hour = 60 * 60 * 1000 milliseconds
%% Delay used by clean_timer/0 between two expired-sample clean-ups.
-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
%% Samples older than this are selected for removal by clean/0.
-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
%% gen_server state.
-record(state, {
%% The #emqx_monit{} sample taken by the last periodic `{sample, Time}' tick,
%% or `undefined' until the first tick has been handled.
last
}).
%% One sample; this is also the record stored in ?TAB.
-record(emqx_monit, {
%% Timestamp of the sample in milliseconds (first attribute, i.e. the table key).
time :: integer(),
%% Map of sampler key => value.
data :: map()
}).
%% Table bootstrap callback (referenced by the `boot_mnesia' module attribute).
%% Creates the node-local, disc-backed `set' table holding #emqx_monit{} records.
mnesia(boot) ->
    TableOptions = [
        {type, set},
        {local_content, true},
        {storage, disc_copies},
        {record_name, emqx_monit},
        {attributes, record_info(fields, emqx_monit)}
    ],
    ok = mria:create_table(?TAB, TableOptions).
%% -------------------------------------------------------------------------------------------------
%% API
%% Returns every stored sample of all running nodes, merged per timestamp and
%% sorted by `time_stamp', or `{badrpc, Reason}' if a node could not be queried.
samplers() ->
    ClusterSamples = do_sample(all, infinity),
    format(ClusterSamples).
%% Returns the samples of `NodeOrCluster' (a node name or `all') taken within
%% the last `Latest' seconds (`infinity' for everything).
%% The sorted list is thinned out by granularity_adapter/1 when it is long;
%% an rpc failure is returned unchanged as `{badrpc, Reason}'.
samplers(NodeOrCluster, Latest) ->
    Since = latest2time(Latest),
    Formatted = format(do_sample(NodeOrCluster, Since)),
    case Formatted of
        {badrpc, _Reason} = Error ->
            Error;
        Samples when is_list(Samples) ->
            granularity_adapter(Samples)
    end.
%% Converts a look-back window given in seconds into an absolute lower bound
%% in milliseconds of system time. `infinity' is passed through unchanged.
latest2time(infinity) ->
    infinity;
latest2time(Latest) ->
    NowMs = erlang:system_time(millisecond),
    WindowMs = Latest * 1000,
    NowMs - WindowMs.
%% When the number of samples exceeds 1000, it affects the rendering speed of dashboard UI.
%% granularity_adapter downsamples the samples:
%% adjacent samples are merged pairwise, which gives a coarser granularity
%% and reduces the data density.
%%
%% [
%% Data1 = #{time => T1, k1 => 1, k2 => 2},
%% Data2 = #{time => T2, k1 => 3, k2 => 4},
%% ...
%% ]
%% After granularity_adapter, Data1 and Data2 are merged:
%%
%% [
%% #{time => T2, k1 => 1 + 3, k2 => 2 + 4},
%% ...
%% ]
%%
%% Only the keys listed in ?DELTA_SAMPLER_LIST are summed;
%% every other key keeps the value of the later sample (Data2).
%%
%% Merges neighbouring samples pairwise (one pass) when more than 1000 samples
%% are given; shorter input is returned untouched.
granularity_adapter(Samples) ->
    if
        %% A failing guard (e.g. not a proper list) falls through to `true'.
        length(Samples) > 1000 ->
            granularity_adapter(Samples, []);
        true ->
            Samples
    end.
%% Get the current rate. Not the current sampler data.
%% Aggregates the current rate of every running cluster node.
%%
%% Returns `{ok, RateMap}' where the per-node maps have been combined with
%% merge_cluster_rate/2, or `{badrpc, {Node, Reason}}' for the first node
%% whose rate could not be fetched. After a failure the remaining nodes are
%% not queried; the error is just carried through the fold.
current_rate() ->
    Fun =
        fun
            (Node, Cluster) when is_map(Cluster) ->
                case current_rate(Node) of
                    {ok, CurrentRate} ->
                        merge_cluster_rate(CurrentRate, Cluster);
                    {badrpc, _Reason} = Error ->
                        %% current_rate/1 has already tagged the reason with the
                        %% failing node; wrapping it again would yield
                        %% {badrpc, {Node, {Node, Reason}}}.
                        Error
                end;
            (_Node, Error) ->
                Error
        end,
    case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Rate ->
            {ok, Rate}
    end.
%% Current rate of the whole cluster (`all'), of the local node, or of a
%% remote node (fetched through emqx_dashboard_proto_v1).
%%
%% The local clause never fails: if the monitor process cannot answer, a
%% warning is logged and a map with every rate set to 0 is returned.
%% A remote failure is returned as `{badrpc, {Node, Reason}}'.
current_rate(all) ->
    current_rate();
current_rate(Node) when Node == node() ->
    try
        {ok, _Rate} = Reply = do_call(current_rate),
        Reply
    catch
        _Class:Reason ->
            ?SLOG(warning, #{msg => "Dashboard monitor error", reason => Reason}),
            %% Answer with all-zero rates so the API does not crash; the
            %% dashboard monitor is restarted while the node joins a cluster.
            RateKeys = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
            ZeroRates = lists:foldl(
                fun(Key, Acc) -> maps:put(Key, 0, Acc) end,
                #{},
                RateKeys
            ),
            {ok, ZeroRates}
    end;
current_rate(Node) ->
    case emqx_dashboard_proto_v1:current_rate(Node) of
        {ok, _Rate} = Ok ->
            Ok;
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}}
    end.
%% -------------------------------------------------------------------------------------------------
%% gen_server functions
%% Starts the monitor as a locally registered gen_server named after this module.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% gen_server callback: arms the sampling and clean-up timers and starts
%% without a previous sample.
init([]) ->
    _SampleTimer = sample_timer(),
    _CleanTimer = clean_timer(),
    InitialState = #state{last = undefined},
    {ok, InitialState}.
%% gen_server callback.
%% `current_rate': takes a fresh sample now and replies `{ok, Rate}', the rate
%% computed against the last periodic sample. The state is not changed.
%% Any other request is answered with `ok'.
handle_call(current_rate, _From, #state{last = Previous} = State) ->
    Current = sample(erlang:system_time(millisecond)),
    Reply = {ok, cal_rate(Current, Previous)},
    {reply, Reply, State};
handle_call(_Request, _From, #state{} = State) ->
    {reply, ok, State}.
%% gen_server callback: casts are not used and are ignored.
handle_cast(_Request, #state{} = State) ->
    {noreply, State}.
%% gen_server callback handling the two timer messages.
%% `{sample, Time}': takes a sample stamped with `Time', stores it through
%%                   flush/2, re-arms the sample timer and remembers the sample.
%% `clean_expired':  removes expired samples and re-arms the clean-up timer.
%% Everything else is dropped.
handle_info({sample, Time}, #state{last = Previous} = State) ->
    Current = sample(Time),
    {atomic, ok} = flush(Previous, Current),
    sample_timer(),
    NewState = State#state{last = Current},
    {noreply, NewState};
handle_info(clean_expired, State) ->
    clean(),
    clean_timer(),
    {noreply, State};
handle_info(_Info, #state{} = State) ->
    {noreply, State}.
%% gen_server callback: nothing to release.
terminate(_Reason, #state{}) ->
    ok.
%% gen_server callback: the state is carried over unchanged.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%% -------------------------------------------------------------------------------------------------
%% Internal functions
%% Synchronous call to the locally registered monitor with a 5 second timeout.
do_call(Request) ->
    TimeoutMs = 5000,
    gen_server:call(?MODULE, Request, TimeoutMs).
%% Reads the samples with a timestamp >= `Time' (`infinity': all samples).
%%   all        -> merged over every running cluster node
%%   local node -> read straight from the local table
%%   other node -> fetched through emqx_dashboard_proto_v1
%% Returns a map `Timestamp => Data', or `{badrpc, {Node, Reason}}'.
do_sample(all, Time) ->
    RunningNodes = mria_mnesia:cluster_nodes(running),
    do_sample(RunningNodes, Time, #{});
do_sample(Node, Time) when Node == node() ->
    Records = ets:select(?TAB, match_spec(Time)),
    internal_format(Records);
do_sample(Node, Time) ->
    case emqx_dashboard_proto_v1:do_sample(Node, Time) of
        {badrpc, Reason} ->
            {badrpc, {Node, Reason}};
        RemoteSamples ->
            RemoteSamples
    end.
%% Collects the samples node by node and merges them into the accumulator.
%% Stops at the first node that answers `{badrpc, Reason}' and returns that error.
do_sample([Node | Remaining], Time, Acc) ->
    case do_sample(Node, Time) of
        {badrpc, _Reason} = Error ->
            Error;
        NodeSamples ->
            Merged = merge_cluster_samplers(NodeSamples, Acc),
            do_sample(Remaining, Time, Merged)
    end;
do_sample([], _Time, Acc) ->
    Acc.
%% Builds the ETS match specification used to read samples:
%% `infinity' selects every object, otherwise every 3-tuple whose second
%% element (the sample time) is >= `Time'.
match_spec(infinity) ->
    SelectAll = {'$1', [], ['$1']},
    [SelectAll];
match_spec(Time) ->
    Head = {'_', '$1', '_'},
    NotOlderThan = {'>=', '$1', Time},
    [{Head, [NotOlderThan], ['$_']}].
%% Folds the `Timestamp => Data' map of one node into the cluster-wide map.
merge_cluster_samplers(Node, Cluster) ->
    MergeEntry = fun(TS, NodeData, Acc) ->
        merge_cluster_samplers(TS, NodeData, Acc)
    end,
    maps:fold(MergeEntry, Cluster, Node).
%% Adds one node sample to the cluster map: a timestamp not seen before takes
%% the node data as is, otherwise the data is combined with the existing entry.
merge_cluster_samplers(TS, NodeData, Cluster) ->
    Entry =
        case maps:get(TS, Cluster, undefined) of
            undefined ->
                NodeData;
            ClusterData ->
                merge_cluster_sampler_map(NodeData, ClusterData)
        end,
    Cluster#{TS => Entry}.
%% Combines two sample maps key by key over ?SAMPLER_LIST.
%% `topics' is taken from M1 only (not summed); every other key is added up.
merge_cluster_sampler_map(M1, M2) ->
    Combined = fun
        (topics) -> maps:get(topics, M1);
        (Key) -> maps:get(Key, M1) + maps:get(Key, M2)
    end,
    maps:from_list([{Key, Combined(Key)} || Key <- ?SAMPLER_LIST]).
%% Folds the rate map of one node into the cluster rate map.
%% `topics' overwrites the cluster value; every other key is added to the
%% cluster value (0 when the key is not there yet).
merge_cluster_rate(Node, Cluster) ->
    Accumulate = fun
        (topics, Count, Acc) ->
            maps:put(topics, Count, Acc);
        (Key, NodeValue, Acc) ->
            Current = maps:get(Key, Acc, 0),
            maps:put(Key, NodeValue + Current, Acc)
    end,
    maps:fold(Accumulate, Cluster, Node).
%% format/1: turns a `Timestamp => Data' map into a list of data maps, each
%% extended with its `time_stamp', sorted by ascending `time_stamp'.
%% An rpc error is passed through unchanged.
format({badrpc, _Reason} = Error) ->
    Error;
format(Data) ->
    Unsorted = maps:fold(fun format/3, [], Data),
    ByTime = fun(#{time_stamp := Left}, #{time_stamp := Right}) -> Left =< Right end,
    lists:sort(ByTime, Unsorted).

%% format/3: fold step that tags one data map with its timestamp and prepends it.
format(TimeStamp, Data, All) ->
    Tagged = maps:put(time_stamp, TimeStamp, Data),
    [Tagged | All].
%% Computes the current rate map from a fresh sample and the last stored one.
%%
%% Without a previous sample every gauge and rate key is reported as 0.
%% Otherwise the result holds the gauge values of the fresh sample plus, for
%% each key of ?DELTA_SAMPLER_LIST, the per-second rate computed by cal_rate_/2.
cal_rate(_Now, undefined) ->
    Keys = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
    maps:from_list([{Key, 0} || Key <- Keys]);
cal_rate(
    #emqx_monit{time = NowTime, data = NowData},
    #emqx_monit{time = LastTime, data = LastData} = Last
) ->
    case NowTime - LastTime of
        0 ->
            %% Same millisecond as the last sample: wait a little and sample
            %% again so that the rate is never divided by zero.
            timer:sleep(5),
            Retry = sample(erlang:system_time(millisecond)),
            cal_rate(Retry, Last);
        TimeDelta ->
            %% Gauges are reported as they are; only delta samplers become rates.
            Gauges = maps:with(?GAUGE_SAMPLER_LIST, NowData),
            Seed = {NowData, LastData, TimeDelta, Gauges},
            {_, _, _, Rate} = lists:foldl(fun cal_rate_/2, Seed, ?DELTA_SAMPLER_LIST),
            Rate
    end.
%% Fold step of cal_rate/2: stores the per-second rate of `Key' under the rate
%% key that ?DELTA_SAMPLER_RATE_MAP assigns to it. `TDelta' is in milliseconds.
cal_rate_(Key, {Now, Last, TDelta, Res}) ->
    Increase = maps:get(Key, Now) - maps:get(Key, Last),
    PerSecond = (Increase * 1000) div TDelta,
    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
    {Now, Last, TDelta, maps:put(RateKey, PerSecond, Res)}.
%% Walks the sample list two elements at a time. Each pair becomes one sample:
%% the second sample of the pair with every ?DELTA_SAMPLER_LIST key replaced by
%% the sum of both values. A trailing single sample is kept as it is.
granularity_adapter([First, Second | Rest], Res) ->
    SumKey = fun(Key, Acc) ->
        Total = maps:get(Key, First) + maps:get(Key, Second),
        maps:put(Key, Total, Acc)
    end,
    Merged = lists:foldl(SumKey, Second, ?DELTA_SAMPLER_LIST),
    granularity_adapter(Rest, [Merged | Res]);
granularity_adapter([Single], Res) ->
    lists:reverse([Single | Res]);
granularity_adapter([], Res) ->
    lists:reverse(Res).
%% -------------------------------------------------------------------------------------------------
%% timer
%% Schedules the next `{sample, Time}' message for the upcoming interval boundary.
sample_timer() ->
    {FireAt, Delay} = next_interval(),
    erlang:send_after(Delay, self(), {sample, FireAt}).
%% Schedules the next `clean_expired' message, ?CLEAN_EXPIRED_INTERVAL ms from now.
clean_timer() ->
    Self = self(),
    erlang:send_after(?CLEAN_EXPIRED_INTERVAL, Self, clean_expired).
%% Sample once per interval (the interval is configured in seconds).
%% For example, with Interval = 10 the monitor fires on aligned wall-clock
%% seconds such as 00:00:00, 00:00:10, 00:00:20 ...
%% This ensures that the monitor data of all nodes in the cluster is aligned in time.
%% Returns `{NextTime, Remaining}': the next multiple of the sample interval
%% (in ms of system time) and the number of milliseconds left until then.
next_interval() ->
    IntervalMs = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
    NowMs = erlang:system_time(millisecond),
    %% Round down to the current boundary, then step one interval ahead.
    NextTime = NowMs - (NowMs rem IntervalMs) + IntervalMs,
    {NextTime, NextTime - NowMs}.
%% -------------------------------------------------------------------------------------------------
%% data
%% Reads the current value of every key in ?SAMPLER_LIST and wraps the result
%% in an #emqx_monit{} record stamped with `Time'.
sample(Time) ->
    Stats = [{Key, getstats(Key)} || Key <- ?SAMPLER_LIST],
    #emqx_monit{time = Time, data = maps:from_list(Stats)}.
%% Persists a sample. The very first sample is stored as it is; later samples
%% are stored with their delta keys replaced by the difference to the previous
%% sample (see delta/2).
flush(undefined, Now) ->
    store(Now);
flush(#emqx_monit{data = LastData}, #emqx_monit{data = NowData} = Now) ->
    Increments = delta(LastData, NowData),
    store(Now#emqx_monit{data = Increments}).
%% Returns `NowData' with every ?DELTA_SAMPLER_LIST key replaced by
%% (current value - previous value); all other keys are left untouched.
delta(LastData, NowData) ->
    Subtract = fun(Key, Acc) ->
        Increase = maps:get(Key, NowData) - maps:get(Key, LastData),
        maps:put(Key, Increase, Acc)
    end,
    lists:foldl(Subtract, NowData, ?DELTA_SAMPLER_LIST).
%% Writes one #emqx_monit{} record to ?TAB inside a transaction on the
%% local-content shard. Crashes unless the transaction returns `{atomic, ok}'.
store(MonitData) ->
    Shard = mria:local_content_shard(),
    WriteArgs = [?TAB, MonitData, write],
    {atomic, ok} = mria:transaction(Shard, fun mnesia:write/3, WriteArgs).
%% Removes every sample older than ?RETENTION_TIME from ?TAB. Always returns `ok'.
%%
%% The expired rows are looked up with a plain ETS select, but they are
%% deleted through a Mnesia transaction on the local-content shard, the same
%% way store/1 writes them. ?TAB is a `disc_copies' table: deleting directly
%% from the ETS table bypasses Mnesia, so the rows would stay in the disc copy
%% and come back after a restart.
clean() ->
    Now = erlang:system_time(millisecond),
    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
    case ets:select(?TAB, ExpiredMS) of
        [] ->
            %% Nothing expired: do not open an empty transaction.
            ok;
        Expired ->
            DeleteAll = fun(Rows) ->
                lists:foreach(
                    fun(Row) -> ok = mnesia:delete_object(?TAB, Row, write) end,
                    Rows
                )
            end,
            {atomic, ok} = mria:transaction(mria:local_content_shard(), DeleteAll, [Expired]),
            ok
    end.
%% To make it easier to do data aggregation
%% Converts stored records into a `Time => Data' map, which is easier to
%% aggregate. When several list elements share a timestamp, the one seen
%% first is kept.
internal_format(#emqx_monit{time = Time, data = Data}) ->
    #{Time => Data};
internal_format(Records) when is_list(Records) ->
    AddRecord = fun(Record, Acc) ->
        maps:merge(internal_format(Record), Acc)
    end,
    lists:foldl(AddRecord, #{}, Records).
%% Best-effort read of one sampler value; any failure is reported as 0.
getstats(Key) ->
    %% The stats ETS tables may not exist yet while the node is joining a
    %% cluster (ekka join), so every exception is deliberately mapped to 0.
    try stats(Key) of
        Value -> Value
    catch
        _:_ -> 0
    end.
%% Maps a sampler key to its source value.
%% The first three keys are read from emqx_stats, the rest from emqx_metrics.
stats(connections) ->
    emqx_stats:getstat('connections.count');
stats(topics) ->
    emqx_stats:getstat('topics.count');
stats(subscriptions) ->
    emqx_stats:getstat('subscriptions.count');
stats(received) ->
    emqx_metrics:val('messages.received');
stats(received_bytes) ->
    emqx_metrics:val('bytes.received');
stats(sent) ->
    emqx_metrics:val('messages.sent');
stats(sent_bytes) ->
    emqx_metrics:val('bytes.sent');
stats(dropped) ->
    emqx_metrics:val('messages.dropped').