fix: monitor crash by ets badarg

This commit is contained in:
Zhongwen Deng 2022-04-24 11:13:33 +08:00
parent 2a25767112
commit b7a6bd973f
1 changed files with 47 additions and 35 deletions

View File

@ -24,27 +24,29 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-export([ start_link/0]). -export([start_link/0]).
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_cast/2 handle_call/3,
, handle_info/2 handle_cast/2,
, terminate/2 handle_info/2,
, code_change/3 terminate/2,
]). code_change/3
]).
-export([ mnesia/1]). -export([mnesia/1]).
-export([ samplers/0 -export([
, samplers/2 samplers/0,
, current_rate/0 samplers/2,
, current_rate/1 current_rate/0,
, granularity_adapter/1 current_rate/1,
]). granularity_adapter/1
]).
%% for rpc %% for rpc
-export([ do_sample/2]). -export([do_sample/2]).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
@ -55,12 +57,12 @@
-record(state, { -record(state, {
last last
}). }).
-record(emqx_monit, { -record(emqx_monit, {
time :: integer(), time :: integer(),
data :: map() data :: map()
}). }).
mnesia(boot) -> mnesia(boot) ->
ok = mria:create_table(?TAB, [ ok = mria:create_table(?TAB, [
@ -68,7 +70,8 @@ mnesia(boot) ->
{local_content, true}, {local_content, true},
{storage, disc_copies}, {storage, disc_copies},
{record_name, emqx_monit}, {record_name, emqx_monit},
{attributes, record_info(fields, emqx_monit)}]). {attributes, record_info(fields, emqx_monit)}
]).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% API %% API
@ -120,7 +123,7 @@ current_rate() ->
{badrpc, {Node, Reason}} {badrpc, {Node, Reason}}
end end
end, end,
case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running))of case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of
{badrpc, Reason} -> {badrpc, Reason} ->
{badrpc, Reason}; {badrpc, Reason};
Rate -> Rate ->
@ -133,12 +136,16 @@ current_rate(Node) when Node == node() ->
try try
{ok, Rate} = do_call(current_rate), {ok, Rate} = do_call(current_rate),
{ok, Rate} {ok, Rate}
catch _E:R -> catch
?SLOG(warning, #{msg => "Dashboard monitor error", reason => R}), _E:R ->
%% Rate map 0, ensure api will not crash. ?SLOG(warning, #{msg => "Dashboard monitor error", reason => R}),
%% When joining cluster, dashboard monitor restart. %% Rate map 0, ensure api will not crash.
Rate0 = [{Key, 0} || Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)], %% When joining cluster, dashboard monitor restart.
{ok, maps:from_list(Rate0)} Rate0 = [
{Key, 0}
|| Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
],
{ok, maps:from_list(Rate0)}
end; end;
current_rate(Node) -> current_rate(Node) ->
case emqx_dashboard_proto_v1:current_rate(Node) of case emqx_dashboard_proto_v1:current_rate(Node) of
@ -175,12 +182,10 @@ handle_info({sample, Time}, State = #state{last = Last}) ->
{atomic, ok} = flush(Last, Now), {atomic, ok} = flush(Last, Now),
sample_timer(), sample_timer(),
{noreply, State#state{last = Now}}; {noreply, State#state{last = Now}};
handle_info(clean_expired, State) -> handle_info(clean_expired, State) ->
clean(), clean(),
clean_timer(), clean_timer(),
{noreply, State}; {noreply, State};
handle_info(_Info, State = #state{}) -> handle_info(_Info, State = #state{}) ->
{noreply, State}. {noreply, State}.
@ -256,8 +261,10 @@ format(TimeStamp, Data, All) ->
cal_rate(_Now, undefined) -> cal_rate(_Now, undefined) ->
AllSamples = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP), AllSamples = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
lists:foldl(fun(Key, Acc) -> Acc#{Key => 0} end, #{}, AllSamples); lists:foldl(fun(Key, Acc) -> Acc#{Key => 0} end, #{}, AllSamples);
cal_rate( #emqx_monit{data = NowData, time = NowTime} cal_rate(
, #emqx_monit{data = LastData, time = LastTime}) -> #emqx_monit{data = NowData, time = NowTime},
#emqx_monit{data = LastData, time = LastTime}
) ->
TimeDelta = NowTime - LastTime, TimeDelta = NowTime - LastTime,
Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end, Filter = fun(Key, _) -> lists:member(Key, ?GAUGE_SAMPLER_LIST) end,
Gauge = maps:filter(Filter, NowData), Gauge = maps:filter(Filter, NowData),
@ -340,9 +347,12 @@ clean() ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}], ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
Expired = ets:select(?TAB, ExpiredMS), Expired = ets:select(?TAB, ExpiredMS),
lists:foreach(fun(Data) -> lists:foreach(
true = ets:delete_object(?TAB, Data) fun(Data) ->
end, Expired), true = ets:delete_object(?TAB, Data)
end,
Expired
),
ok. ok.
%% To make it easier to do data aggregation %% To make it easier to do data aggregation
@ -364,8 +374,10 @@ count_map(M1, M2) ->
getstats(Key) -> getstats(Key) ->
%% Stats ets maybe not exist when ekka join. %% Stats ets maybe not exist when ekka join.
try stats(Key) try
catch _: _ -> 0 stats(Key)
catch
_:_ -> 0
end. end.
stats(connections) -> emqx_stats:getstat('connections.count'); stats(connections) -> emqx_stats:getstat('connections.count');