Merge pull request #8036 from ieQu1/fix-get-enabled-trace

fix(emqx_trace): Read the data locally
This commit is contained in:
ieQu1 2022-05-24 15:42:54 +02:00 committed by GitHub
commit 1c147b9b2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 20 deletions

View File

@ -21,9 +21,6 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-boot_mnesia({mnesia, [boot]}).
-export([mnesia/1]).
-export([ -export([
publish/1, publish/1,
subscribe/3, subscribe/3,
@ -55,6 +52,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(TRACE, ?MODULE). -define(TRACE, ?MODULE).
-define(SHARD, ?COMMON_SHARD).
-define(MAX_SIZE, 30). -define(MAX_SIZE, 30).
-define(OWN_KEYS, [level, filters, filter_default, handlers]). -define(OWN_KEYS, [level, filters, filter_default, handlers]).
@ -77,15 +75,6 @@
end_at :: integer() | undefined | '_' end_at :: integer() | undefined | '_'
}). }).
mnesia(boot) ->
ok = mria:create_table(?TRACE, [
{type, set},
{rlog_shard, ?COMMON_SHARD},
{storage, disc_copies},
{record_name, ?TRACE},
{attributes, record_info(fields, ?TRACE)}
]).
publish(#message{topic = <<"$SYS/", _/binary>>}) -> publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ignore; ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when publish(#message{from = From, topic = Topic, payload = Payload}) when
@ -255,12 +244,19 @@ format(Traces) ->
). ).
init([]) -> init([]) ->
ok = mria:wait_for_tables([?TRACE]),
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
ok = mria:create_table(?TRACE, [
{type, set},
{rlog_shard, ?SHARD},
{storage, disc_copies},
{record_name, ?TRACE},
{attributes, record_info(fields, ?TRACE)}
]),
ok = mria:wait_for_tables([?TRACE]),
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])), ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])),
ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])), ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])),
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}), Traces = get_enabled_trace(),
Traces = get_enable_trace(),
TRef = update_trace(Traces), TRef = update_trace(Traces),
update_trace_handler(), update_trace_handler(),
{ok, #{timer => TRef, monitors => #{}}}. {ok, #{timer => TRef, monitors => #{}}}.
@ -288,7 +284,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
{noreply, State#{monitors => NewMonitors}} {noreply, State#{monitors => NewMonitors}}
end; end;
handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
Traces = get_enable_trace(), Traces = get_enabled_trace(),
NextTRef = update_trace(Traces), NextTRef = update_trace(Traces),
update_trace_handler(), update_trace_handler(),
?tp(update_trace_done, #{}), ?tp(update_trace_done, #{}),
@ -347,10 +343,13 @@ stop_all_trace_handler() ->
fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end, fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
emqx_trace_handler:running() emqx_trace_handler:running()
). ).
get_enable_trace() ->
transaction(fun() -> get_enabled_trace() ->
{atomic, Traces} =
mria:ro_transaction(?SHARD, fun() ->
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
end). end),
Traces.
find_closest_time(Traces, Now) -> find_closest_time(Traces, Now) ->
Sec = Sec =