From 1476accd63319a169d7af45a3971ff30ddeba498 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 15 Nov 2021 15:28:19 +0800 Subject: [PATCH] fix: monitor bad mnesia write & rpc call (#6060) --- .../emqx_dashboard/include/emqx_dashboard.hrl | 2 ++ .../src/emqx_dashboard_collection.erl | 26 ++++++++++++------- .../src/emqx_dashboard_monitor_api.erl | 22 ++++++++++------ 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 712be13a0..395234bbf 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -33,6 +33,8 @@ extra = [] :: term() %% not used so far, fur future extension }). +-define(TAB_COLLECT, emqx_collect). + -define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). -define(DASHBOARD_SHARD, emqx_dashboard_shard). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index ab5767229..dc9c894b6 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -40,10 +40,10 @@ -define(EXPIRE_INTERVAL, 86400000 * 7). mnesia(boot) -> - ok = mria:create_table(emqx_collect, [ + ok = mria:create_table(?TAB_COLLECT, [ {type, set}, {local_content, true}, - {storage, disc_only_copies}, + {storage, disc_copies}, {record_name, mqtt_collect}, {attributes, record_info(fields, mqtt_collect)}]). @@ -87,7 +87,10 @@ handle_call(_Req, _From, State) -> handle_cast(_Req, State) -> {noreply, State}. -handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> +handle_info(collect, State = #{ collect := Collect + , count := 1 + , temp_collect := TempCollect + , last_collects := LastCollect}) -> timer(next_interval(), collect), NewLastCollect = flush(collect_all(Collect), LastCollect), TempCollect1 = temp_collect(TempCollect), @@ -107,9 +110,9 @@ handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) -> timer(?CLEAR_INTERVAL, clear_expire_data), T1 = get_local_time(), Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end), - Collects = dets:select(emqx_collect, Spec), + Collects = ets:select(?TAB_COLLECT, Spec), lists:foreach(fun(Collect) -> - dets:delete_object(emqx_collect, Collect) + true = ets:delete_object(?TAB_COLLECT, Collect) end, Collects), {noreply, State, hibernate}; @@ -131,9 +134,9 @@ temp_collect({_, _, Received, Sent}) -> Sent1}. collect_all({Connection, Route, Subscription}) -> - {[collect(connections)| Connection], - [collect(routes)| Route], - [collect(subscriptions)| Subscription]}. + {[collect(connections) | Connection], + [collect(routes) | Route], + [collect(subscriptions) | Subscription]}. collect(connections) -> emqx_stats:getstat('connections.count'); @@ -159,8 +162,11 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Sent, Sent0), diff(Dropped, Dropped0)}, Ts = get_local_time(), - _ = mria:transaction(mria:local_content_shard(), - fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]), + {atomic, ok} = mria:transaction(mria:local_content_shard(), + fun mnesia:write/3, + [ ?TAB_COLLECT + , #mqtt_collect{timestamp = Ts, collect = Collect} + , write]), {Received, Sent, Dropped}. avg(Items) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index ba2fc6dbe..efbb973da 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -18,6 +18,10 @@ , current_counters/2 ]). +-export([ sampling/1 + , sampling/2 + ]). + -define(COUNTERS, [ connection , route , subscriptions @@ -174,8 +178,10 @@ format_current_metrics(Collects) -> format_current_metrics(Collects, {0,0,0,0}). format_current_metrics([], Acc) -> Acc; -format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> - format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). +format_current_metrics([{Received, Sent, Sub, Conn} | Collects], + {Received1, Sent1, Sub1, Conn1}) -> + format_current_metrics(Collects, + {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). %%%============================================================================================== @@ -260,19 +266,19 @@ key_replace([Term | List], All, Comparison, Default) -> end. sampling(Node) when Node =:= node() -> - Time = emqx_dashboard_collection:get_local_time() - 7200000, - All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - format(lists:sort(All)); + format(lists:sort(select_data())); sampling(Node) -> rpc:call(Node, ?MODULE, sampling, [Node]). sampling(Node, Counter) when Node =:= node() -> - Time = emqx_dashboard_collection:get_local_time() - 7200000, - All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - format_single(lists:sort(All), Counter); + format_single(lists:sort(select_data()), Counter); sampling(Node, Counter) -> rpc:call(Node, ?MODULE, sampling, [Node, Counter]). +select_data() -> + Time = emqx_dashboard_collection:get_local_time() - 7200000, + ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]). + format(Collects) -> format(Collects, {[],[],[],[],[],[]}). format([], {Connection, Route, Subscription, Received, Sent, Dropped}) ->