From 21b9943df9ee3fd12544a0f59c5604b638fe0cb6 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 18 Feb 2022 19:34:33 +0800 Subject: [PATCH] feat: new monitor TODO: API --- apps/emqx_dashboard/etc/emqx_dashboard.conf | 1 + .../src/emqx_dashboard_monitor.erl | 196 ++++++++++++++++-- .../src/emqx_dashboard_schema.erl | 1 + .../emqx_dashboard/src/emqx_dashboard_sup.erl | 2 +- 4 files changed, 179 insertions(+), 21 deletions(-) diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index 1c5a757d7..e5a15b515 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -6,6 +6,7 @@ dashboard { default_username = "admin" default_password = "public" ## notice: sample_interval should be divisible by 60. + ## as like 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s sample_interval = 10s ## api jwt timeout. default is 30 minute token_expired_time = 60m diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 53f9bb5f8..c588f5a25 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -15,9 +15,14 @@ %%-------------------------------------------------------------------- -module(emqx_dashboard_monitor). + +-include_lib("stdlib/include/ms_transform.hrl"). + -behaviour(gen_server). --export([start_link/0]). +-boot_mnesia({mnesia, [boot]}). + +-export([ start_link/0]). -export([ init/1 , handle_call/3 @@ -29,36 +34,88 @@ -export([ mnesia/1]). +-export([ samples/0 + , samples/1 + , aggregate_samplers/0 + ]). + -define(TAB, ?MODULE). --record(state, {}). --define(INIT_DATA, - #{ - sent => 0, - received => 0, - sent_bytes => 0, - received_bytes => 0, - dropped => 0, - subscriptions => 0, - routes => 0, - connection => 0 +%% 10 seconds +-define(DEFAULT_INTERVAL, 10). + +-ifdef(TEST). 
+%% Short intervals used when compiled with TEST (all values in milliseconds)
+-define(CLEAN_EXPIRED_INTERVAL, 2 * 1000).
+-define(RETENTION_TIME, 3 * 1000).
+-define(DEFAULT_GET_DATA_TIME, 5* 1000).
+
+-else.
+
+%% 1 hour = 60 * 60 * 1000 milliseconds
+-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
+%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
+-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
+%% 1 hour = 60 * 60 * 1000 milliseconds
+-define(DEFAULT_GET_DATA_TIME, 60 * 60 * 1000).
+
+-endif.
+
+-record(state, {
+    last
     }).
 
--record(monitor_data)
+-record(emqx_monit, {
+    time :: integer(),
+    data :: map()
+    }).
+
+%% Samplers that flush/2 stores as the difference from the previous sample (see delta/2).
+-define(DELTA_LIST,
+    [ received
+    , received_bytes
+    , sent
+    , sent_bytes
+    , dropped
+    ]).
+
+-define(SAMPLER_LIST,
+    [ subscriptions
+    , routes
+    , connections
+    ] ++ ?DELTA_LIST).
 
 mnesia(boot) ->
     ok = mria:create_table(?TAB, [
         {type, set},
         {local_content, true},
         {storage, disc_copies},
-        {record_name, monitor_data},
-        {attributes, record_info(fields, monitor_data)}]).
+        {record_name, emqx_monit},
+        {attributes, record_info(fields, emqx_monit)}]).
+
+aggregate_samplers() ->
+    [#{node => Node, data => samples(Node)} || Node <- mria_mnesia:cluster_nodes(running)].
+
+samples() ->
+    All = [samples(Node) || Node <- mria_mnesia:cluster_nodes(running)],
+    lists:foldl(fun merge_cluster_samplers/2, #{}, All).
+
+samples(Node) when Node == node() ->
+    get_data(?DEFAULT_GET_DATA_TIME);
+samples(Node) ->
+    rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node]).
+
+%%%===================================================================
+%%% gen_server functions
+%%%===================================================================
 
 start_link() ->
-    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 init([]) ->
-    {ok, #state{}}.
+    sample_timer(),
+    clean_timer(),
+    {ok, #state{last = undefined}}.
 
 handle_call(_Request, _From, State = #state{}) ->
     {reply, ok, State}.
@@ -66,6 +123,17 @@ handle_call(_Request, _From, State = #state{}) ->
 handle_cast(_Request, State = #state{}) ->
     {noreply, State}.
 
+%% Sampling tick: Time is the aligned timestamp picked by sample_timer/0.
+handle_info({sample, Time}, State = #state{last = Last}) ->
+    Now = sample(Time),
+    {atomic, ok} = flush(Last, Now),
+    sample_timer(),
+    {noreply, State#state{last = Now}};
+%% Clean-up tick: purge expired rows, then re-arm the clean-up timer.
+handle_info(clean_expired, State) ->
+    clean(),
+    clean_timer(),
+    {noreply, State};
 handle_info(_Info, State = #state{}) ->
     {noreply, State}.
 
@@ -79,9 +147,97 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+%% Schedule the next {sample, NextTime} message at the next interval boundary.
+sample_timer() ->
+    {NextTime, Remaining} = next_interval(),
+    erlang:send_after(Remaining, self(), {sample, NextTime}).
+
+%% Schedule the next clean_expired message after ?CLEAN_EXPIRED_INTERVAL ms.
+clean_timer() ->
+    erlang:send_after(?CLEAN_EXPIRED_INTERVAL, self(), clean_expired).
+
+%% Returns {NextTime, Remaining}: the next sampling instant and the delay until it (ms).
+%% The interval is read from `dashboard.sample_interval' (seconds), the key declared in
+%% emqx_dashboard_schema. Instants are whole multiples of the interval, e.g. with
+%% Interval = 10 the monitor samples at 00:00:00, 00:00:10, 00:00:20 ..., so that the
+%% monitor data of all nodes in the cluster are aligned in time.
 next_interval() ->
-    ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL),
+    Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL) * 1000,
+    Now = erlang:system_time(millisecond),
+    NextTime = ((Now div Interval) + 1) * Interval,
+    Remaining = NextTime - Now,
+    {NextTime, Remaining}.
+
+%% Read every sampler in ?SAMPLER_LIST and wrap the readings in a record
+%% stamped with Time.
+sample(Time) ->
+    Data = maps:from_list([{Key, value(Key)} || Key <- ?SAMPLER_LIST]),
+    #emqx_monit{time = Time, data = Data}.
+
+%% Store a sample. The first one (no predecessor) is stored as read; later
+%% ones have their ?DELTA_LIST keys replaced by the change since Last.
+flush(_Last = undefined, Now) ->
+    store(Now);
+flush(_Last = #emqx_monit{data = LastData}, Now = #emqx_monit{data = NowData}) ->
+    Store = Now#emqx_monit{data = delta(LastData, NowData)},
+    store(Store).
+
+%% For each key in ?DELTA_LIST replace the reading in NowData by
+%% (current - previous); all other keys are kept as they are.
+delta(LastData, NowData) ->
+    Diff = fun(Key, Acc) -> Acc#{Key => maps:get(Key, NowData) - maps:get(Key, LastData)} end,
+    lists:foldl(Diff, NowData, ?DELTA_LIST).
+
+%% Write one sample record to ?TAB in a transaction on the local-content shard.
+store(MonitData) ->
+    {atomic, ok} =
+        mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).
+
+%% Delete every record whose timestamp is more than ?RETENTION_TIME ms in the past.
+%% NOTE(review): rows are removed with ets:delete_object/2 while store/1 writes through
+%% mria/mnesia -- confirm the deletion is also applied to the disc copy of the table.
+clean() ->
+    Now = erlang:system_time(millisecond),
+    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
+    Expired = ets:select(?TAB, ExpiredMS),
+    lists:foreach(fun(Data) -> true = ets:delete_object(?TAB, Data) end, Expired),
     ok.
 
-monit() ->
-    ok.
+%% Return the samples taken within the last PastTime ms as #{Time => Data}.
+get_data(PastTime) ->
+    Now = erlang:system_time(millisecond),
+    RecentMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}],
+    format(ets:select(?TAB, RecentMS)).
+
+%% Turn one record, or a list of records, into a #{Time => Data} map.
+format(List) when is_list(List) ->
+    lists:foldl(fun(Record, Acc) -> maps:merge(format(Record), Acc) end, #{}, List);
+format(#emqx_monit{time = Time, data = Data}) ->
+    #{Time => Data}.
+
+%% Fold one node's #{Time => Data} map into the cluster-wide accumulator.
+merge_cluster_samplers(NodeSamples, Cluster) ->
+    maps:fold(fun merge_cluster_samplers/3, Cluster, NodeSamples).
+
+merge_cluster_samplers(TS, NodeData, Cluster) ->
+    case maps:get(TS, Cluster, undefined) of
+        undefined ->
+            Cluster#{TS => NodeData};
+        ClusterData ->
+            Cluster#{TS => count_map(NodeData, ClusterData)}
+    end.
+
+%% Add two sample maps sampler by sampler; each sum is stored under its own sampler
+%% name, so the result has one entry per element of ?SAMPLER_LIST.
+count_map(M1, M2) ->
+    Sum = fun(Key, Acc) -> Acc#{Key => maps:get(Key, M1) + maps:get(Key, M2)} end,
+    lists:foldl(Sum, #{}, ?SAMPLER_LIST).
+
+value(connections) -> emqx_stats:getstat('connections.count');
+value(routes) -> emqx_stats:getstat('routes.count');
+value(subscriptions) -> emqx_stats:getstat('subscriptions.count');
+value(received) -> emqx_metrics:val('messages.received');
+value(received_bytes) -> emqx_metrics:val('bytes.received');
+value(sent) -> emqx_metrics:val('messages.sent');
+value(sent_bytes) -> emqx_metrics:val('bytes.sent');
+value(dropped) -> emqx_metrics:val('messages.dropped').
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index f112fc852..efd890bec 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -39,6 +39,7 @@ but use the same port. """ })} , {default_username, fun default_username/1} , {default_password, fun default_password/1} + %% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})} , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})} , {cors, fun cors/1} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl index a4ce812b6..7e13221bd 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl @@ -29,4 +29,4 @@ start_link() -> init([]) -> {ok, {{one_for_all, 10, 100}, - [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_collection)]}}. + [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_monitor)]}}.