From 776aabda7da55c78f7769027c99d806fe02f605b Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Wed, 11 Aug 2021 14:13:07 +0800 Subject: [PATCH] fix: merge counters each node --- .../src/emqx_dashboard_collection.erl | 17 +++- .../src/emqx_dashboard_monitor_api.erl | 80 ++++++++++++++----- 2 files changed, 71 insertions(+), 26 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index bf172ee97..91d60e1ab 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -56,7 +56,7 @@ start_link() -> get_collect() -> gen_server:call(whereis(?MODULE), get_collect). init([]) -> - timer(timer:seconds(interval()), collect), + timer(next_interval(), collect), timer(get_today_remaining_seconds(), clear_expire_data), ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), State = #{ @@ -68,6 +68,15 @@ init([]) -> }, {ok, State}. +%% @doc every whole interval seconds; +%% example: +%% interval is 10s +%% now 15:01:07 (or 15:07:01 ~ 15:07:10) +%% next will be 15:01:10, 15:01:20, 15:01:30 ... +%% ensure all counters in cluster have sync time +next_interval() -> + (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. + interval() -> emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL). @@ -82,17 +91,17 @@ handle_cast(_Req, State) -> {noreply, State}. handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> + timer(next_interval(), collect), NewLastCollect = flush(collect_all(Collect), LastCollect), TempCollect1 = temp_collect(TempCollect), - timer(timer:seconds(interval()), collect), {noreply, State#{count => count(), collect => ?COLLECT, temp_collect => TempCollect1, last_collects => NewLastCollect}}; handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> + timer(next_interval(), collect), TempCollect1 = temp_collect(TempCollect), - timer(timer:seconds(interval()), collect), {noreply, State#{count => Count - 1, collect => collect_all(Collect), temp_collect => TempCollect1}, hibernate}; @@ -170,4 +179,4 @@ get_today_remaining_seconds() -> get_local_time() -> (calendar:datetime_to_gregorian_seconds(calendar:local_time()) - - calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000. + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 130139780..277b0b1fd 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -8,11 +8,6 @@ -behaviour(minirest_api). --export([ sampling/1 - , sampling/2 - , get_collect/1 - ]). - -export([api_spec/0]). -export([counters/2, current_counters/2]). @@ -36,8 +31,7 @@ monitor_api() -> name => node, in => query, required => false, - schema => #{type => string}, - example => node() + schema => #{type => string} }, #{ name => counter, @@ -47,7 +41,7 @@ monitor_api() -> } ], responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters)}}}, {"/monitor", Metadata, counters}. monitor_current_api() -> Metadata = #{ @@ -62,9 +56,6 @@ current_counters_schema() -> #{ type => object, properties => #{ - nodes => #{ - type => integer, - description => <<"Nodes count">>}, connection => #{type => integer}, sent => #{type => integer}, received => #{type => integer}, @@ -72,13 +63,11 @@ current_counters_schema() -> }. counters_schema() -> - Node = #{ - node => #{ - type => string, - example => node() - } - }, - Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS), + Fun = + fun(K, M) -> + maps:merge(M, counters_schema(K)) + end, + Properties = lists:foldl(Fun, #{}, ?COUNTERS), #{ counters => #{ type => object, @@ -100,8 +89,7 @@ counters_schema(Name) -> counters(get, Request) -> case cowboy_req:parse_qs(Request) of [] -> - Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], - {200, Response}; + {200, get_collect()}; Params -> lookup(Params) end. @@ -144,6 +132,10 @@ format_current_metrics([], Acc) -> format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). +get_collect() -> + Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + merger_counters(Counters). + get_collect(Node) when Node =:= node() -> emqx_dashboard_collection:get_collect(); get_collect(Node) -> @@ -152,17 +144,61 @@ get_collect(Node) -> Res -> Res end. +merger_counters(ClusterCounters) -> + lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters). + +merger_node_counters(NodeCounters, Counters) -> + maps:fold(fun merger_counter/3, Counters, NodeCounters). + +merger_counter(Key, Counters, Res) -> + case maps:get(Key, Res, undefined) of + undefined -> + Res#{Key => Counters}; + OldCounters -> + NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters), + Res#{Key => NCounters} + end. + +merger_counter(#{timestamp := Timestamp, count := Value}, Counters) -> + Comparison = + fun(Counter) -> + case maps:get(timestamp, Counter) =:= Timestamp of + true -> + Count = maps:get(count, Counter), + {ok, Counter#{count => Count + Value}}; + false -> + ignore + end + end, + key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}). + +key_replace(List, Comparison, Default) -> + key_replace(List, List, Comparison, Default). + +key_replace([], All, _Comparison, Default) -> + [Default | All]; + +key_replace([Term | List], All, Comparison, Default) -> + case Comparison(Term) of + {ok, NTerm} -> + Tail = [NTerm | List], + Header = lists:sublist(All, length(All) - length(Tail)), + lists:append(Header, Tail); + _ -> + key_replace(List, All, Comparison, Default) + end. + sampling(Node) when Node =:= node() -> Time = emqx_dashboard_collection:get_local_time() - 7200000, All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - maps:put(node, Node, format(lists:sort(All))); + format(lists:sort(All)); sampling(Node) -> rpc:call(Node, ?MODULE, sampling, [Node]). sampling(Node, Counter) when Node =:= node() -> Time = emqx_dashboard_collection:get_local_time() - 7200000, All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - maps:put(node, Node, format_single(lists:sort(All), Counter)); + format_single(lists:sort(All), Counter); sampling(Node, Counter) -> rpc:call(Node, ?MODULE, sampling, [Node, Counter]).