From 489fb7f806b69ff9b59f8310c5da2e8fa93936aa Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 29 Dec 2021 22:54:38 +0800 Subject: [PATCH] fix(trace): copy binary:encode_hex/2 from binary.erl --- apps/emqx/include/logger.hrl | 15 +++- apps/emqx/src/emqx_broker.erl | 4 +- apps/emqx/src/emqx_cm.erl | 27 +++--- apps/emqx/src/emqx_connection.erl | 2 +- apps/emqx/src/emqx_flapping.erl | 4 +- apps/emqx/src/emqx_logger.erl | 10 +-- apps/emqx/src/emqx_logger_textfmt.erl | 73 +++++++++++++-- apps/emqx/src/emqx_packet.erl | 88 +++++++++++++++++-- apps/emqx/src/emqx_schema.erl | 10 +-- apps/emqx/src/emqx_trace/emqx_trace.erl | 47 +++++----- .../src/emqx_trace/emqx_trace_formatter.erl | 21 ++--- .../src/emqx_trace/emqx_trace_handler.erl | 11 +-- apps/emqx/test/emqx_trace_handler_SUITE.erl | 19 ++-- apps/emqx_authn/test/emqx_authn_api_SUITE.erl | 6 +- apps/emqx_conf/src/emqx_conf_schema.erl | 11 +-- .../src/emqx_connector_pgsql.erl | 3 +- .../emqx_management/src/emqx_mgmt_api_app.erl | 2 +- .../src/emqx_mgmt_api_trace.erl | 5 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 38 ++++---- 19 files changed, 265 insertions(+), 131 deletions(-) diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 2398a7dee..d549e2ccb 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -67,13 +67,24 @@ case logger:allow(Level, ?MODULE) of true -> logger:log(Level, (Data), (Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} - , line => ?LINE + , line => ?LINE })); false -> ok end). --define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)). +-define(TRACE_FILTER, emqx_trace_filter). + +%% Only evaluate when necessary +-define(TRACE(Event, Msg, Meta), + begin + case persistent_term:get(?TRACE_FILTER, undefined) of + undefined -> ok; + [] -> ok; + List -> + emqx_trace:log(List, Event, Msg, Meta) + end + end). %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4085b6130..9dbfb0b43 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -205,8 +205,8 @@ publish(Msg) when is_record(Msg, message) -> emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}, topic = Topic} -> - Message = emqx_message:to_log_map(Msg), - ?TRACE("MQTT", "msg_publish_not_allowed", #{message => Message, topic => Topic}), + ?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg), + topic => Topic}), []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index a2101efa1..eae8dd43d 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -375,7 +375,7 @@ discard_session(ClientId) when is_binary(ClientId) -> -spec kick_or_kill(kick | discard, module(), pid()) -> ok. kick_or_kill(Action, ConnMod, Pid) -> try - %% this is essentailly a gen_server:call implemented in emqx_connection + %% this is essentially a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) @@ -390,19 +390,12 @@ kick_or_kill(Action, ConnMod, Pid) -> ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {timeout, {gen_server, call, _}} -> ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), + #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}), ok = force_kill(Pid); _ : Error : St -> ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), + #{pid => Pid, action => Action, reason => Error, stacktrace => St, + stale_channel => stale_channel_info(Pid)}), ok = force_kill(Pid) end. @@ -449,21 +442,21 @@ kick_session(Action, ClientId, ChanPid) -> , error => Error , reason => Reason }, - #{clientid => unicode:characters_to_list(ClientId, utf8)}) + #{clientid => ClientId}) end. kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> ?SLOG(warning, #{msg => "kicked_an_unknown_session"}, - #{clientid => unicode:characters_to_list(ClientId, utf8)}), + #{clientid => ClientId}), ok; ChanPids -> case length(ChanPids) > 1 of true -> ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}, - #{clientid => unicode:characters_to_list(ClientId, utf8)}); + #{clientid => ClientId}); false -> ok end, lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) @@ -480,12 +473,12 @@ with_channel(ClientId, Fun) -> Pids -> Fun(lists:last(Pids)) end. -%% @doc Get all registed channel pids. Debugg/test interface +%% @doc Get all registered channel pids. Debug/test interface all_channels() -> Pat = [{{'_', '$1'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). -%% @doc Get all registed clientIDs. Debugg/test interface +%% @doc Get all registered clientIDs. Debug/test interface all_client_ids() -> Pat = [{{'$1', '_'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). @@ -513,7 +506,7 @@ lookup_channels(local, ClientId) -> rpc_call(Node, Fun, Args, Timeout) -> case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of {badrpc, Reason} -> - %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler %% should catch all exceptions and always return 'ok'. %% This leaves 'badrpc' only possible when there is problem %% calling the remote node. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index e1dab3260..d334ac23e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -754,7 +754,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> <<>> -> ?SLOG(warning, #{ msg => "packet_is_discarded", reason => "frame_is_too_large", - packet => emqx_packet:format(Packet, null) + packet => emqx_packet:format(Packet, hidden) }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index df7c7523f..b34819e53 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -121,7 +121,7 @@ handle_cast({detected, #flapping{clientid = ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, wind_time_in_ms => WindTime - }, #{clientid => unicode:characters_to_list(ClientId, utf8)}), + }, #{clientid => ClientId}), Now = erlang:system_time(second), Banned = #banned{who = {clientid, ClientId}, by = <<"flapping detector">>, @@ -136,7 +136,7 @@ handle_cast({detected, #flapping{clientid = ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, interval => Interval - }, #{clientid => unicode:characters_to_list(ClientId, utf8)}) + }, #{clientid => ClientId}) end, {noreply, State}; diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl index 79ac5e6b8..66274a711 100644 --- a/apps/emqx/src/emqx_logger.erl +++ b/apps/emqx/src/emqx_logger.erl @@ -197,15 +197,7 @@ critical(Metadata, Format, Args) when is_map(Metadata) -> set_metadata_clientid(<<>>) -> ok; set_metadata_clientid(ClientId) -> - try - %% try put string format client-id metadata so - %% so the log is not like <<"...">> - Id = unicode:characters_to_list(ClientId, utf8), - set_proc_metadata(#{clientid => Id}) - catch - _: _-> - ok - end. + set_proc_metadata(#{clientid => ClientId}). -spec(set_metadata_peername(peername_str()) -> ok). set_metadata_peername(Peername) -> diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index 986c0fd8a..4e7dbcf14 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -18,22 +18,77 @@ -export([format/2]). -export([check_config/1]). +-export([try_format_unicode/1]). check_config(X) -> logger_formatter:check_config(X). -format(#{msg := {report, Report}, meta := Meta} = Event, Config) when is_map(Report) -> - logger_formatter:format(Event#{msg := {report, enrich(Report, Meta)}}, Config); -format(#{msg := Msg, meta := Meta} = Event, Config) -> - NewMsg = enrich_fmt(Msg, Meta), - logger_formatter:format(Event#{msg := NewMsg}, Config). +format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) -> + Report1 = enrich_report_mfa(Report0, Meta), + Report2 = enrich_report_clientid(Report1, Meta), + Report3 = enrich_report_peername(Report2, Meta), + Report4 = enrich_report_topic(Report3, Meta), + logger_formatter:format(Event#{msg := {report, Report4}}, Config); +format(#{msg := {string, String}} = Event, Config) -> + format(Event#{msg => {"~ts ", String}}, Config); +format(#{msg := Msg0, meta := Meta} = Event, Config) -> + Msg1 = enrich_client_info(Msg0, Meta), + Msg2 = enrich_mfa(Msg1, Meta), + Msg3 = enrich_topic(Msg2, Meta), + logger_formatter:format(Event#{msg := Msg3}, Config). -enrich(Report, #{mfa := Mfa, line := Line}) -> +try_format_unicode(Char) -> + List = + try + case unicode:characters_to_list(Char) of + {error, _, _} -> error; + {incomplete, _, _} -> error; + Binary -> Binary + end + catch _:_ -> + error + end, + case List of + error -> io_lib:format("~0p", [Char]); + _ -> List + end. + +enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) -> Report#{mfa => mfa(Mfa), line => Line}; -enrich(Report, _) -> Report. +enrich_report_mfa(Report, _) -> Report. -enrich_fmt({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> +enrich_report_clientid(Report, #{clientid := ClientId}) -> + Report#{clientid => try_format_unicode(ClientId)}; +enrich_report_clientid(Report, _) -> Report. + +enrich_report_peername(Report, #{peername := Peername}) -> + Report#{peername => Peername}; +enrich_report_peername(Report, _) -> Report. + +%% clientid and peername always in emqx_conn's process metadata. +%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2 +enrich_report_topic(Report, #{topic := Topic}) -> + Report#{topic => try_format_unicode(Topic)}; +enrich_report_topic(Report = #{topic := Topic}, _) -> + Report#{topic => try_format_unicode(Topic)}; +enrich_report_topic(Report, _) -> Report. + +enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> {Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]}; -enrich_fmt(Msg, _) -> +enrich_mfa(Msg, _) -> + Msg. + +enrich_client_info({Fmt, Args}, #{clientid := ClientId, peername := Peer}) when is_list(Fmt) -> + {" ~ts@~ts " ++ Fmt, [ClientId, Peer | Args] }; +enrich_client_info({Fmt, Args}, #{clientid := ClientId}) when is_list(Fmt) -> + {" ~ts " ++ Fmt, [ClientId | Args]}; +enrich_client_info({Fmt, Args}, #{peername := Peer}) when is_list(Fmt) -> + {" ~ts " ++ Fmt, [Peer | Args]}; +enrich_client_info(Msg, _) -> + Msg. + +enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) -> + {" topic: ~ts" ++ Fmt, [Topic | Args]}; +enrich_topic(Msg, _) -> Msg. mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 53e4eabfe..23b8390e5 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -48,6 +48,8 @@ , format/2 ]). +-export([encode_hex/1]). + -define(TYPE_NAMES, { 'CONNECT' , 'CONNACK' @@ -440,7 +442,7 @@ will_msg(#mqtt_packet_connect{clientid = ClientId, format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()). %% @doc Format packet --spec(format(emqx_types:packet(), hex | text | null) -> iolist()). +-spec(format(emqx_types:packet(), hex | text | hidden) -> iolist()). format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) -> HeaderIO = format_header(Header), case format_variable(Variable, Payload, PayloadEncode) of @@ -504,11 +506,13 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId, format_variable(#mqtt_packet_subscribe{packet_id = PacketId, topic_filters = TopicFilters}, _) -> - io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]); + [io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=", + format_topic_filters(TopicFilters)]; format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, topic_filters = Topics}, _) -> - io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]); + [io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=", + format_topic_filters(Topics)]; format_variable(#mqtt_packet_suback{packet_id = PacketId, reason_codes = ReasonCodes}, _) -> @@ -527,9 +531,83 @@ format_password(undefined) -> "undefined"; format_password(_Password) -> "******". format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])]; -format_payload(Payload, hex) -> ["Payload(hex)=", binary:encode_hex(Payload)]; -format_payload(_, null) -> "Payload=******". +format_payload(Payload, hex) -> ["Payload(hex)=", encode_hex(Payload)]; +format_payload(_, hidden) -> "Payload=******". i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. + +format_topic_filters(Filters) -> + ["[", + lists:join(",", + lists:map( + fun({TopicFilter, SubOpts}) -> + io_lib:format("~ts(~p)", [TopicFilter, SubOpts]); + (TopicFilter) -> + io_lib:format("~ts", [TopicFilter]) + end, Filters)), + "]"]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Hex encoding functions +%% Copy from binary:encode_hex/1 (was only introduced in OTP24). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-define(HEX(X), (hex(X)):16). +-compile({inline,[hex/1]}). +-spec encode_hex(Bin) -> Bin2 when + Bin :: binary(), + Bin2 :: <<_:_*16>>. +encode_hex(Data) when byte_size(Data) rem 8 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 7 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 6 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 5 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 4 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 3 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 2 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when is_binary(Data) -> + << <> || <> <= Data >>; +encode_hex(Bin) -> + erlang:error(badarg, [Bin]). + +hex(X) -> + element( + X+1, {16#3030, 16#3031, 16#3032, 16#3033, 16#3034, 16#3035, 16#3036, 16#3037, 16#3038, 16#3039, 16#3041, + 16#3042, 16#3043, 16#3044, 16#3045, 16#3046, + 16#3130, 16#3131, 16#3132, 16#3133, 16#3134, 16#3135, 16#3136, 16#3137, 16#3138, 16#3139, 16#3141, + 16#3142, 16#3143, 16#3144, 16#3145, 16#3146, + 16#3230, 16#3231, 16#3232, 16#3233, 16#3234, 16#3235, 16#3236, 16#3237, 16#3238, 16#3239, 16#3241, + 16#3242, 16#3243, 16#3244, 16#3245, 16#3246, + 16#3330, 16#3331, 16#3332, 16#3333, 16#3334, 16#3335, 16#3336, 16#3337, 16#3338, 16#3339, 16#3341, + 16#3342, 16#3343, 16#3344, 16#3345, 16#3346, + 16#3430, 16#3431, 16#3432, 16#3433, 16#3434, 16#3435, 16#3436, 16#3437, 16#3438, 16#3439, 16#3441, + 16#3442, 16#3443, 16#3444, 16#3445, 16#3446, + 16#3530, 16#3531, 16#3532, 16#3533, 16#3534, 16#3535, 16#3536, 16#3537, 16#3538, 16#3539, 16#3541, + 16#3542, 16#3543, 16#3544, 16#3545, 16#3546, + 16#3630, 16#3631, 16#3632, 16#3633, 16#3634, 16#3635, 16#3636, 16#3637, 16#3638, 16#3639, 16#3641, + 16#3642, 16#3643, 16#3644, 16#3645, 16#3646, + 16#3730, 16#3731, 16#3732, 16#3733, 16#3734, 16#3735, 16#3736, 16#3737, 16#3738, 16#3739, 16#3741, + 16#3742, 16#3743, 16#3744, 16#3745, 16#3746, + 16#3830, 16#3831, 16#3832, 16#3833, 16#3834, 16#3835, 16#3836, 16#3837, 16#3838, 16#3839, 16#3841, + 16#3842, 16#3843, 16#3844, 16#3845, 16#3846, + 16#3930, 16#3931, 16#3932, 16#3933, 16#3934, 16#3935, 16#3936, 16#3937, 16#3938, 16#3939, 16#3941, + 16#3942, 16#3943, 16#3944, 16#3945, 16#3946, + 16#4130, 16#4131, 16#4132, 16#4133, 16#4134, 16#4135, 16#4136, 16#4137, 16#4138, 16#4139, 16#4141, + 16#4142, 16#4143, 16#4144, 16#4145, 16#4146, + 16#4230, 16#4231, 16#4232, 16#4233, 16#4234, 16#4235, 16#4236, 16#4237, 16#4238, 16#4239, 16#4241, + 16#4242, 16#4243, 16#4244, 16#4245, 16#4246, + 16#4330, 16#4331, 16#4332, 16#4333, 16#4334, 16#4335, 16#4336, 16#4337, 16#4338, 16#4339, 16#4341, + 16#4342, 16#4343, 16#4344, 16#4345, 16#4346, + 16#4430, 16#4431, 16#4432, 16#4433, 16#4434, 16#4435, 16#4436, 16#4437, 16#4438, 16#4439, 16#4441, + 16#4442, 16#4443, 16#4444, 16#4445, 16#4446, + 16#4530, 16#4531, 16#4532, 16#4533, 16#4534, 16#4535, 16#4536, 16#4537, 16#4538, 16#4539, 16#4541, + 16#4542, 16#4543, 16#4544, 16#4545, 16#4546, + 16#4630, 16#4631, 16#4632, 16#4633, 16#4634, 16#4635, 16#4636, 16#4637, 16#4638, 16#4639, 16#4641, + 16#4642, 16#4643, 16#4644, 16#4645, 16#4646}). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 123ee5379..fc90ba1bb 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -985,13 +985,13 @@ fields("latency_stats") -> desc => "the number of smaples for calculate the average latency of delivery"})} ]; fields("trace") -> - [ {"payload_encode", sc(hoconsc:enum([hex, text, null]), #{ + [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{ default => text, desc => """ -Determine the format of the payload format in the trace file.
-- `text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.
-- `hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.
-- `null`: Don't show payload in trace log file. +Determine the format of the payload format in the trace file.
+`text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.
+`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.
+`hidden`: payload is obfuscated as `******` """ })} ]. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 3869d300d..5af0d156e 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -26,7 +26,7 @@ -export([ publish/1 , subscribe/3 , unsubscribe/2 - , log/3 + , log/4 ]). -export([ start_link/0 @@ -52,8 +52,7 @@ -define(TRACE, ?MODULE). -define(MAX_SIZE, 30). --define(TRACE_FILTER, emqx_trace_filter). --define(OWN_KEYS,[level,filters,filter_default,handlers]). +-define(OWN_KEYS, [level, filters, filter_default, handlers]). -ifdef(TEST). -export([ log_file/2 @@ -94,18 +93,14 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). -log(Event, Msg, Meta0) -> - case persistent_term:get(?TRACE_FILTER, undefined) of - undefined -> ok; - List -> - Meta = - case logger:get_process_metadata() of - undefined -> Meta0; - ProcMeta -> maps:merge(ProcMeta, Meta0) - end, - Log = #{level => trace, event => Event, meta => Meta, msg => Msg}, - log_filter(List, Log) - end. +log(List, Event, Msg, Meta0) -> + Meta = + case logger:get_process_metadata() of + undefined -> Meta0; + ProcMeta -> maps:merge(ProcMeta, Meta0) + end, + Log = #{level => trace, event => Event, meta => Meta, msg => Msg}, + log_filter(List, Log). log_filter([], _Log) -> ok; log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> @@ -196,7 +191,7 @@ update(Name, Enable) -> transaction(Tran). check() -> - erlang:send(?MODULE, {mnesia_table_event, check}). + gen_server:call(?MODULE, check). -spec get_trace_filename(Name :: binary()) -> {ok, FileName :: string()} | {error, not_found}. @@ -241,6 +236,9 @@ init([]) -> update_trace_handler(), {ok, #{timer => TRef, monitors => #{}}}. +handle_call(check, _From, State) -> + {_, NewState} = handle_info({mnesia_table_event, check}, State), + {reply, ok, NewState}; handle_call(Req, _From, State) -> ?SLOG(error, #{unexpected_call => Req}), {reply, ok, State}. @@ -259,8 +257,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor lists:foreach(fun file:delete/1, Files), {noreply, State#{monitors => NewMonitors}} end; -handle_info({timeout, TRef, update_trace}, - #{timer := TRef} = State) -> +handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> Traces = get_enable_trace(), NextTRef = update_trace(Traces), update_trace_handler(), @@ -344,10 +341,10 @@ disable_finished(Traces) -> start_trace(Traces, Started0) -> Started = lists:map(fun(#{name := Name}) -> Name end, Started0), - lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> + lists:foldl(fun(#?TRACE{name = Name} = Trace, + {Running, StartedAcc}) -> case lists:member(Name, StartedAcc) of - true -> - {[Name | Running], StartedAcc}; + true -> {[Name | Running], StartedAcc}; false -> case start_trace(Trace) of ok -> {[Name | Running], [Name | StartedAcc]}; @@ -366,9 +363,11 @@ start_trace(Trace) -> emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> - lists:foreach(fun(#{name := Name, type := Type}) -> + lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) -> case lists:member(Name, Finished) of - true -> emqx_trace_handler:uninstall(Type, Name); + true -> + ?TRACE("API", "trace_stopping", #{Type => Filter}), + emqx_trace_handler:uninstall(Type, Name); false -> ok end end, Started). @@ -455,7 +454,7 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> case validate_ip_address(Filter) of ok -> Trace0 = maps:without([type, ip_address], Trace), - to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter}); + to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)}); Error -> Error end; to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index a9c4ca31d..2ef142d38 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -25,7 +25,7 @@ format(#{level := trace, event := Event, meta := Meta, msg := Msg}, #{payload_encode := PEncode}) -> Time = calendar:system_time_to_rfc3339(erlang:system_time(second)), - ClientId = maps:get(clientid, Meta, ""), + ClientId = to_iolist(maps:get(clientid, Meta, "")), Peername = maps:get(peername, Meta, ""), MetaBin = format_meta(Meta, PEncode), [Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"]; @@ -39,10 +39,7 @@ format_meta(Meta0, Encode) -> Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0), case Meta1 =:= #{} of true -> [Packet, Payload]; - false -> - Meta2 = lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, - maps:to_list(Meta1)), - [Packet, ", ", lists:join(",", Meta2), Payload] + false -> [Packet, ", ", map_to_iolist(Meta1), Payload] end. format_packet(undefined, _) -> ""; @@ -50,12 +47,16 @@ format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encod format_payload(undefined, _) -> ""; format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])]; -format_payload(Payload, hex) -> [", payload(hex): ", binary:encode_hex(Payload)]; -format_payload(_, null) -> ", payload=******". +format_payload(Payload, hex) -> [", payload(hex): ", emqx_packet:encode_hex(Payload)]; +format_payload(_, hidden) -> ", payload=******". to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]); -to_iolist(Bin)when is_binary(Bin) -> unicode:characters_to_binary(Bin); -to_iolist(List) when is_list(List) -> unicode:characters_to_list(List); -to_iolist(Term) -> io_lib:format("~0p", [Term]). +to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"]; +to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char). + +map_to_iolist(Map) -> + lists:join(",", + lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, + maps:to_list(Map))). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index f8a5f3828..320421309 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -119,11 +119,11 @@ uninstall(HandlerId) -> running() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). --spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. +-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log; filter_clientid(_Log, _ExpectId) -> stop. --spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. +-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) -> case emqx_topic:match(Topic, TopicFilter) of true -> Log; @@ -140,7 +140,7 @@ filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) -> filter_ip_address(_Log, _ExpectId) -> stop. filters(#{type := clientid, filter := Filter, name := Name}) -> - [{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}]; + [{clientid, {fun ?MODULE:filter_clientid/2, {Filter, Name}}}]; filters(#{type := topic, filter := Filter, name := Name}) -> [{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}]; filters(#{type := ip_address, filter := Filter, name := Name}) -> @@ -149,8 +149,9 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> formatter(#{type := _Type}) -> {emqx_trace_formatter, #{ - template => [], - single_line => false, + %% template is for ?SLOG message not ?TRACE. + template => [time," [",level,"] ", msg,"\n"], + single_line => true, max_size => unlimited, depth => unlimited, payload_encode => payload_encode() diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index f3ab7b5b8..1224fdac9 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -32,19 +32,22 @@ all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_ init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([emqx_modules]), + emqx_common_test_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_modules]). + emqx_common_test_helpers:stop_apps([]). init_per_testcase(t_trace_clientid, Config) -> + init(), Config; init_per_testcase(_Case, Config) -> _ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()], + init(), Config. end_per_testcase(_Case, _Config) -> + terminate(), ok. t_trace_clientid(_Config) -> @@ -66,11 +69,11 @@ t_trace_clientid(_Config) -> ?assert(filelib:is_regular("tmp/client3.log")), %% Get current traces - ?assertMatch([#{type := clientid, filter := "client", name := <<"CLI-client1">>, + ?assertMatch([#{type := clientid, filter := <<"client">>, name := <<"CLI-client1">>, level := debug, dst := "tmp/client.log"}, - #{type := clientid, filter := "client2", name := <<"CLI-client2">> + #{type := clientid, filter := <<"client2">>, name := <<"CLI-client2">> , level := debug, dst := "tmp/client2.log"}, - #{type := clientid, filter := "client3", name := <<"CLI-client3">>, + #{type := clientid, filter := <<"client3">>, name := <<"CLI-client3">>, level := debug, dst := "tmp/client3.log"} ], emqx_trace_handler:running()), @@ -231,3 +234,9 @@ filesync(Name0, Type, Retry) -> ct:sleep(100), filesync(Name, Type, Retry - 1) end. + +init() -> + emqx_trace:start_link(). + +terminate() -> + catch ok = gen_server:stop(emqx_trace, normal, 5000). diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index 820eeb859..31e5e52e1 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -43,6 +43,7 @@ groups() -> []. init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), emqx_authn_test_lib:delete_authenticators( [?CONF_NS_ATOM], ?GLOBAL), @@ -55,8 +56,9 @@ init_per_testcase(_, Config) -> Config. init_per_suite(Config) -> + _ = application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_authn, emqx_dashboard], + [emqx_authn, emqx_dashboard], fun set_special_configs/1), ?AUTHN:delete_chain(?GLOBAL), @@ -65,7 +67,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_authn, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authn]), ok. set_special_configs(emqx_dashboard) -> diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 1ea368826..385953b87 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -792,16 +792,7 @@ do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) -> }}; do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) -> {emqx_logger_textfmt, - #{template => - [time," [",level,"] ", - {clientid, - [{peername, - [clientid,"@",peername," "], - [clientid, " "]}], - [{peername, - [peername," "], - []}]}, - msg,"\n"], + #{template => [time," [",level,"] ", msg,"\n"], chars_limit => CharsLimit, single_line => SingleLine, time_offset => TimeOffSet, diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 5d9d0e1d9..81435a1c5 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -88,7 +88,8 @@ on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} end, - ?TRACE("QUERY", "postgresql_connector_received", #{connector => InstId, command => Command, args => Args, state => State}}), + ?TRACE("QUERY", "postgresql_connector_received", + #{connector => InstId, command => Command, args => Args, state => State}), case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of {error, Reason} -> ?SLOG(error, #{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_app.erl b/apps/emqx_management/src/emqx_mgmt_api_app.erl index dfce3cf30..489d679be 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_app.erl @@ -149,7 +149,7 @@ api_key(post, #{body := App}) -> Desc = unicode:characters_to_binary(Desc0, unicode), case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of {ok, NewApp} -> {200, format(NewApp)}; - {error, Reason} -> {400, Reason} + {error, Reason} -> {400, io_lib:format("~p", [Reason])} end. api_key_by_name(get, #{bindings := #{name := Name}}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 15277fa8e..296cecea2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -129,11 +129,10 @@ schema("/trace/:name/log") -> hoconsc:ref(position), hoconsc:ref(node) ], - %% todo response data responses => #{ 200 => [ - {items, hoconsc:mk(binary(), #{example => "BinBinBin"})} + {items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})} | fields(bytes) ++ fields(position) ] } @@ -307,7 +306,7 @@ download_trace_log(get, #{bindings := #{name := Name}}) -> {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), emqx_trace:delete_files_after_send(ZipFileName, Zips), Headers = #{ - <<"content-type">> => <<"application/octet-stream">>, + <<"content-type">> => <<"application/x-zip">>, <<"content-disposition">> => iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile)) }, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 0084855ab..3a723f33c 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -18,6 +18,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_mgmt.hrl"). @@ -386,19 +387,19 @@ trace(["list"]) -> emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) end, emqx_trace_handler:running()); -trace(["stop", Operation, ClientId]) -> - case trace_type(Operation) of - {ok, Type} -> trace_off(Type, ClientId); +trace(["stop", Operation, Filter0]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> trace_off(Type, Filter); error -> trace([]) end; trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); -trace(["start", Operation, Filter, LogFile, Level]) -> - case trace_type(Operation) of - {ok, Type} -> - trace_on(name(Filter), Type, Filter, +trace(["start", Operation, Filter0, LogFile, Level]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> + trace_on(name(Filter0), Type, Filter, list_to_existing_atom(Level), LogFile); error -> trace([]) end; @@ -428,13 +429,14 @@ trace_on(Name, Type, Filter, Level, LogFile) -> emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error]) end. -trace_off(Who, Filter) -> - case emqx_trace_handler:uninstall(Who, name(Filter)) of +trace_off(Type, Filter) -> + ?TRACE("CLI", "trace_stopping", #{Type => Filter}), + case emqx_trace_handler:uninstall(Type, name(Filter)) of ok -> emqx_trace:check(), - emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Filter]); + emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]); {error, Error} -> - emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Filter, Error]) + emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error]) end. %%-------------------------------------------------------------------- @@ -463,9 +465,9 @@ traces(["delete", Name]) -> traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, "900"]); -traces(["start", Name, Operation, Filter, DurationS]) -> - case trace_type(Operation) of - {ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS); +traces(["start", Name, Operation, Filter0, DurationS]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); error -> traces([]) end; @@ -507,10 +509,10 @@ trace_cluster_off(Name) -> {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) end. -trace_type("client") -> {ok, clientid}; -trace_type("topic") -> {ok, topic}; -trace_type("ip_address") -> {ok, ip_address}; -trace_type(_) -> error. +trace_type("client", ClientId) -> {ok, clientid, list_to_binary(ClientId)}; +trace_type("topic", Topic) -> {ok, topic, list_to_binary(Topic)}; +trace_type("ip_address", IP) -> {ok, ip_address, IP}; +trace_type(_, _) -> error. %%-------------------------------------------------------------------- %% @doc Listeners Command