diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 549c59184..02a881c53 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -24,27 +24,29 @@ -boot_mnesia({mnesia, [boot]}). --export([ start_link/0]). +-export([start_link/0]). --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). --export([ mnesia/1]). +-export([mnesia/1]). --export([ samplers/0 - , samplers/2 - , current_rate/0 - , current_rate/1 - , granularity_adapter/1 - ]). +-export([ + samplers/0, + samplers/2, + current_rate/0, + current_rate/1, + granularity_adapter/1 +]). %% for rpc --export([ do_sample/2]). +-export([do_sample/2]). -define(TAB, ?MODULE). @@ -55,12 +57,12 @@ -record(state, { last - }). +}). -record(emqx_monit, { time :: integer(), data :: map() - }). +}). mnesia(boot) -> ok = mria:create_table(?TAB, [ @@ -68,7 +70,8 @@ mnesia(boot) -> {local_content, true}, {storage, disc_copies}, {record_name, emqx_monit}, - {attributes, record_info(fields, emqx_monit)}]). + {attributes, record_info(fields, emqx_monit)} + ]). %% ------------------------------------------------------------------------------------------------- %% API @@ -120,7 +123,7 @@ current_rate() -> {badrpc, {Node, Reason}} end end, - case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running))of + case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of {badrpc, Reason} -> {badrpc, Reason}; Rate -> @@ -133,12 +136,16 @@ current_rate(Node) when Node == node() -> try {ok, Rate} = do_call(current_rate), {ok, Rate} - catch _E:R -> - ?SLOG(warning, #{msg => "Dashboard monitor error", reason => R}), - %% Rate map 0, ensure api will not crash. - %% When joining cluster, dashboard monitor restart. - Rate0 = [{Key, 0} || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)], - {ok, maps:from_list(Rate0)} + catch + _E:R -> + ?SLOG(warning, #{msg => "Dashboard monitor error", reason => R}), + %% Rate map 0, ensure api will not crash. + %% When joining cluster, dashboard monitor restart. + Rate0 = [ + {Key, 0} + || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP) + ], + {ok, maps:from_list(Rate0)} end; current_rate(Node) -> case emqx_dashboard_proto_v1:current_rate(Node) of @@ -175,12 +182,10 @@ handle_info({sample, Time}, State = #state{last = Last}) -> {atomic, ok} = flush(Last, Now), sample_timer(), {noreply, State#state{last = Now}}; - handle_info(clean_expired, State) -> clean(), clean_timer(), {noreply, State}; - handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -256,8 +261,10 @@ format(TimeStamp, Data, All) -> cal_rate(_Now, undefined) -> AllSamples = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP), lists:foldl(fun(Key, Acc) -> Acc#{Key => 0} end, #{}, AllSamples); -cal_rate( #emqx_monit{data = NowData, time = NowTime} - , #emqx_monit{data = LastData, time = LastTime}) -> +cal_rate( + #emqx_monit{data = NowData, time = NowTime}, + #emqx_monit{data = LastData, time = LastTime} +) -> TimeDelta = NowTime - LastTime, Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end, Gauge = maps:filter(Filter, NowData), @@ -340,9 +347,12 @@ clean() -> Now = erlang:system_time(millisecond), ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}], Expired = ets:select(?TAB, ExpiredMS), - lists:foreach(fun(Data) -> - true = ets:delete_object(?TAB, Data) - end, Expired), + lists:foreach( + fun(Data) -> + true = ets:delete_object(?TAB, Data) + end, + Expired + ), ok. %% To make it easier to do data aggregation @@ -364,8 +374,10 @@ count_map(M1, M2) -> getstats(Key) -> %% Stats ets maybe not exist when ekka join. - try stats(Key) - catch _: _ -> 0 + try + stats(Key) + catch + _:_ -> 0 end. stats(connections) -> emqx_stats:getstat('connections.count');