Merge pull request #5451 from DDDHuang/fix_monitor_params

fix: merge counters each node
This commit is contained in:
DDDHuang 2021-08-11 17:46:25 +08:00 committed by GitHub
commit d62e7239c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 26 deletions

View File

@ -56,7 +56,7 @@ start_link() ->
get_collect() -> gen_server:call(whereis(?MODULE), get_collect). get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
init([]) -> init([]) ->
timer(timer:seconds(interval()), collect), timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data), timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{ State = #{
@ -68,6 +68,15 @@ init([]) ->
}, },
{ok, State}. {ok, State}.
%% @doc every whole interval seconds;
%% example:
%% interval is 10s
%% now 15:01:07 (or 15:07:01 ~ 15:07:10)
%% next will be 15:01:10, 15:01:20, 15:01:30 ...
%% ensure all counters in cluster have sync time
next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() -> interval() ->
emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL). emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
@ -82,17 +91,17 @@ handle_cast(_Req, State) ->
{noreply, State}. {noreply, State}.
handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) ->
timer(next_interval(), collect),
NewLastCollect = flush(collect_all(Collect), LastCollect), NewLastCollect = flush(collect_all(Collect), LastCollect),
TempCollect1 = temp_collect(TempCollect), TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => count(), {noreply, State#{count => count(),
collect => ?COLLECT, collect => ?COLLECT,
temp_collect => TempCollect1, temp_collect => TempCollect1,
last_collects => NewLastCollect}}; last_collects => NewLastCollect}};
handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
timer(next_interval(), collect),
TempCollect1 = temp_collect(TempCollect), TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => Count - 1, {noreply, State#{count => Count - 1,
collect => collect_all(Collect), collect => collect_all(Collect),
temp_collect => TempCollect1}, hibernate}; temp_collect => TempCollect1}, hibernate};
@ -170,4 +179,4 @@ get_today_remaining_seconds() ->
get_local_time() -> get_local_time() ->
(calendar:datetime_to_gregorian_seconds(calendar:local_time()) - (calendar:datetime_to_gregorian_seconds(calendar:local_time()) -
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000. calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

View File

@ -8,11 +8,6 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-export([ sampling/1
, sampling/2
, get_collect/1
]).
-export([api_spec/0]). -export([api_spec/0]).
-export([counters/2, current_counters/2]). -export([counters/2, current_counters/2]).
@ -36,8 +31,7 @@ monitor_api() ->
name => node, name => node,
in => query, in => query,
required => false, required => false,
schema => #{type => string}, schema => #{type => string}
example => node()
}, },
#{ #{
name => counter, name => counter,
@ -47,7 +41,7 @@ monitor_api() ->
} }
], ],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}}, <<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters)}}},
{"/monitor", Metadata, counters}. {"/monitor", Metadata, counters}.
monitor_current_api() -> monitor_current_api() ->
Metadata = #{ Metadata = #{
@ -62,9 +56,6 @@ current_counters_schema() ->
#{ #{
type => object, type => object,
properties => #{ properties => #{
nodes => #{
type => integer,
description => <<"Nodes count">>},
connection => #{type => integer}, connection => #{type => integer},
sent => #{type => integer}, sent => #{type => integer},
received => #{type => integer}, received => #{type => integer},
@ -72,13 +63,11 @@ current_counters_schema() ->
}. }.
counters_schema() -> counters_schema() ->
Node = #{ Fun =
node => #{ fun(K, M) ->
type => string, maps:merge(M, counters_schema(K))
example => node() end,
} Properties = lists:foldl(Fun, #{}, ?COUNTERS),
},
Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS),
#{ #{
counters => #{ counters => #{
type => object, type => object,
@ -100,8 +89,7 @@ counters_schema(Name) ->
counters(get, Request) -> counters(get, Request) ->
case cowboy_req:parse_qs(Request) of case cowboy_req:parse_qs(Request) of
[] -> [] ->
Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], {200, get_collect()};
{200, Response};
Params -> Params ->
lookup(Params) lookup(Params)
end. end.
@ -144,6 +132,10 @@ format_current_metrics([], Acc) ->
format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) ->
format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
get_collect() ->
Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
merger_counters(Counters).
get_collect(Node) when Node =:= node() -> get_collect(Node) when Node =:= node() ->
emqx_dashboard_collection:get_collect(); emqx_dashboard_collection:get_collect();
get_collect(Node) -> get_collect(Node) ->
@ -152,17 +144,61 @@ get_collect(Node) ->
Res -> Res Res -> Res
end. end.
merger_counters(ClusterCounters) ->
lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters).
merger_node_counters(NodeCounters, Counters) ->
maps:fold(fun merger_counter/3, Counters, NodeCounters).
merger_counter(Key, Counters, Res) ->
case maps:get(Key, Res, undefined) of
undefined ->
Res#{Key => Counters};
OldCounters ->
NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters),
Res#{Key => NCounters}
end.
merger_counter(#{timestamp := Timestamp, count := Value}, Counters) ->
Comparison =
fun(Counter) ->
case maps:get(timestamp, Counter) =:= Timestamp of
true ->
Count = maps:get(count, Counter),
{ok, Counter#{count => Count + Value}};
false ->
ignore
end
end,
key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}).
key_replace(List, Comparison, Default) ->
key_replace(List, List, Comparison, Default).
key_replace([], All, _Comparison, Default) ->
[Default | All];
key_replace([Term | List], All, Comparison, Default) ->
case Comparison(Term) of
{ok, NTerm} ->
Tail = [NTerm | List],
Header = lists:sublist(All, length(All) - length(Tail)),
lists:append(Header, Tail);
_ ->
key_replace(List, All, Comparison, Default)
end.
sampling(Node) when Node =:= node() -> sampling(Node) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000, Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format(lists:sort(All))); format(lists:sort(All));
sampling(Node) -> sampling(Node) ->
rpc:call(Node, ?MODULE, sampling, [Node]). rpc:call(Node, ?MODULE, sampling, [Node]).
sampling(Node, Counter) when Node =:= node() -> sampling(Node, Counter) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000, Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format_single(lists:sort(All), Counter)); format_single(lists:sort(All), Counter);
sampling(Node, Counter) -> sampling(Node, Counter) ->
rpc:call(Node, ?MODULE, sampling, [Node, Counter]). rpc:call(Node, ?MODULE, sampling, [Node, Counter]).