fix: merge counters each node

This commit is contained in:
DDDHuang 2021-08-11 14:13:07 +08:00
parent 1239eb23b0
commit 776aabda7d
2 changed files with 71 additions and 26 deletions

View File

@ -56,7 +56,7 @@ start_link() ->
get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
init([]) ->
timer(timer:seconds(interval()), collect),
timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{
@ -68,6 +68,15 @@ init([]) ->
},
{ok, State}.
%% @doc every whole interval seconds;
%% example:
%% interval is 10s
%% now 15:01:07 (or 15:07:01 ~ 15:07:10)
%% next will be 15:01:10, 15:01:20, 15:01:30 ...
%% ensure all counters in cluster have sync time
next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() ->
emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
@ -82,17 +91,17 @@ handle_cast(_Req, State) ->
{noreply, State}.
handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) ->
timer(next_interval(), collect),
NewLastCollect = flush(collect_all(Collect), LastCollect),
TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => count(),
collect => ?COLLECT,
temp_collect => TempCollect1,
last_collects => NewLastCollect}};
handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
timer(next_interval(), collect),
TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => Count - 1,
collect => collect_all(Collect),
temp_collect => TempCollect1}, hibernate};
@ -170,4 +179,4 @@ get_today_remaining_seconds() ->
get_local_time() ->
(calendar:datetime_to_gregorian_seconds(calendar:local_time()) -
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000.
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

View File

@ -8,11 +8,6 @@
-behaviour(minirest_api).
-export([ sampling/1
, sampling/2
, get_collect/1
]).
-export([api_spec/0]).
-export([counters/2, current_counters/2]).
@ -36,8 +31,7 @@ monitor_api() ->
name => node,
in => query,
required => false,
schema => #{type => string},
example => node()
schema => #{type => string}
},
#{
name => counter,
@ -47,7 +41,7 @@ monitor_api() ->
}
],
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters)}}},
{"/monitor", Metadata, counters}.
monitor_current_api() ->
Metadata = #{
@ -62,9 +56,6 @@ current_counters_schema() ->
#{
type => object,
properties => #{
nodes => #{
type => integer,
description => <<"Nodes count">>},
connection => #{type => integer},
sent => #{type => integer},
received => #{type => integer},
@ -72,13 +63,11 @@ current_counters_schema() ->
}.
counters_schema() ->
Node = #{
node => #{
type => string,
example => node()
}
},
Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS),
Fun =
fun(K, M) ->
maps:merge(M, counters_schema(K))
end,
Properties = lists:foldl(Fun, #{}, ?COUNTERS),
#{
counters => #{
type => object,
@ -100,8 +89,7 @@ counters_schema(Name) ->
counters(get, Request) ->
case cowboy_req:parse_qs(Request) of
[] ->
Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
{200, Response};
{200, get_collect()};
Params ->
lookup(Params)
end.
@ -144,6 +132,10 @@ format_current_metrics([], Acc) ->
format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) ->
format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
get_collect() ->
Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
merger_counters(Counters).
get_collect(Node) when Node =:= node() ->
emqx_dashboard_collection:get_collect();
get_collect(Node) ->
@ -152,17 +144,61 @@ get_collect(Node) ->
Res -> Res
end.
merger_counters(ClusterCounters) ->
lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters).
merger_node_counters(NodeCounters, Counters) ->
maps:fold(fun merger_counter/3, Counters, NodeCounters).
merger_counter(Key, Counters, Res) ->
case maps:get(Key, Res, undefined) of
undefined ->
Res#{Key => Counters};
OldCounters ->
NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters),
Res#{Key => NCounters}
end.
merger_counter(#{timestamp := Timestamp, count := Value}, Counters) ->
Comparison =
fun(Counter) ->
case maps:get(timestamp, Counter) =:= Timestamp of
true ->
Count = maps:get(count, Counter),
{ok, Counter#{count => Count + Value}};
false ->
ignore
end
end,
key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}).
key_replace(List, Comparison, Default) ->
key_replace(List, List, Comparison, Default).
key_replace([], All, _Comparison, Default) ->
[Default | All];
key_replace([Term | List], All, Comparison, Default) ->
case Comparison(Term) of
{ok, NTerm} ->
Tail = [NTerm | List],
Header = lists:sublist(All, length(All) - length(Tail)),
lists:append(Header, Tail);
_ ->
key_replace(List, All, Comparison, Default)
end.
sampling(Node) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format(lists:sort(All)));
format(lists:sort(All));
sampling(Node) ->
rpc:call(Node, ?MODULE, sampling, [Node]).
sampling(Node, Counter) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format_single(lists:sort(All), Counter));
format_single(lists:sort(All), Counter);
sampling(Node, Counter) ->
rpc:call(Node, ?MODULE, sampling, [Node, Counter]).