From c21bc9d329ef904f3a7a80aa3a8ae4752fc6c749 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 24 Feb 2022 21:25:39 +0800 Subject: [PATCH] feat: dashboard monitor granularity adapter --- .../emqx_dashboard/include/emqx_dashboard.hrl | 14 +- .../src/emqx_dashboard_monitor.erl | 136 ++++++++++++++---- .../src/emqx_dashboard_monitor_api.erl | 47 +++++- 3 files changed, 162 insertions(+), 35 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index a32affef0..c8a2f8a1f 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -55,8 +55,18 @@ , dropped ]). --define(SAMPLER_LIST, +-define(GAUGE_SAMPLER_LIST, [ subscriptions , routes , connections - ] ++ ?DELTA_SAMPLER_LIST). + ]). + +-define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST). + +-define(DELTA_SAMPLER_RATE_MAP, #{ + received => received_rate, + received_bytes => received_bytes_rate, + sent => sent_rate, + sent_bytes => sent_bytes_rate, + dropped => dropped_rate + }). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 40008f23e..1eeb5b68f 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -35,12 +35,14 @@ -export([ mnesia/1]). -export([ samplers/0 - , samplers/1 , samplers/2 + , current_rate/0 + , current_rate/1 + , granularity_adapter/1 ]). %% for rpc --export([ do_sample/1]). +-export([ do_sample/2]). -define(TAB, ?MODULE). @@ -66,30 +68,53 @@ mnesia(boot) -> {record_name, emqx_monit}, {attributes, record_info(fields, emqx_monit)}]). +%% ------------------------------------------------------------------------------------------------- +%% API + samplers() -> - samplers(all). + format(do_sample(all, infinity)). -samplers(NodeOrCluster) -> - format(do_sample(NodeOrCluster)). 
- -samplers(NodeOrCluster, 0) -> - samplers(NodeOrCluster); samplers(NodeOrCluster, Latest) -> - case samplers(NodeOrCluster) of + Now = erlang:system_time(millisecond), + MatchTime = Now - (Latest * 1000), + case format(do_sample(NodeOrCluster, MatchTime)) of {badrpc, Reason} -> {badrpc, Reason}; List when is_list(List) -> - case erlang:length(List) - Latest of - Start when Start > 0 -> - lists:sublist(List, Start, Latest); - _ -> - List - end + granularity_adapter(List) end. -%%%=================================================================== -%%% gen_server functions -%%%=================================================================== +granularity_adapter(List) when length(List) > 100 -> + granularity_adapter(List, []); +granularity_adapter(List) -> + List. + +current_rate() -> + Fun = + fun(Node, Cluster) -> + case current_rate(Node) of + {ok, CurrentRate} -> + merge_cluster_rate(CurrentRate, Cluster); + {badrpc, Reason} -> + {badrpc, {Node, Reason}} + end + end, + lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)). + +current_rate(all) -> + current_rate(); +current_rate(Node) when Node == node() -> + do_call(current_rate); +current_rate(Node) -> + case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of + {badrpc, Reason} -> + {badrpc, {Node, Reason}}; + {ok, Rate} -> + {ok, Rate} + end. + +%% ------------------------------------------------------------------------------------------------- +%% gen_server functions start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -99,6 +124,11 @@ init([]) -> clean_timer(), {ok, #state{last = undefined}}. +handle_call(current_rate, _From, State = #state{last = Last}) -> + NowTime = erlang:system_time(millisecond), + NowSamplers = sample(NowTime), + Rate = cal_rate(NowSamplers, Last), + {reply, {ok, Rate}, State}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. 
@@ -125,14 +155,16 @@ terminate(_Reason, _State = #state{}) -> code_change(_OldVsn, State = #state{}, _Extra) -> {ok, State}. -%%%=================================================================== -%%% Internal functions -%%%=================================================================== +%% ------------------------------------------------------------------------------------------------- +%% Internal functions -do_sample(all) -> +do_call(Request) -> + gen_server:call(?MODULE, Request, 5000). + +do_sample(all, MatchTime) -> Fun = fun(Node, All) -> - case do_sample(Node) of + case do_sample(Node, MatchTime) of {badrpc, Reason} -> {badrpc, {Node, Reason}}; NodeSamplers -> @@ -140,11 +172,16 @@ do_sample(all) -> end end, lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)); -do_sample(Node) when Node == node() -> - ExpiredMS = [{'$1',[],['$1']}], - internal_format(ets:select(?TAB, ExpiredMS)); -do_sample(Node) -> - rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000). +do_sample(Node, MatchTime) when Node == node() -> + MS = match_spec(MatchTime), + internal_format(ets:select(?TAB, MS)); +do_sample(Node, MatchTime) -> + rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000). + +match_spec(infinity) -> + [{'$1',[],['$1']}]; +match_spec(MatchTime) -> + [{{'_', '$1', '_'}, [{'>=', '$1', MatchTime}], ['$_']}]. merge_cluster_samplers(Node, Cluster) -> maps:fold(fun merge_cluster_samplers/3, Cluster, Node). @@ -157,6 +194,14 @@ merge_cluster_samplers(TS, NodeData, Cluster) -> Cluster#{TS => count_map(NodeData, ClusterData)} end. +merge_cluster_rate(Node, Cluster) -> + Fun = + fun(Key, Value, NCluster) -> + ClusterValue = maps:get(Key, NCluster, 0), + NCluster#{Key => Value + ClusterValue} + end, + maps:fold(Fun, Cluster, Node). + format({badrpc, Reason}) -> {badrpc, Reason}; format(Data) -> @@ -167,6 +212,38 @@ format(Data) -> format(TimeStamp, Data, All) -> [Data#{time_stamp => TimeStamp} | All]. 
+cal_rate( #emqx_monit{data = NowData, time = NowTime} + , #emqx_monit{data = LastData, time = LastTime}) -> + TimeDelta = NowTime - LastTime, + Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end, + Gauge = maps:filter(Filter, NowData), + {_, _, _, Rate} = + lists:foldl(fun cal_rate_/2, {NowData, LastData, TimeDelta, Gauge}, ?DELTA_SAMPLER_LIST), + Rate. + +cal_rate_(Key, {Now, Last, TDelta, Res}) -> + NewValue = maps:get(Key, Now), + LastValue = maps:get(Key, Last), + Rate = ((NewValue - LastValue) * 1000) div TDelta, + RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP), + {Now, Last, TDelta, Res#{RateKey => Rate}}. + +granularity_adapter([], Res) -> + lists:reverse(Res); +granularity_adapter([Sampler], Res) -> + granularity_adapter([], [Sampler | Res]); +granularity_adapter([Sampler1, Sampler2 | Rest], Res) -> + Fun = + fun(Key, M) -> + Value1 = maps:get(Key, Sampler1), + Value2 = maps:get(Key, Sampler2), + M#{Key => Value1 + Value2} + end, + granularity_adapter(Rest, [lists:foldl(Fun, Sampler2, ?DELTA_SAMPLER_LIST) | Res]). + +%% ------------------------------------------------------------------------------------------------- +%% timer + sample_timer() -> {NextTime, Remaining} = next_interval(), erlang:send_after(Remaining, self(), {sample, NextTime}). @@ -186,6 +263,9 @@ next_interval() -> Remaining = NextTime - Now, {NextTime, Remaining}. +%% ------------------------------------------------------------------------------------------------- +%% data + sample(Time) -> Fun = fun(Key, Res) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 65fb3a153..e0662c972 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -16,7 +16,9 @@ , fields/1 ]). --export([ monitor/2]). +-export([ monitor/2 + , monitor_current/2 + ]). 
api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). @@ -24,6 +26,7 @@ api_spec() -> paths() -> [ "/monitor" , "/monitor/nodes/:node" + , "/monitor/current" ]. schema("/monitor") -> @@ -55,19 +58,34 @@ schema("/monitor/nodes/:node") -> 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>) } } + }; + +schema("/monitor/current") -> + #{ + 'operationId' => monitor_current, + get => #{ + description => <<"Current monitor data. Gauge and rate">>, + responses => #{ + 200 => hoconsc:mk(hoconsc:ref(sampler_current), #{}) + } + } }. fields(sampler) -> Samplers = [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} || SamplerName <- ?SAMPLER_LIST], - [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]. + [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]; + +fields(sampler_current) -> + [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})} + || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST]. %% ------------------------------------------------------------------------------------------------- %% API monitor(get, #{query_string := QS, bindings := Bindings}) -> - Latest = maps:get(<<"latest">>, QS, 0), + Latest = maps:get(<<"latest">>, QS, 1000), Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)), case emqx_dashboard_monitor:samplers(Node, Latest) of {badrpc, {Node, Reason}} -> @@ -77,6 +95,16 @@ monitor(get, #{query_string := QS, bindings := Bindings}) -> {200, Samplers} end. +monitor_current(get, #{query_string := QS}) -> + NodeOrCluster = binary_to_atom(maps:get(<<"node">>, QS, <<"all">>), utf8), + case emqx_dashboard_monitor:current_rate(NodeOrCluster) of + {ok, CurrentRate} -> + {200, CurrentRate}; + {badrpc, {Node, Reason}} -> + Message = list_to_binary(io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])), + {400, 'BAD_RPC', Message} + end. 
+ %% ------------------------------------------------------------------------------------------------- %% Internal @@ -93,8 +121,17 @@ sampler_desc(routes) -> " Can only represent the approximate state">>; sampler_desc(connections) -> <<"Connections at the time of sampling." - " Can only represent the approximate state">>. + " Can only represent the approximate state">>; + +sampler_desc(received_rate) -> sampler_desc_format("Received messages ", per); +sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per); +sampler_desc(sent_rate) -> sampler_desc_format("Sent messages ", per); +sampler_desc(sent_bytes_rate) -> sampler_desc_format("Sent bytes ", per); +sampler_desc(dropped_rate) -> sampler_desc_format("Dropped messages ", per). sampler_desc_format(Format) -> + sampler_desc_format(Format, last). + +sampler_desc_format(Format, Type) -> Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), - list_to_binary(io_lib:format(Format ++ "last ~p seconds", [Interval])). + list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).