feat(trace): replace logger_formatter by emqx_trace_formatter
This commit is contained in:
parent
d781dc73a5
commit
668180388c
|
@ -69,6 +69,8 @@
|
||||||
ok
|
ok
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
-define(TRACE(Action, Meta, Msg), emqx_trace:log(Action, Meta, Msg)).
|
||||||
|
|
||||||
%% print to 'user' group leader
|
%% print to 'user' group leader
|
||||||
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
|
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
|
||||||
-define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
|
-define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
|
||||||
|
|
|
@ -205,8 +205,7 @@ publish(Msg) when is_record(Msg, message) ->
|
||||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||||
#message{headers = #{allow_publish := false}} ->
|
#message{headers = #{allow_publish := false}} ->
|
||||||
?SLOG(debug, #{msg => "message_not_published",
|
?TRACE("NotAllow", #{payload => emqx_message:to_log_map(Msg)}, "message_not_published"),
|
||||||
payload => emqx_message:to_log_map(Msg)}),
|
|
||||||
[];
|
[];
|
||||||
Msg1 = #message{topic = Topic} ->
|
Msg1 = #message{topic = Topic} ->
|
||||||
emqx_persistent_session:persist_message(Msg1),
|
emqx_persistent_session:persist_message(Msg1),
|
||||||
|
|
|
@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
||||||
fun check_banned/2
|
fun check_banned/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
?TRACE("RECV", #{}, Packet),
|
||||||
NChannel1 = NChannel#channel{
|
NChannel1 = NChannel#channel{
|
||||||
will_msg = emqx_packet:will_msg(NConnPkt),
|
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||||
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||||
|
|
|
@ -449,14 +449,12 @@ handle_msg({'$gen_cast', Req}, State) ->
|
||||||
{ok, NewState};
|
{ok, NewState};
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}),
|
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
when_bytes_in(Oct, Data, State);
|
when_bytes_in(Oct, Data, State);
|
||||||
|
|
||||||
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
|
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
|
||||||
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}),
|
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
|
@ -528,7 +526,7 @@ handle_msg({connack, ConnAck}, State) ->
|
||||||
handle_outgoing(ConnAck, State);
|
handle_outgoing(ConnAck, State);
|
||||||
|
|
||||||
handle_msg({close, Reason}, State) ->
|
handle_msg({close, Reason}, State) ->
|
||||||
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
||||||
|
@ -566,7 +564,8 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
|
||||||
Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
||||||
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
||||||
emqx_channel:terminate(Reason, Channel1),
|
emqx_channel:terminate(Reason, Channel1),
|
||||||
close_socket_ok(State)
|
close_socket_ok(State),
|
||||||
|
?TRACE("TERMINATE", #{reason => Reason}, "terminated")
|
||||||
catch
|
catch
|
||||||
E : C : S ->
|
E : C : S ->
|
||||||
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
|
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
|
||||||
|
@ -716,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
|
|
||||||
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
?SLOG(debug, #{msg => "RECV_packet", packet => emqx_packet:format(Packet)}),
|
?TRACE("RECV", #{}, Packet),
|
||||||
with_channel(handle_in, [Packet], State);
|
with_channel(handle_in, [Packet], State);
|
||||||
|
|
||||||
handle_incoming(FrameError, State) ->
|
handle_incoming(FrameError, State) ->
|
||||||
|
@ -760,10 +759,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
<<>>;
|
<<>>;
|
||||||
Data -> ?SLOG(debug, #{
|
Data ->
|
||||||
msg => "SEND_packet",
|
?TRACE("SEND", #{}, Packet),
|
||||||
packet => emqx_packet:format(Packet)
|
|
||||||
}),
|
|
||||||
ok = inc_outgoing_stats(Packet),
|
ok = inc_outgoing_stats(Packet),
|
||||||
Data
|
Data
|
||||||
catch
|
catch
|
||||||
|
|
|
@ -453,7 +453,7 @@ format_variable(undefined, _) ->
|
||||||
format_variable(Variable, undefined) ->
|
format_variable(Variable, undefined) ->
|
||||||
format_variable(Variable);
|
format_variable(Variable);
|
||||||
format_variable(Variable, Payload) ->
|
format_variable(Variable, Payload) ->
|
||||||
io_lib:format("~ts, Payload=~0p", [format_variable(Variable), Payload]).
|
io_lib:format("~ts, Payload=~ts", [format_variable(Variable), Payload]).
|
||||||
|
|
||||||
format_variable(#mqtt_packet_connect{
|
format_variable(#mqtt_packet_connect{
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
|
@ -520,4 +520,3 @@ format_password(_Password) -> '******'.
|
||||||
i(true) -> 1;
|
i(true) -> 1;
|
||||||
i(false) -> 0;
|
i(false) -> 0;
|
||||||
i(I) when is_integer(I) -> I.
|
i(I) when is_integer(I) -> I.
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
-export([ publish/1
|
-export([ publish/1
|
||||||
, subscribe/3
|
, subscribe/3
|
||||||
, unsubscribe/2
|
, unsubscribe/2
|
||||||
|
, log/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
|
@ -36,6 +37,7 @@
|
||||||
, delete/1
|
, delete/1
|
||||||
, clear/0
|
, clear/0
|
||||||
, update/2
|
, update/2
|
||||||
|
, check/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ format/1
|
-export([ format/1
|
||||||
|
@ -50,6 +52,8 @@
|
||||||
|
|
||||||
-define(TRACE, ?MODULE).
|
-define(TRACE, ?MODULE).
|
||||||
-define(MAX_SIZE, 30).
|
-define(MAX_SIZE, 30).
|
||||||
|
-define(TRACE_FILTER, emqx_trace_filter).
|
||||||
|
-define(OWN_KEYS,[level,filters,filter_default,handlers]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ log_file/2
|
-export([ log_file/2
|
||||||
|
@ -80,27 +84,53 @@ mnesia(boot) ->
|
||||||
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
||||||
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
||||||
is_binary(From); is_atom(From) ->
|
is_binary(From); is_atom(From) ->
|
||||||
emqx_logger:info(
|
?TRACE("PUBLISH", #{topic => Topic}, {publish, Payload}).
|
||||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
|
||||||
"PUBLISH to ~s: ~0p",
|
|
||||||
[Topic, Payload]
|
|
||||||
).
|
|
||||||
|
|
||||||
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
||||||
subscribe(Topic, SubId, SubOpts) ->
|
subscribe(Topic, SubId, SubOpts) ->
|
||||||
emqx_logger:info(
|
?TRACE("SUBSCRIBE", #{topic => Topic}, {subscribe, SubId, SubOpts}).
|
||||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
|
||||||
"~ts SUBSCRIBE ~ts: Options: ~0p",
|
|
||||||
[SubId, Topic, SubOpts]
|
|
||||||
).
|
|
||||||
|
|
||||||
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
||||||
unsubscribe(Topic, SubOpts) ->
|
unsubscribe(Topic, SubOpts) ->
|
||||||
emqx_logger:info(
|
?TRACE("UNSUBSCRIBE", #{topic => Topic}, {unsubscribe, SubOpts}).
|
||||||
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
|
||||||
"~ts UNSUBSCRIBE ~ts: Options: ~0p",
|
log(Action, Meta0, Msg) ->
|
||||||
[maps:get(subid, SubOpts, ""), Topic, SubOpts]
|
case persistent_term:get(?TRACE_FILTER, undefined) of
|
||||||
).
|
undefined -> ok;
|
||||||
|
List ->
|
||||||
|
Meta = maps:merge(logger:get_process_metadata(), Meta0),
|
||||||
|
Log = #{level => trace, action => Action, meta => Meta, msg => Msg},
|
||||||
|
log_filter(List, Log)
|
||||||
|
end.
|
||||||
|
|
||||||
|
log_filter([], _Log) -> ok;
|
||||||
|
log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
|
||||||
|
case FilterFun(Log0, {Filter, Name}) of
|
||||||
|
stop -> stop;
|
||||||
|
ignore -> ignore;
|
||||||
|
Log ->
|
||||||
|
case logger_config:get(ets:whereis(logger), Id) of
|
||||||
|
{ok, #{module := Module} = HandlerConfig0} ->
|
||||||
|
HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
|
||||||
|
try Module:log(Log, HandlerConfig)
|
||||||
|
catch C:R:S ->
|
||||||
|
case logger:remove_handler(Id) of
|
||||||
|
ok ->
|
||||||
|
logger:internal_log(error, {removed_failing_handler, Id});
|
||||||
|
{error,{not_found,_}} ->
|
||||||
|
%% Probably already removed by other client
|
||||||
|
%% Don't report again
|
||||||
|
ok;
|
||||||
|
{error,Reason} ->
|
||||||
|
logger:internal_log(error,
|
||||||
|
{removed_handler_failed, Id, Reason, C, R, S})
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
{error, {not_found, Id}} -> ok;
|
||||||
|
{error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason})
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
log_filter(Rest, Log0).
|
||||||
|
|
||||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -161,6 +191,9 @@ update(Name, Enable) ->
|
||||||
end,
|
end,
|
||||||
transaction(Tran).
|
transaction(Tran).
|
||||||
|
|
||||||
|
check() ->
|
||||||
|
erlang:send(?MODULE, {mnesia_table_event, check}).
|
||||||
|
|
||||||
-spec get_trace_filename(Name :: binary()) ->
|
-spec get_trace_filename(Name :: binary()) ->
|
||||||
{ok, FileName :: string()} | {error, not_found}.
|
{ok, FileName :: string()} | {error, not_found}.
|
||||||
get_trace_filename(Name) ->
|
get_trace_filename(Name) ->
|
||||||
|
@ -196,14 +229,13 @@ format(Traces) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ok = mria:wait_for_tables([?TRACE]),
|
ok = mria:wait_for_tables([?TRACE]),
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
|
||||||
ok = filelib:ensure_dir(trace_dir()),
|
ok = filelib:ensure_dir(trace_dir()),
|
||||||
ok = filelib:ensure_dir(zip_dir()),
|
ok = filelib:ensure_dir(zip_dir()),
|
||||||
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
||||||
Traces = get_enable_trace(),
|
Traces = get_enable_trace(),
|
||||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
|
||||||
TRef = update_trace(Traces),
|
TRef = update_trace(Traces),
|
||||||
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
update_trace_handler(),
|
||||||
|
{ok, #{timer => TRef, monitors => #{}}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{unexpected_call => Req}),
|
?SLOG(error, #{unexpected_call => Req}),
|
||||||
|
@ -224,10 +256,10 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
|
||||||
{noreply, State#{monitors => NewMonitors}}
|
{noreply, State#{monitors => NewMonitors}}
|
||||||
end;
|
end;
|
||||||
handle_info({timeout, TRef, update_trace},
|
handle_info({timeout, TRef, update_trace},
|
||||||
#{timer := TRef, primary_log_level := OriginLogLevel} = State) ->
|
#{timer := TRef} = State) ->
|
||||||
Traces = get_enable_trace(),
|
Traces = get_enable_trace(),
|
||||||
ok = update_log_primary_level(Traces, OriginLogLevel),
|
|
||||||
NextTRef = update_trace(Traces),
|
NextTRef = update_trace(Traces),
|
||||||
|
update_trace_handler(),
|
||||||
{noreply, State#{timer => NextTRef}};
|
{noreply, State#{timer => NextTRef}};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
|
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
|
||||||
|
@ -238,11 +270,11 @@ handle_info(Info, State) ->
|
||||||
?SLOG(error, #{unexpected_info => Info}),
|
?SLOG(error, #{unexpected_info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
|
terminate(_Reason, #{timer := TRef}) ->
|
||||||
ok = set_log_primary_level(OriginLogLevel),
|
|
||||||
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
|
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
|
||||||
emqx_misc:cancel_timer(TRef),
|
emqx_misc:cancel_timer(TRef),
|
||||||
stop_all_trace_handler(),
|
stop_all_trace_handler(),
|
||||||
|
update_trace_handler(),
|
||||||
_ = file:del_dir_r(zip_dir()),
|
_ = file:del_dir_r(zip_dir()),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -270,7 +302,7 @@ update_trace(Traces) ->
|
||||||
disable_finished(Finished),
|
disable_finished(Finished),
|
||||||
Started = emqx_trace_handler:running(),
|
Started = emqx_trace_handler:running(),
|
||||||
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
||||||
NeedStop = AllStarted -- NeedRunning,
|
NeedStop = filter_cli_handler(AllStarted) -- NeedRunning,
|
||||||
ok = stop_trace(NeedStop, Started),
|
ok = stop_trace(NeedStop, Started),
|
||||||
clean_stale_trace_files(),
|
clean_stale_trace_files(),
|
||||||
NextTime = find_closest_time(Traces, Now),
|
NextTime = find_closest_time(Traces, Now),
|
||||||
|
@ -481,11 +513,20 @@ transaction(Tran) ->
|
||||||
{aborted, Reason} -> {error, Reason}
|
{aborted, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel);
|
update_trace_handler() ->
|
||||||
update_log_primary_level(_, _) -> set_log_primary_level(debug).
|
case emqx_trace_handler:running() of
|
||||||
|
[] -> persistent_term:erase(?TRACE_FILTER);
|
||||||
set_log_primary_level(NewLevel) ->
|
Running ->
|
||||||
case NewLevel =/= emqx_logger:get_primary_log_level() of
|
List = lists:map(fun(#{id := Id, filter_fun := FilterFun,
|
||||||
true -> emqx_logger:set_primary_log_level(NewLevel);
|
filter := Filter, name := Name}) ->
|
||||||
false -> ok
|
{Id, FilterFun, Filter, Name} end, Running),
|
||||||
|
case List =/= persistent_term:get(?TRACE_FILTER, undefined) of
|
||||||
|
true -> persistent_term:put(?TRACE_FILTER, List);
|
||||||
|
false -> ok
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
filter_cli_handler(Names) ->
|
||||||
|
lists:filter(fun(Name) ->
|
||||||
|
notmatch =:= re:run(Name, "^CLI-+.", [])
|
||||||
|
end, Names).
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_trace_formatter).
|
||||||
|
|
||||||
|
-export([format/2]).
|
||||||
|
|
||||||
|
%%%-----------------------------------------------------------------
|
||||||
|
%%% API
|
||||||
|
-spec format(LogEvent, Config) -> unicode:chardata() when
|
||||||
|
LogEvent :: logger:log_event(),
|
||||||
|
Config :: logger:config().
|
||||||
|
format(#{level := trace, msg := Msg, meta := Meta, action := Action}, _Config) ->
|
||||||
|
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
|
||||||
|
ClientId = maps:get(clientid, Meta, ""),
|
||||||
|
Peername = maps:get(peername, Meta, ""),
|
||||||
|
MsgBin = format_msg(Msg),
|
||||||
|
MetaBin = format_map(maps:without([clientid, peername], Meta)),
|
||||||
|
[Time, " [", Action, "] ", ClientId, "@", Peername, " ", MsgBin, " ( ",
|
||||||
|
MetaBin, ")\n"];
|
||||||
|
|
||||||
|
format(Event, Config) ->
|
||||||
|
emqx_logger_textfmt:format(Event, Config).
|
||||||
|
|
||||||
|
format_msg(Bin)when is_binary(Bin) -> Bin;
|
||||||
|
format_msg(List) when is_list(List) -> List;
|
||||||
|
format_msg({publish, Payload}) ->
|
||||||
|
io_lib:format("Publish Payload:(~ts) TO ", [Payload]);
|
||||||
|
format_msg({subscribe, SubId, SubOpts}) ->
|
||||||
|
[io_lib:format("SUBSCRIBE ~ts, Opts( ", [SubId]),
|
||||||
|
format_map(SubOpts), ")"];
|
||||||
|
format_msg({unsubscribe, SubOpts}) ->
|
||||||
|
[io_lib:format("UNSUBSCRIBE ~ts, Opts( ", [maps:get(subid, SubOpts, "undefined")]),
|
||||||
|
format_map(maps:without([subid], SubOpts)), ")"];
|
||||||
|
format_msg(Packet) ->
|
||||||
|
emqx_packet:format(Packet).
|
||||||
|
|
||||||
|
format_map(Map) ->
|
||||||
|
maps:fold(fun(K, V, Acc) ->
|
||||||
|
[to_iolist(K), ":", to_iolist(V), " "|Acc]
|
||||||
|
end, [], Map).
|
||||||
|
|
||||||
|
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||||
|
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);
|
||||||
|
to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]);
|
||||||
|
to_iolist(Bin)when is_binary(Bin) -> unicode:characters_to_binary(Bin);
|
||||||
|
to_iolist(List) when is_list(List) -> unicode:characters_to_list(List);
|
||||||
|
to_iolist(Term) -> io_lib:format("~0p", [Term]).
|
|
@ -25,6 +25,7 @@
|
||||||
-export([ running/0
|
-export([ running/0
|
||||||
, install/3
|
, install/3
|
||||||
, install/4
|
, install/4
|
||||||
|
, install/5
|
||||||
, uninstall/1
|
, uninstall/1
|
||||||
, uninstall/2
|
, uninstall/2
|
||||||
]).
|
]).
|
||||||
|
@ -77,22 +78,18 @@ install(Type, Filter, Level, LogFile) ->
|
||||||
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
|
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
|
||||||
install(Who, all, LogFile) ->
|
install(Who, all, LogFile) ->
|
||||||
install(Who, debug, LogFile);
|
install(Who, debug, LogFile);
|
||||||
install(Who, Level, LogFile) ->
|
install(Who = #{name := Name, type := Type}, Level, LogFile) ->
|
||||||
PrimaryLevel = emqx_logger:get_primary_log_level(),
|
HandlerId = handler_id(Name, Type),
|
||||||
try logger:compare_levels(Level, PrimaryLevel) of
|
Config = #{
|
||||||
lt ->
|
level => Level,
|
||||||
{error,
|
formatter => formatter(Who),
|
||||||
io_lib:format(
|
filter_default => stop,
|
||||||
"Cannot trace at a log level (~s) "
|
filters => filters(Who),
|
||||||
"lower than the primary log level (~s)",
|
config => ?CONFIG(LogFile)
|
||||||
[Level, PrimaryLevel]
|
},
|
||||||
)};
|
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
|
||||||
_GtOrEq ->
|
show_prompts(Res, Who, "start_trace"),
|
||||||
install_handler(Who, Level, LogFile)
|
Res.
|
||||||
catch
|
|
||||||
error:badarg ->
|
|
||||||
{error, {invalid_log_level, Level}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec uninstall(Type :: clientid | topic | ip_address,
|
-spec uninstall(Type :: clientid | topic | ip_address,
|
||||||
Name :: binary() | list()) -> ok | {error, term()}.
|
Name :: binary() | list()) -> ok | {error, term()}.
|
||||||
|
@ -121,38 +118,25 @@ uninstall(HandlerId) ->
|
||||||
running() ->
|
running() ->
|
||||||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||||
|
|
||||||
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
|
||||||
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
|
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
|
||||||
filter_clientid(_Log, _ExpectId) -> ignore.
|
filter_clientid(_Log, _ExpectId) -> stop.
|
||||||
|
|
||||||
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
|
||||||
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
|
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
|
||||||
case emqx_topic:match(Topic, TopicFilter) of
|
case emqx_topic:match(Topic, TopicFilter) of
|
||||||
true -> Log;
|
true -> Log;
|
||||||
false -> ignore
|
false -> stop
|
||||||
end;
|
end;
|
||||||
filter_topic(_Log, _ExpectId) -> ignore.
|
filter_topic(_Log, _ExpectId) -> stop.
|
||||||
|
|
||||||
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
|
||||||
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
|
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
|
||||||
case lists:prefix(IP, Peername) of
|
case lists:prefix(IP, Peername) of
|
||||||
true -> Log;
|
true -> Log;
|
||||||
false -> ignore
|
false -> stop
|
||||||
end;
|
end;
|
||||||
filter_ip_address(_Log, _ExpectId) -> ignore.
|
filter_ip_address(_Log, _ExpectId) -> stop.
|
||||||
|
|
||||||
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
|
|
||||||
HandlerId = handler_id(Name, Type),
|
|
||||||
Config = #{
|
|
||||||
level => Level,
|
|
||||||
formatter => formatter(Who),
|
|
||||||
filter_default => stop,
|
|
||||||
filters => filters(Who),
|
|
||||||
config => ?CONFIG(LogFile)
|
|
||||||
},
|
|
||||||
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
|
|
||||||
show_prompts(Res, Who, "Start trace"),
|
|
||||||
Res.
|
|
||||||
|
|
||||||
filters(#{type := clientid, filter := Filter, name := Name}) ->
|
filters(#{type := clientid, filter := Filter, name := Name}) ->
|
||||||
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
|
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
|
||||||
|
@ -162,7 +146,7 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
||||||
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
||||||
|
|
||||||
formatter(#{type := Type}) ->
|
formatter(#{type := Type}) ->
|
||||||
{logger_formatter,
|
{emqx_trace_formatter,
|
||||||
#{
|
#{
|
||||||
template => template(Type),
|
template => template(Type),
|
||||||
single_line => false,
|
single_line => false,
|
||||||
|
@ -176,7 +160,6 @@ formatter(#{type := Type}) ->
|
||||||
%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read.
|
%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read.
|
||||||
template(clientid) ->
|
template(clientid) ->
|
||||||
[time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
|
[time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
|
||||||
%% TODO better format when clientid is utf8.
|
|
||||||
template(_) ->
|
template(_) ->
|
||||||
[time, " [", level, "] ",
|
[time, " [", level, "] ",
|
||||||
{clientid,
|
{clientid,
|
||||||
|
@ -189,11 +172,11 @@ template(_) ->
|
||||||
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
||||||
Init = #{id => Id, level => Level, dst => Dst},
|
Init = #{id => Id, level => Level, dst => Dst},
|
||||||
case Filters of
|
case Filters of
|
||||||
[{Type, {_FilterFun, {Filter, Name}}}] when
|
[{Type, {FilterFun, {Filter, Name}}}] when
|
||||||
Type =:= topic orelse
|
Type =:= topic orelse
|
||||||
Type =:= clientid orelse
|
Type =:= clientid orelse
|
||||||
Type =:= ip_address ->
|
Type =:= ip_address ->
|
||||||
[Init#{type => Type, filter => Filter, name => Name} | Acc];
|
[Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc];
|
||||||
_ ->
|
_ ->
|
||||||
Acc
|
Acc
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -347,7 +347,6 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||||
|
|
||||||
websocket_handle({binary, Data}, State) ->
|
websocket_handle({binary, Data}, State) ->
|
||||||
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}),
|
|
||||||
State2 = ensure_stats_timer(State),
|
State2 = ensure_stats_timer(State),
|
||||||
{Packets, State3} = parse_incoming(Data, [], State2),
|
{Packets, State3} = parse_incoming(Data, [], State2),
|
||||||
LenMsg = erlang:length(Packets),
|
LenMsg = erlang:length(Packets),
|
||||||
|
@ -432,11 +431,11 @@ websocket_info(Info, State) ->
|
||||||
websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
|
websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
|
||||||
websocket_close(ReasonCode, State);
|
websocket_close(ReasonCode, State);
|
||||||
websocket_close(Reason, State) ->
|
websocket_close(Reason, State) ->
|
||||||
?SLOG(debug, #{msg => "websocket_closed", reason => Reason}),
|
?TRACE("CLOSED", #{transport => websocket, reason => Reason}, "websocket_closed"),
|
||||||
handle_info({sock_closed, Reason}, State).
|
handle_info({sock_closed, Reason}, State).
|
||||||
|
|
||||||
terminate(Reason, _Req, #state{channel = Channel}) ->
|
terminate(Reason, _Req, #state{channel = Channel}) ->
|
||||||
?SLOG(debug, #{msg => "terminated", reason => Reason}),
|
?TRACE("TERMINATE", #{transport => websocket, reason => Reason}, "webscoket_terminated"),
|
||||||
emqx_channel:terminate(Reason, Channel);
|
emqx_channel:terminate(Reason, Channel);
|
||||||
|
|
||||||
terminate(_Reason, _Req, _UnExpectedState) ->
|
terminate(_Reason, _Req, _UnExpectedState) ->
|
||||||
|
@ -480,7 +479,7 @@ handle_info({connack, ConnAck}, State) ->
|
||||||
return(enqueue(ConnAck, State));
|
return(enqueue(ConnAck, State));
|
||||||
|
|
||||||
handle_info({close, Reason}, State) ->
|
handle_info({close, Reason}, State) ->
|
||||||
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"),
|
||||||
return(enqueue({close, Reason}, State));
|
return(enqueue({close, Reason}, State));
|
||||||
|
|
||||||
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
||||||
|
@ -663,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
|
|
||||||
handle_incoming(Packet, State = #state{listener = {Type, Listener}})
|
handle_incoming(Packet, State = #state{listener = {Type, Listener}})
|
||||||
when is_record(Packet, mqtt_packet) ->
|
when is_record(Packet, mqtt_packet) ->
|
||||||
?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}),
|
?TRACE("RECV", #{transport => websocket}, Packet),
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
NState = case emqx_pd:get_counter(incoming_pubs) >
|
NState = case emqx_pd:get_counter(incoming_pubs) >
|
||||||
get_active_n(Type, Listener) of
|
get_active_n(Type, Listener) of
|
||||||
|
@ -727,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
<<>>;
|
<<>>;
|
||||||
Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}),
|
Data -> ?TRACE("SEND", #{transport => websocket}, Packet),
|
||||||
ok = inc_outgoing_stats(Packet),
|
ok = inc_outgoing_stats(Packet),
|
||||||
Data
|
Data
|
||||||
catch
|
catch
|
||||||
|
|
|
@ -32,36 +32,30 @@ all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([emqx_modules]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([]).
|
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
||||||
|
|
||||||
init_per_testcase(t_trace_clientid, Config) ->
|
init_per_testcase(t_trace_clientid, Config) ->
|
||||||
Config;
|
Config;
|
||||||
init_per_testcase(_Case, Config) ->
|
init_per_testcase(_Case, Config) ->
|
||||||
ok = emqx_logger:set_log_level(debug),
|
|
||||||
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
|
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_Case, _Config) ->
|
end_per_testcase(_Case, _Config) ->
|
||||||
ok = emqx_logger:set_log_level(warning),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_trace_clientid(_Config) ->
|
t_trace_clientid(_Config) ->
|
||||||
%% Start tracing
|
%% Start tracing
|
||||||
emqx_logger:set_log_level(error),
|
|
||||||
{error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"),
|
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
%% add list clientid
|
%% add list clientid
|
||||||
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
|
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
|
||||||
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
|
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
|
||||||
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
|
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
|
||||||
{error, {invalid_log_level, bad_level}} =
|
|
||||||
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
|
||||||
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
||||||
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
||||||
|
emqx_trace:check(),
|
||||||
ok = filesync(<<"client">>, clientid),
|
ok = filesync(<<"client">>, clientid),
|
||||||
ok = filesync(<<"client2">>, clientid),
|
ok = filesync(<<"client2">>, clientid),
|
||||||
ok = filesync(<<"client3">>, clientid),
|
ok = filesync(<<"client3">>, clientid),
|
||||||
|
@ -106,10 +100,9 @@ t_trace_clientid(_Config) ->
|
||||||
?assertEqual([], emqx_trace_handler:running()).
|
?assertEqual([], emqx_trace_handler:running()).
|
||||||
|
|
||||||
t_trace_clientid_utf8(_) ->
|
t_trace_clientid_utf8(_) ->
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
|
|
||||||
Utf8Id = <<"client 漢字編碼"/utf8>>,
|
Utf8Id = <<"client 漢字編碼"/utf8>>,
|
||||||
ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"),
|
ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"),
|
||||||
|
emqx_trace:check(),
|
||||||
{ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
|
{ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
|
||||||
emqtt:connect(T),
|
emqtt:connect(T),
|
||||||
[begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)],
|
[begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)],
|
||||||
|
@ -126,9 +119,9 @@ t_trace_topic(_Config) ->
|
||||||
emqtt:connect(T),
|
emqtt:connect(T),
|
||||||
|
|
||||||
%% Start tracing
|
%% Start tracing
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
||||||
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
||||||
|
emqx_trace:check(),
|
||||||
ok = filesync(<<"x/#">>, topic),
|
ok = filesync(<<"x/#">>, topic),
|
||||||
ok = filesync(<<"y/#">>, topic),
|
ok = filesync(<<"y/#">>, topic),
|
||||||
|
|
||||||
|
@ -174,6 +167,7 @@ t_trace_ip_address(_Config) ->
|
||||||
%% Start tracing
|
%% Start tracing
|
||||||
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
||||||
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
||||||
|
emqx_trace:check(),
|
||||||
ok = filesync(<<"127.0.0.1">>, ip_address),
|
ok = filesync(<<"127.0.0.1">>, ip_address),
|
||||||
ok = filesync(<<"192.168.1.1">>, ip_address),
|
ok = filesync(<<"192.168.1.1">>, ip_address),
|
||||||
|
|
||||||
|
|
|
@ -199,9 +199,8 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
||||||
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
||||||
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
|
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
|
||||||
#{pool_name := PoolName, base_path := BasePath} = State) ->
|
#{pool_name := PoolName, base_path := BasePath} = State) ->
|
||||||
?SLOG(debug, #{msg => "http connector received request",
|
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||||
request => Request, connector => InstId,
|
"http connector received request"),
|
||||||
state => State}),
|
|
||||||
NRequest = update_path(BasePath, Request),
|
NRequest = update_path(BasePath, Request),
|
||||||
case Result = ehttpc:request(case KeyOrNum of
|
case Result = ehttpc:request(case KeyOrNum of
|
||||||
undefined -> PoolName;
|
undefined -> PoolName;
|
||||||
|
|
|
@ -87,9 +87,8 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
|
|
||||||
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
|
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||||
Request = {Base, Filter, Attributes},
|
Request = {Base, Filter, Attributes},
|
||||||
?SLOG(debug, #{msg => "ldap connector received request",
|
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||||
request => Request, connector => InstId,
|
"ldap connector received request"),
|
||||||
state => State}),
|
|
||||||
case Result = ecpool:pick_and_do(
|
case Result = ecpool:pick_and_do(
|
||||||
PoolName,
|
PoolName,
|
||||||
{?MODULE, search, [Base, Filter, Attributes]},
|
{?MODULE, search, [Base, Filter, Attributes]},
|
||||||
|
|
|
@ -137,9 +137,8 @@ on_query(InstId,
|
||||||
AfterQuery,
|
AfterQuery,
|
||||||
#{poolname := PoolName} = State) ->
|
#{poolname := PoolName} = State) ->
|
||||||
Request = {Action, Collection, Selector, Docs},
|
Request = {Action, Collection, Selector, Docs},
|
||||||
?SLOG(debug, #{msg => "mongodb connector received request",
|
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||||
request => Request, connector => InstId,
|
"mongodb connector received request"),
|
||||||
state => State}),
|
|
||||||
case ecpool:pick_and_do(PoolName,
|
case ecpool:pick_and_do(PoolName,
|
||||||
{?MODULE, mongo_query, [Action, Collection, Selector, Docs]},
|
{?MODULE, mongo_query, [Action, Collection, Selector, Docs]},
|
||||||
no_handover) of
|
no_handover) of
|
||||||
|
|
|
@ -150,8 +150,8 @@ on_stop(_InstId, #{name := InstanceId}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
|
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
|
||||||
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
|
?TRACE("QUERY", #{message => Msg, connector => InstanceId},
|
||||||
connector => InstanceId}),
|
"send msg to remote node"),
|
||||||
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
|
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
|
||||||
emqx_resource:query_success(AfterQuery).
|
emqx_resource:query_success(AfterQuery).
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,8 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||||
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
|
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||||
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
|
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
|
||||||
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
|
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||||
?SLOG(debug, #{msg => "mysql connector received sql query",
|
?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State},
|
||||||
connector => InstId, sql => SQL, state => State}),
|
"mysql connector received sql query"),
|
||||||
case Result = ecpool:pick_and_do(
|
case Result = ecpool:pick_and_do(
|
||||||
PoolName,
|
PoolName,
|
||||||
{mysql, query, [SQL, Params, Timeout]},
|
{mysql, query, [SQL, Params, Timeout]},
|
||||||
|
|
|
@ -83,8 +83,8 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||||
on_query(InstId, {sql, SQL, []}, AfterQuery, State);
|
on_query(InstId, {sql, SQL, []}, AfterQuery, State);
|
||||||
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
|
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||||
?SLOG(debug, #{msg => "postgresql connector received sql query",
|
?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State},
|
||||||
connector => InstId, sql => SQL, state => State}),
|
"postgresql connector received sql query"),
|
||||||
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
|
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
|
|
|
@ -125,8 +125,8 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||||
|
|
||||||
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
|
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
|
||||||
?SLOG(debug, #{msg => "redis connector received cmd query",
|
?TRACE("QUERY", #{connector => InstId, sql => Command, state => State},
|
||||||
connector => InstId, sql => Command, state => State}),
|
"redis connector received cmd query"),
|
||||||
Result = case Type of
|
Result = case Type of
|
||||||
cluster -> eredis_cluster:q(PoolName, Command);
|
cluster -> eredis_cluster:q(PoolName, Command);
|
||||||
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
|
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
|
||||||
|
|
|
@ -142,11 +142,11 @@ fields(trace) ->
|
||||||
#{desc => """Filter type""",
|
#{desc => """Filter type""",
|
||||||
nullable => false,
|
nullable => false,
|
||||||
example => <<"clientid">>})},
|
example => <<"clientid">>})},
|
||||||
{topic, hoconsc:mk(binary(),
|
{topic, hoconsc:mk(emqx_schema:unicode_binary(),
|
||||||
#{desc => """support mqtt wildcard topic.""",
|
#{desc => """support mqtt wildcard topic.""",
|
||||||
nullable => true,
|
nullable => true,
|
||||||
example => <<"/dev/#">>})},
|
example => <<"/dev/#">>})},
|
||||||
{clientid, hoconsc:mk(binary(),
|
{clientid, hoconsc:mk(emqx_schema:unicode_binary(),
|
||||||
#{desc => """mqtt clientid.""",
|
#{desc => """mqtt clientid.""",
|
||||||
nullable => true,
|
nullable => true,
|
||||||
example => <<"dev-001">>})},
|
example => <<"dev-001">>})},
|
||||||
|
|
|
@ -395,9 +395,11 @@ trace(["stop", Operation, ClientId]) ->
|
||||||
trace(["start", Operation, ClientId, LogFile]) ->
|
trace(["start", Operation, ClientId, LogFile]) ->
|
||||||
trace(["start", Operation, ClientId, LogFile, "all"]);
|
trace(["start", Operation, ClientId, LogFile, "all"]);
|
||||||
|
|
||||||
trace(["start", Operation, ClientId, LogFile, Level]) ->
|
trace(["start", Operation, Filter, LogFile, Level]) ->
|
||||||
case trace_type(Operation) of
|
case trace_type(Operation) of
|
||||||
{ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile);
|
{ok, Type} ->
|
||||||
|
trace_on(name(Filter), Type, Filter,
|
||||||
|
list_to_existing_atom(Level), LogFile);
|
||||||
error -> trace([])
|
error -> trace([])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -417,20 +419,22 @@ trace(_) ->
|
||||||
"Stop tracing for a client ip on local node"}
|
"Stop tracing for a client ip on local node"}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
trace_on(Who, Name, Level, LogFile) ->
|
trace_on(Name, Type, Filter, Level, LogFile) ->
|
||||||
case emqx_trace_handler:install(Who, Name, Level, LogFile) of
|
case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]);
|
emqx_trace:check(),
|
||||||
|
emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error])
|
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
trace_off(Who, Name) ->
|
trace_off(Who, Filter) ->
|
||||||
case emqx_trace_handler:uninstall(Who, Name) of
|
case emqx_trace_handler:uninstall(Who, name(Filter)) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]);
|
emqx_trace:check(),
|
||||||
|
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Filter]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error])
|
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Filter, Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -716,3 +720,6 @@ format_listen_on({Addr, Port}) when is_list(Addr) ->
|
||||||
io_lib:format("~ts:~w", [Addr, Port]);
|
io_lib:format("~ts:~w", [Addr, Port]);
|
||||||
format_listen_on({Addr, Port}) when is_tuple(Addr) ->
|
format_listen_on({Addr, Port}) when is_tuple(Addr) ->
|
||||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
||||||
|
|
||||||
|
name(Filter) ->
|
||||||
|
iolist_to_binary(["CLI-", Filter]).
|
||||||
|
|
|
@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
|
||||||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||||
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
|
?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish message"),
|
||||||
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
|
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
|
||||||
|
|
||||||
%% in case this is a "$events/" event
|
%% in case this is a "$events/" event
|
||||||
|
@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
|
||||||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||||
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
|
?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish"),
|
||||||
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
|
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
|
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
|
||||||
?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}),
|
?TRACE("SEND", #{bridge_id => BridgeId}, "output to bridge"),
|
||||||
emqx_bridge:send_message(BridgeId, Selected);
|
emqx_bridge:send_message(BridgeId, Selected);
|
||||||
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||||
Mod:Func(Selected, Envs, Args).
|
Mod:Func(Selected, Envs, Args).
|
||||||
|
|
|
@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) ->
|
||||||
D1 ++ flatten(L).
|
D1 ++ flatten(L).
|
||||||
|
|
||||||
echo_action(Data, Envs) ->
|
echo_action(Data, Envs) ->
|
||||||
?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}),
|
?TRACE("TEST", #{data => Data, envs => Envs}, "testing_rule_sql_ok"),
|
||||||
Data.
|
Data.
|
||||||
|
|
||||||
fill_default_values(Event, Context) ->
|
fill_default_values(Event, Context) ->
|
||||||
|
|
Loading…
Reference in New Issue