diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 476794223..a2f77c044 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -32,7 +32,8 @@ payload_encode = text :: hex | text | hidden | '_', extra = #{} :: map() | '_', start_at :: integer() | undefined | '_', - end_at :: integer() | undefined | '_' + end_at :: integer() | undefined | '_', + formatter = plain :: plain | json }). -define(SHARD, ?COMMON_SHARD). diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 4ae973722..6a255806a 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -183,8 +183,10 @@ create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> case to_trace(Trace) of - {ok, TraceRec} -> insert_new_trace(TraceRec); - {error, Reason} -> {error, Reason} + {ok, TraceRec} -> + insert_new_trace(TraceRec); + {error, Reason} -> + {error, Reason} end; false -> {error, @@ -392,9 +394,16 @@ start_trace(Trace) -> type = Type, filter = Filter, start_at = Start, - payload_encode = PayloadEncode + payload_encode = PayloadEncode, + formatter = Formatter } = Trace, - Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode}, + Who = #{ + name => Name, + type => Type, + filter => Filter, + payload_encode => PayloadEncode, + formatter => Formatter + }, emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> @@ -559,6 +568,8 @@ to_trace(#{end_at := EndAt} = Trace, Rec) -> {ok, _Sec} -> {error, "end_at time has already passed"} end; +to_trace(#{formatter := Formatter} = Trace, Rec) -> + to_trace(maps:remove(formatter, Trace), Rec#?TRACE{formatter = Formatter}); to_trace(_, Rec) -> {ok, Rec}. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index c69809052..d80fa70ec 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -183,6 +183,13 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> filters(#{type := ruleid, filter := Filter, name := Name}) -> [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. +formatter(#{type := _Type, payload_encode := PayloadEncode, formatter := json}) -> + {emqx_trace_json_formatter, #{ + single_line => true, + max_size => unlimited, + depth => unlimited, + payload_encode => PayloadEncode + }}; formatter(#{type := _Type, payload_encode := PayloadEncode}) -> {emqx_trace_formatter, #{ %% template is for ?SLOG message not ?TRACE. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl new file mode 100644 index 000000000..b0548ac04 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -0,0 +1,113 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_json_formatter). + +-include("emqx_mqtt.hrl"). + +-export([format/2]). + +%% logger_formatter:config/0 is not exported. +-type config() :: map(). + +%%%----------------------------------------------------------------- +%%% Callback Function +%%%----------------------------------------------------------------- + +-spec format(LogEvent, Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: config(). +format( + LogMap, + #{payload_encode := PEncode} +) -> + Time = emqx_utils_calendar:now_to_rfc3339(microsecond), + LogMap1 = LogMap#{time => Time}, + [format_log_map(LogMap1, PEncode), "\n"]. + +%%%----------------------------------------------------------------- +%%% Helper Functions +%%%----------------------------------------------------------------- + +format_log_map(Map, PEncode) -> + KeyValuePairs = format_key_value_pairs(maps:to_list(Map), PEncode, []), + ["{", KeyValuePairs, "}"]. + +format_key_value_pairs([], _PEncode, Acc) -> + lists:join(",", Acc); +format_key_value_pairs([{payload, Value} | Rest], PEncode, Acc) -> + FormattedPayload = format_payload(Value, PEncode), + FormattedPayloadEscaped = escape(FormattedPayload), + Pair = ["\"payload\": \"", FormattedPayloadEscaped, "\""], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]); +format_key_value_pairs([{packet, Value} | Rest], PEncode, Acc) -> + Formatted = format_packet(Value, PEncode), + FormattedEscaped = escape(Formatted), + Pair = ["\"packet\": \"", FormattedEscaped, "\""], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]); +format_key_value_pairs([{Key, Value} | Rest], PEncode, Acc) -> + FormattedKey = format_key(Key), + FormattedValue = format_value(Value, PEncode), + Pair = ["\"", FormattedKey, "\":", FormattedValue], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]). + +format_key(Term) -> + %% Keys must be strings + String = emqx_logger_textfmt:try_format_unicode(Term), + escape(String). + +format_value(Map, PEncode) when is_map(Map) -> + format_log_map(Map, PEncode); +format_value(V, _PEncode) when is_integer(V) -> + integer_to_list(V); +format_value(V, _PEncode) when is_float(V) -> + float_to_list(V, [{decimals, 2}]); +format_value(true, _PEncode) -> + "true"; +format_value(false, _PEncode) -> + "false"; +format_value(V, _PEncode) -> + String = emqx_logger_textfmt:try_format_unicode(V), + ["\"", escape(String), "\""]. + +escape(IOList) -> + Bin = iolist_to_binary(IOList), + List = binary_to_list(Bin), + escape_list(List). + +escape_list([]) -> + []; +escape_list([$\n | Rest]) -> + %% 92 is backslash + [92, $n | escape_list(Rest)]; +escape_list([$" | Rest]) -> + [92, $" | escape_list(Rest)]; +escape_list([92 | Rest]) -> + [92, 92 | escape_list(Rest)]; +escape_list([X | Rest]) -> + [X | escape_list(Rest)]. + +format_packet(undefined, _) -> ""; +format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). + +format_payload(undefined, _) -> + ""; +format_payload(_, hidden) -> + "******"; +format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> + unicode:characters_to_list(Payload); +format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); +format_payload(<> = Payload, Type) -> + emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 19edc229d..f0efe9f85 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -314,6 +314,15 @@ fields(trace) -> example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], required => false } + )}, + {formatter, + hoconsc:mk( + hoconsc:union([plain, json]), + #{ + description => ?DESC(trace_log_formatter), + example => plain, + required => false + } )} ]; fields(name) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index ef7b5a191..de74316f4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("stdlib/include/zip.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). %%-------------------------------------------------------------------- %% Setups @@ -169,9 +170,153 @@ t_http_test_rule_trace(_Config) -> {ok, Delete} = request_api(delete, api_path(["trace/", Name])), ?assertEqual(<<>>, Delete), + emqx_trace:clear(), unload(), ok. +t_http_test_json_formatter(_Config) -> + emqx_trace:clear(), + load(), + + Name = <<"testname">>, + Topic = <<"/x/y/z">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, Topic}, + {<<"formatter">>, <<"json">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Trace), + ?assertMatch(#{<<"name">> := Name}, json(Create)), + + {ok, List} = request_api(get, api_path("trace")), + [Data] = json(List), + ?assertEqual(<<"json">>, maps:get(<<"formatter">>, Data)), + + {ok, List1} = request_api(get, api_path("trace")), + [Data1] = json(List1), + ?assertMatch( + #{ + <<"formatter">> := <<"json">> + }, + Data1 + ), + + %% Check that the log is empty + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + %% Trace is empty which results in a not found error + {error, _} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + + %% Start a client and send a message to get info to the log + ClientId = <<"my_client_id">>, + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + %% Normal message + emqtt:publish(Client, Topic, #{}, <<"log_this_message">>, [{qos, 2}]), + %% Escape line breaks + emqtt:publish(Client, Topic, #{}, <<"\nlog\nthis\nmessage">>, [{qos, 2}]), + %% Escape escape character + emqtt:publish(Client, Topic, #{}, <<"\\\nlog\n_\\n_this\nmessage\\">>, [{qos, 2}]), + %% Escape end of string + emqtt:publish(Client, Topic, #{}, <<"\"log_this_message\"">>, [{qos, 2}]), + + %% Manually create some trace messages to test the JSON formatter + + %% String key and value + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, "str" => "str"}), + %% Log Erlang term + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, term => {notjson}}), + %% Log Erlang term key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, {'notjson'} => term}), + %% Log Integer + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, integer => 42}), + %% Log Float + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, float => 1.2}), + %% Log Integer Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 42 => integer}), + %% Log Float Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 1.2 => float}), + %% Log Map Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, #{} => value}), + %% Empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{}}), + %% Non-empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{key => value}}), + %% Bolean values + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, true => true, false => false}), + %% Key value list + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + list => [ + {<<"key">>, <<"value">>}, + {<<"key2">>, <<"value2">>} + ] + }), + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + {ok, [ + _Comment, + #zip_file{ + name = _ZipName, + info = #file_info{size = Size, type = regular, access = read_write} + } + ]} = zip:table(Bin), + ?assert(Size > 0), + {ok, [{_, LogContent}]} = zip:unzip(Bin, [memory]), + LogEntriesTrailing = string:split(LogContent, "\n", all), + LogEntries = lists:droplast(LogEntriesTrailing), + DecodedLogEntries = [ + begin + ct:pal("LOG ENTRY\n~s\n", [JSONEntry]), + emqx_utils_json:decode(JSONEntry) + end + || JSONEntry <- LogEntries + ], + ?assertMatch( + [ + #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}}, + #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}}, + #{ + <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>} + }, + #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}}, + #{<<"meta">> := #{<<"str">> := <<"str">>}}, + #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"integer">> := 42}}, + #{<<"meta">> := #{<<"float">> := 1.2}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"sub">> := #{}}}, + #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}}, + #{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}}, + #{ + <<"meta">> := #{ + <<"list">> := #{ + <<"key">> := <<"value">>, + <<"key2">> := <<"value2">> + } + } + } + | _ + ], + DecodedLogEntries + ), + {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), + ?assertEqual(<<>>, Delete), + + {ok, List2} = request_api(get, api_path("trace")), + ?assertEqual([], json(List2)), + + ok = emqtt:disconnect(Client), + unload(), + emqx_trace:clear(), + ok. + t_create_failed(_Config) -> load(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], diff --git a/rel/i18n/emqx_mgmt_api_trace.hocon b/rel/i18n/emqx_mgmt_api_trace.hocon index ba07d7d53..3b12caf97 100644 --- a/rel/i18n/emqx_mgmt_api_trace.hocon +++ b/rel/i18n/emqx_mgmt_api_trace.hocon @@ -115,4 +115,9 @@ current_trace_offset.desc: current_trace_offset.label: """Offset from the current trace position.""" +trace_log_formatter.desc: +"""The formatter that will be used to format the trace log entries. Set this to plain to format the log entries as plain text (default). Set it to json to format each log entry as a JSON object.""" +trace_log_formatter.label: +"""Trace Log Entry Formatter""" + }