From 668180388c08c6655358cb92a0f9a7216e6f2907 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 20 Dec 2021 09:23:00 +0800 Subject: [PATCH] feat(trace): replace logger_formatter by emqx_trace_formatter --- apps/emqx/include/logger.hrl | 2 + apps/emqx/src/emqx_broker.erl | 3 +- apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx/src/emqx_connection.erl | 15 ++- apps/emqx/src/emqx_packet.erl | 3 +- apps/emqx/src/emqx_trace/emqx_trace.erl | 101 ++++++++++++------ .../src/emqx_trace/emqx_trace_formatter.erl | 60 +++++++++++ .../src/emqx_trace/emqx_trace_handler.erl | 65 +++++------ apps/emqx/src/emqx_ws_connection.erl | 11 +- apps/emqx/test/emqx_trace_handler_SUITE.erl | 18 ++-- .../src/emqx_connector_http.erl | 5 +- .../src/emqx_connector_ldap.erl | 5 +- .../src/emqx_connector_mongo.erl | 5 +- .../src/emqx_connector_mqtt.erl | 4 +- .../src/emqx_connector_mysql.erl | 4 +- .../src/emqx_connector_pgsql.erl | 4 +- .../src/emqx_connector_redis.erl | 4 +- .../src/emqx_mgmt_api_trace.erl | 4 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 27 +++-- .../src/emqx_rule_outputs.erl | 4 +- .../src/emqx_rule_runtime.erl | 2 +- .../src/emqx_rule_sqltester.erl | 2 +- 22 files changed, 214 insertions(+), 136 deletions(-) create mode 100644 apps/emqx/src/emqx_trace/emqx_trace_formatter.erl diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index c2ee5ab95..ecedfafe7 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -69,6 +69,8 @@ ok end). +-define(TRACE(Action, Meta, Msg), emqx_trace:log(Action, Meta, Msg)). + %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index a82ab9b45..6dce69136 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -205,8 +205,7 @@ publish(Msg) when is_record(Msg, message) -> emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?SLOG(debug, #{msg => "message_not_published", - payload => emqx_message:to_log_map(Msg)}), + ?TRACE("NotAllow", #{payload => emqx_message:to_log_map(Msg)}, "message_not_published"), []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index eb71aca58..7fe366ad1 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> fun check_banned/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?TRACE("RECV", #{}, Packet), NChannel1 = NChannel#channel{ will_msg = emqx_packet:will_msg(NConnPkt), alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 6919c6ff8..b8a6b4b7b 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -449,14 +449,12 @@ handle_msg({'$gen_cast', Req}, State) -> {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), when_bytes_in(Oct, Data, State); handle_msg({quic, Data, _Sock, _, _, _}, State) -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), @@ -528,7 +526,7 @@ handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), + ?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{channel = Channel}) -> @@ -566,7 +564,8 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, Channel1 = emqx_channel:set_conn_state(disconnected, Channel), emqx_congestion:cancel_alarms(Socket, Transport, Channel1), emqx_channel:terminate(Reason, Channel1), - close_socket_ok(State) + close_socket_ok(State), + ?TRACE("TERMINATE", #{reason => Reason}, "terminated") catch E : C : S -> ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) @@ -716,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?SLOG(debug, #{msg => "RECV_packet", packet => emqx_packet:format(Packet)}), + ?TRACE("RECV", #{}, Packet), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> @@ -760,10 +759,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?SLOG(debug, #{ - msg => "SEND_packet", - packet => emqx_packet:format(Packet) - }), + Data -> + ?TRACE("SEND", #{}, Packet), ok = inc_outgoing_stats(Packet), Data catch diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 60835d4ab..02165a6b5 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -453,7 +453,7 @@ format_variable(undefined, _) -> format_variable(Variable, undefined) -> format_variable(Variable); format_variable(Variable, Payload) -> - io_lib:format("~ts, Payload=~0p", [format_variable(Variable), Payload]). + io_lib:format("~ts, Payload=~ts", [format_variable(Variable), Payload]). format_variable(#mqtt_packet_connect{ proto_ver = ProtoVer, @@ -520,4 +520,3 @@ format_password(_Password) -> '******'. i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. - diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 42e4d0baf..f4679e073 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -26,6 +26,7 @@ -export([ publish/1 , subscribe/3 , unsubscribe/2 + , log/3 ]). -export([ start_link/0 @@ -36,6 +37,7 @@ , delete/1 , clear/0 , update/2 + , check/0 ]). -export([ format/1 @@ -50,6 +52,8 @@ -define(TRACE, ?MODULE). -define(MAX_SIZE, 30). +-define(TRACE_FILTER, emqx_trace_filter). +-define(OWN_KEYS,[level,filters,filter_default,handlers]). -ifdef(TEST). -export([ log_file/2 @@ -80,27 +84,53 @@ mnesia(boot) -> publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "PUBLISH to ~s: ~0p", - [Topic, Payload] - ). + ?TRACE("PUBLISH", #{topic => Topic}, {publish, Payload}). subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; subscribe(Topic, SubId, SubOpts) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts SUBSCRIBE ~ts: Options: ~0p", - [SubId, Topic, SubOpts] - ). + ?TRACE("SUBSCRIBE", #{topic => Topic}, {subscribe, SubId, SubOpts}). unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; unsubscribe(Topic, SubOpts) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts UNSUBSCRIBE ~ts: Options: ~0p", - [maps:get(subid, SubOpts, ""), Topic, SubOpts] - ). + ?TRACE("UNSUBSCRIBE", #{topic => Topic}, {unsubscribe, SubOpts}). + +log(Action, Meta0, Msg) -> + case persistent_term:get(?TRACE_FILTER, undefined) of + undefined -> ok; + List -> + Meta = maps:merge(logger:get_process_metadata(), Meta0), + Log = #{level => trace, action => Action, meta => Meta, msg => Msg}, + log_filter(List, Log) + end. + +log_filter([], _Log) -> ok; +log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> + case FilterFun(Log0, {Filter, Name}) of + stop -> stop; + ignore -> ignore; + Log -> + case logger_config:get(ets:whereis(logger), Id) of + {ok, #{module := Module} = HandlerConfig0} -> + HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), + try Module:log(Log, HandlerConfig) + catch C:R:S -> + case logger:remove_handler(Id) of + ok -> + logger:internal_log(error, {removed_failing_handler, Id}); + {error,{not_found,_}} -> + %% Probably already removed by other client + %% Don't report again + ok; + {error,Reason} -> + logger:internal_log(error, + {removed_handler_failed, Id, Reason, C, R, S}) + end + end; + {error, {not_found, Id}} -> ok; + {error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason}) + end + end, + log_filter(Rest, Log0). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> @@ -161,6 +191,9 @@ update(Name, Enable) -> end, transaction(Tran). +check() -> + erlang:send(?MODULE, {mnesia_table_event, check}). + -spec get_trace_filename(Name :: binary()) -> {ok, FileName :: string()} | {error, not_found}. get_trace_filename(Name) -> @@ -196,14 +229,13 @@ format(Traces) -> init([]) -> ok = mria:wait_for_tables([?TRACE]), erlang:process_flag(trap_exit, true), - OriginLogLevel = emqx_logger:get_primary_log_level(), ok = filelib:ensure_dir(trace_dir()), ok = filelib:ensure_dir(zip_dir()), {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), TRef = update_trace(Traces), - {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. + update_trace_handler(), + {ok, #{timer => TRef, monitors => #{}}}. handle_call(Req, _From, State) -> ?SLOG(error, #{unexpected_call => Req}), @@ -224,10 +256,10 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor {noreply, State#{monitors => NewMonitors}} end; handle_info({timeout, TRef, update_trace}, - #{timer := TRef, primary_log_level := OriginLogLevel} = State) -> + #{timer := TRef} = State) -> Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), NextTRef = update_trace(Traces), + update_trace_handler(), {noreply, State#{timer => NextTRef}}; handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> @@ -238,11 +270,11 @@ handle_info(Info, State) -> ?SLOG(error, #{unexpected_info => Info}), {noreply, State}. -terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> - ok = set_log_primary_level(OriginLogLevel), +terminate(_Reason, #{timer := TRef}) -> _ = mnesia:unsubscribe({table, ?TRACE, simple}), emqx_misc:cancel_timer(TRef), stop_all_trace_handler(), + update_trace_handler(), _ = file:del_dir_r(zip_dir()), ok. @@ -270,7 +302,7 @@ update_trace(Traces) -> disable_finished(Finished), Started = emqx_trace_handler:running(), {NeedRunning, AllStarted} = start_trace(Running, Started), - NeedStop = AllStarted -- NeedRunning, + NeedStop = filter_cli_handler(AllStarted) -- NeedRunning, ok = stop_trace(NeedStop, Started), clean_stale_trace_files(), NextTime = find_closest_time(Traces, Now), @@ -481,11 +513,20 @@ transaction(Tran) -> {aborted, Reason} -> {error, Reason} end. -update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel); -update_log_primary_level(_, _) -> set_log_primary_level(debug). - -set_log_primary_level(NewLevel) -> - case NewLevel =/= emqx_logger:get_primary_log_level() of - true -> emqx_logger:set_primary_log_level(NewLevel); - false -> ok +update_trace_handler() -> + case emqx_trace_handler:running() of + [] -> persistent_term:erase(?TRACE_FILTER); + Running -> + List = lists:map(fun(#{id := Id, filter_fun := FilterFun, + filter := Filter, name := Name}) -> + {Id, FilterFun, Filter, Name} end, Running), + case List =/= persistent_term:get(?TRACE_FILTER, undefined) of + true -> persistent_term:put(?TRACE_FILTER, List); + false -> ok + end end. + +filter_cli_handler(Names) -> + lists:filter(fun(Name) -> + notmatch =:= re:run(Name, "^CLI-+.", []) + end, Names). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl new file mode 100644 index 000000000..1b37b7130 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_formatter). + +-export([format/2]). + +%%%----------------------------------------------------------------- +%%% API +-spec format(LogEvent, Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: logger:config(). +format(#{level := trace, msg := Msg, meta := Meta, action := Action}, _Config) -> + Time = calendar:system_time_to_rfc3339(erlang:system_time(second)), + ClientId = maps:get(clientid, Meta, ""), + Peername = maps:get(peername, Meta, ""), + MsgBin = format_msg(Msg), + MetaBin = format_map(maps:without([clientid, peername], Meta)), + [Time, " [", Action, "] ", ClientId, "@", Peername, " ", MsgBin, " ( ", + MetaBin, ")\n"]; + +format(Event, Config) -> + emqx_logger_textfmt:format(Event, Config). + +format_msg(Bin)when is_binary(Bin) -> Bin; +format_msg(List) when is_list(List) -> List; +format_msg({publish, Payload}) -> + io_lib:format("Publish Payload:(~ts) TO ", [Payload]); +format_msg({subscribe, SubId, SubOpts}) -> + [io_lib:format("SUBSCRIBE ~ts, Opts( ", [SubId]), + format_map(SubOpts), ")"]; +format_msg({unsubscribe, SubOpts}) -> + [io_lib:format("UNSUBSCRIBE ~ts, Opts( ", [maps:get(subid, SubOpts, "undefined")]), + format_map(maps:without([subid], SubOpts)), ")"]; +format_msg(Packet) -> + emqx_packet:format(Packet). + +format_map(Map) -> + maps:fold(fun(K, V, Acc) -> + [to_iolist(K), ":", to_iolist(V), " "|Acc] + end, [], Map). + +to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); +to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); +to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]); +to_iolist(Bin)when is_binary(Bin) -> unicode:characters_to_binary(Bin); +to_iolist(List) when is_list(List) -> unicode:characters_to_list(List); +to_iolist(Term) -> io_lib:format("~0p", [Term]). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index c76bf1aa9..9c991301c 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -25,6 +25,7 @@ -export([ running/0 , install/3 , install/4 + , install/5 , uninstall/1 , uninstall/2 ]). @@ -77,22 +78,18 @@ install(Type, Filter, Level, LogFile) -> -spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}. install(Who, all, LogFile) -> install(Who, debug, LogFile); -install(Who, Level, LogFile) -> - PrimaryLevel = emqx_logger:get_primary_log_level(), - try logger:compare_levels(Level, PrimaryLevel) of - lt -> - {error, - io_lib:format( - "Cannot trace at a log level (~s) " - "lower than the primary log level (~s)", - [Level, PrimaryLevel] - )}; - _GtOrEq -> - install_handler(Who, Level, LogFile) - catch - error:badarg -> - {error, {invalid_log_level, Level}} - end. +install(Who = #{name := Name, type := Type}, Level, LogFile) -> + HandlerId = handler_id(Name, Type), + Config = #{ + level => Level, + formatter => formatter(Who), + filter_default => stop, + filters => filters(Who), + config => ?CONFIG(LogFile) + }, + Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), + show_prompts(Res, Who, "start_trace"), + Res. -spec uninstall(Type :: clientid | topic | ip_address, Name :: binary() | list()) -> ok | {error, term()}. @@ -121,38 +118,25 @@ uninstall(HandlerId) -> running() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). --spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log; -filter_clientid(_Log, _ExpectId) -> ignore. +filter_clientid(_Log, _ExpectId) -> stop. --spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) -> case emqx_topic:match(Topic, TopicFilter) of true -> Log; - false -> ignore + false -> stop end; -filter_topic(_Log, _ExpectId) -> ignore. +filter_topic(_Log, _ExpectId) -> stop. --spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) -> case lists:prefix(IP, Peername) of true -> Log; - false -> ignore + false -> stop end; -filter_ip_address(_Log, _ExpectId) -> ignore. - -install_handler(Who = #{name := Name, type := Type}, Level, LogFile) -> - HandlerId = handler_id(Name, Type), - Config = #{ - level => Level, - formatter => formatter(Who), - filter_default => stop, - filters => filters(Who), - config => ?CONFIG(LogFile) - }, - Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), - show_prompts(Res, Who, "Start trace"), - Res. +filter_ip_address(_Log, _ExpectId) -> stop. filters(#{type := clientid, filter := Filter, name := Name}) -> [{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}]; @@ -162,7 +146,7 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. formatter(#{type := Type}) -> - {logger_formatter, + {emqx_trace_formatter, #{ template => template(Type), single_line => false, @@ -176,7 +160,6 @@ formatter(#{type := Type}) -> %% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read. template(clientid) -> [time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"]; -%% TODO better format when clientid is utf8. template(_) -> [time, " [", level, "] ", {clientid, @@ -189,11 +172,11 @@ template(_) -> filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) -> Init = #{id => Id, level => Level, dst => Dst}, case Filters of - [{Type, {_FilterFun, {Filter, Name}}}] when + [{Type, {FilterFun, {Filter, Name}}}] when Type =:= topic orelse Type =:= clientid orelse Type =:= ip_address -> - [Init#{type => Type, filter => Filter, name => Name} | Acc]; + [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc]; _ -> Acc end. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 375b1ae2f..6e6bc1c90 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -347,7 +347,6 @@ websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, Data}, State) -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}), State2 = ensure_stats_timer(State), {Packets, State3} = parse_incoming(Data, [], State2), LenMsg = erlang:length(Packets), @@ -432,11 +431,11 @@ websocket_info(Info, State) -> websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) -> websocket_close(ReasonCode, State); websocket_close(Reason, State) -> - ?SLOG(debug, #{msg => "websocket_closed", reason => Reason}), + ?TRACE("CLOSED", #{transport => websocket, reason => Reason}, "websocket_closed"), handle_info({sock_closed, Reason}, State). terminate(Reason, _Req, #state{channel = Channel}) -> - ?SLOG(debug, #{msg => "terminated", reason => Reason}), + ?TRACE("TERMINATE", #{transport => websocket, reason => Reason}, "webscoket_terminated"), emqx_channel:terminate(Reason, Channel); terminate(_Reason, _Req, _UnExpectedState) -> @@ -480,7 +479,7 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), + ?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"), return(enqueue({close, Reason}, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> @@ -663,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> - ?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}), + ?TRACE("RECV", #{transport => websocket}, Packet), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > get_active_n(Type, Listener) of @@ -727,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}), + Data -> ?TRACE("SEND", #{transport => websocket}, Packet), ok = inc_outgoing_stats(Packet), Data catch diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index abe233b58..01d587028 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -32,36 +32,30 @@ all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_ init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), + emqx_common_test_helpers:start_apps([emqx_modules]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). + emqx_common_test_helpers:stop_apps([emqx_modules]). init_per_testcase(t_trace_clientid, Config) -> Config; init_per_testcase(_Case, Config) -> - ok = emqx_logger:set_log_level(debug), _ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()], Config. end_per_testcase(_Case, _Config) -> - ok = emqx_logger:set_log_level(warning), ok. t_trace_clientid(_Config) -> %% Start tracing - emqx_logger:set_log_level(error), - {error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"), - emqx_logger:set_log_level(debug), %% add list clientid ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"), ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"), ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"), - {error, {invalid_log_level, bad_level}} = - emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"), {error, {handler_not_added, {file_error, ".", eisdir}}} = emqx_trace_handler:install(clientid, <<"client5">>, debug, "."), + emqx_trace:check(), ok = filesync(<<"client">>, clientid), ok = filesync(<<"client2">>, clientid), ok = filesync(<<"client3">>, clientid), @@ -106,10 +100,9 @@ t_trace_clientid(_Config) -> ?assertEqual([], emqx_trace_handler:running()). t_trace_clientid_utf8(_) -> - emqx_logger:set_log_level(debug), - Utf8Id = <<"client 漢字編碼"/utf8>>, ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"), + emqx_trace:check(), {ok, T} = emqtt:start_link([{clientid, Utf8Id}]), emqtt:connect(T), [begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)], @@ -126,9 +119,9 @@ t_trace_topic(_Config) -> emqtt:connect(T), %% Start tracing - emqx_logger:set_log_level(debug), ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), + emqx_trace:check(), ok = filesync(<<"x/#">>, topic), ok = filesync(<<"y/#">>, topic), @@ -174,6 +167,7 @@ t_trace_ip_address(_Config) -> %% Start tracing ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"), ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"), + emqx_trace:check(), ok = filesync(<<"127.0.0.1">>, ip_address), ok = filesync(<<"192.168.1.1">>, ip_address), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2b9bd48aa..b54d87f12 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -199,9 +199,8 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State) -> - ?SLOG(debug, #{msg => "http connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, + "http connector received request"), NRequest = update_path(BasePath, Request), case Result = ehttpc:request(case KeyOrNum of undefined -> PoolName; diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 8af516b82..8aa1f9319 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -87,9 +87,8 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> Request = {Base, Filter, Attributes}, - ?SLOG(debug, #{msg => "ldap connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, + "ldap connector received request"), case Result = ecpool:pick_and_do( PoolName, {?MODULE, search, [Base, Filter, Attributes]}, diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 6a1b15e57..d2594ab93 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -137,9 +137,8 @@ on_query(InstId, AfterQuery, #{poolname := PoolName} = State) -> Request = {Action, Collection, Selector, Docs}, - ?SLOG(debug, #{msg => "mongodb connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, + "mongodb connector received request"), case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index f8d17ce32..6d620cc14 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -150,8 +150,8 @@ on_stop(_InstId, #{name := InstanceId}) -> end. on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> - ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, - connector => InstanceId}), + ?TRACE("QUERY", #{message => Msg, connector => InstanceId}, + "send msg to remote node"), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index c93a1e350..def3904b4 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -85,8 +85,8 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State); on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> - ?SLOG(debug, #{msg => "mysql connector received sql query", - connector => InstId, sql => SQL, state => State}), + ?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State}, + "mysql connector received sql query"), case Result = ecpool:pick_and_do( PoolName, {mysql, query, [SQL, Params, Timeout]}, diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index f42bed666..ac864a45d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -83,8 +83,8 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, []}, AfterQuery, State); on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> - ?SLOG(debug, #{msg => "postgresql connector received sql query", - connector => InstId, sql => SQL, state => State}), + ?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State}, + "postgresql connector received sql query"), case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of {error, Reason} -> ?SLOG(error, #{ diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 075ede0bc..61b716b8b 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -125,8 +125,8 @@ on_stop(InstId, #{poolname := PoolName}) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> - ?SLOG(debug, #{msg => "redis connector received cmd query", - connector => InstId, sql => Command, state => State}), + ?TRACE("QUERY", #{connector => InstId, sql => Command, state => State}, + "redis connector received cmd query"), Result = case Type of cluster -> eredis_cluster:q(PoolName, Command); _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index d6902d123..5669e5653 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -142,11 +142,11 @@ fields(trace) -> #{desc => """Filter type""", nullable => false, example => <<"clientid">>})}, - {topic, hoconsc:mk(binary(), + {topic, hoconsc:mk(emqx_schema:unicode_binary(), #{desc => """support mqtt wildcard topic.""", nullable => true, example => <<"/dev/#">>})}, - {clientid, hoconsc:mk(binary(), + {clientid, hoconsc:mk(emqx_schema:unicode_binary(), #{desc => """mqtt clientid.""", nullable => true, example => <<"dev-001">>})}, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 5885a2b17..0084855ab 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -395,9 +395,11 @@ trace(["stop", Operation, ClientId]) -> trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); -trace(["start", Operation, ClientId, LogFile, Level]) -> +trace(["start", Operation, Filter, LogFile, Level]) -> case trace_type(Operation) of - {ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile); + {ok, Type} -> + trace_on(name(Filter), Type, Filter, + list_to_existing_atom(Level), LogFile); error -> trace([]) end; @@ -417,20 +419,22 @@ trace(_) -> "Stop tracing for a client ip on local node"} ]). -trace_on(Who, Name, Level, LogFile) -> - case emqx_trace_handler:install(Who, Name, Level, LogFile) of +trace_on(Name, Type, Filter, Level, LogFile) -> + case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of ok -> - emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]); + emqx_trace:check(), + emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]); {error, Error} -> - emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error]) + emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error]) end. -trace_off(Who, Name) -> - case emqx_trace_handler:uninstall(Who, Name) of +trace_off(Who, Filter) -> + case emqx_trace_handler:uninstall(Who, name(Filter)) of ok -> - emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]); + emqx_trace:check(), + emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Filter]); {error, Error} -> - emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error]) + emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Filter, Error]) end. %%-------------------------------------------------------------------- @@ -716,3 +720,6 @@ format_listen_on({Addr, Port}) when is_list(Addr) -> io_lib:format("~ts:~w", [Addr, Port]); format_listen_on({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]). + +name(Filter) -> + iolist_to_binary(["CLI-", Filter]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 61a520e81..70aa68cf5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), + ?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish message"), safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); %% in case this is a "$events/" event @@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), + ?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish"), safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). %%-------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 4225c6f72..c61296d87 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) -> end. do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> - ?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}), + ?TRACE("SEND", #{bridge_id => BridgeId}, "output to bridge"), emqx_bridge:send_message(BridgeId, Selected); do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 74ec1bb1c..7a9da25a2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) -> D1 ++ flatten(L). echo_action(Data, Envs) -> - ?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}), + ?TRACE("TEST", #{data => Data, envs => Envs}, "testing_rule_sql_ok"), Data. fill_default_values(Event, Context) ->