From 48c143b97a5827ab68c08f21a195e4a8dbcf40eb Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 24 May 2022 14:39:55 +0200 Subject: [PATCH] fix(emqx_trace): Read the data locally --- apps/emqx/src/emqx_trace/emqx_trace.erl | 39 ++++++++++++------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index b02e33f01..162ed4770 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -21,9 +21,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). --boot_mnesia({mnesia, [boot]}). --export([mnesia/1]). - -export([ publish/1, subscribe/3, @@ -55,6 +52,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TRACE, ?MODULE). +-define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). -define(OWN_KEYS, [level, filters, filter_default, handlers]). @@ -77,15 +75,6 @@ end_at :: integer() | undefined | '_' }). -mnesia(boot) -> - ok = mria:create_table(?TRACE, [ - {type, set}, - {rlog_shard, ?COMMON_SHARD}, - {storage, disc_copies}, - {record_name, ?TRACE}, - {attributes, record_info(fields, ?TRACE)} - ]). - publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -255,12 +244,19 @@ format(Traces) -> ). init([]) -> - ok = mria:wait_for_tables([?TRACE]), erlang:process_flag(trap_exit, true), + ok = mria:create_table(?TRACE, [ + {type, set}, + {rlog_shard, ?SHARD}, + {storage, disc_copies}, + {record_name, ?TRACE}, + {attributes, record_info(fields, ?TRACE)} + ]), + ok = mria:wait_for_tables([?TRACE]), + {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])), ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])), - {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), - Traces = get_enable_trace(), + Traces = get_enabled_trace(), TRef = update_trace(Traces), update_trace_handler(), {ok, #{timer => TRef, monitors => #{}}}. @@ -288,7 +284,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor {noreply, State#{monitors => NewMonitors}} end; handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> - Traces = get_enable_trace(), + Traces = get_enabled_trace(), NextTRef = update_trace(Traces), update_trace_handler(), ?tp(update_trace_done, #{}), @@ -347,10 +343,13 @@ stop_all_trace_handler() -> fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, emqx_trace_handler:running() ). -get_enable_trace() -> - transaction(fun() -> - mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) - end). + +get_enabled_trace() -> + {atomic, Traces} = + mria:ro_transaction(?SHARD, fun() -> + mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) + end), + Traces. find_closest_time(Traces, Now) -> Sec =