From a6558740e8320e0d2af199ba6b251aebf1b46b04 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 11 Apr 2024 12:09:04 +0200 Subject: [PATCH] feat(emqx_trace): add JSON trace log entry formatter This commit makes it possible to select the JSON trace log entry formatter when crating a trace pattern. This will make it easier for the dashboard and automatic tests to parse the log entries. Fixes: EMQX-12025 (partly) --- apps/emqx/include/emqx_trace.hrl | 3 +- apps/emqx/src/emqx_trace/emqx_trace.erl | 19 ++- .../src/emqx_trace/emqx_trace_handler.erl | 7 + .../emqx_trace/emqx_trace_json_formatter.erl | 113 ++++++++++++++ .../src/emqx_mgmt_api_trace.erl | 9 ++ .../test/emqx_mgmt_api_trace_SUITE.erl | 145 ++++++++++++++++++ rel/i18n/emqx_mgmt_api_trace.hocon | 5 + 7 files changed, 296 insertions(+), 5 deletions(-) create mode 100644 apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 476794223..a2f77c044 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -32,7 +32,8 @@ payload_encode = text :: hex | text | hidden | '_', extra = #{} :: map() | '_', start_at :: integer() | undefined | '_', - end_at :: integer() | undefined | '_' + end_at :: integer() | undefined | '_', + formatter = plain :: plain | json }). -define(SHARD, ?COMMON_SHARD). diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 4ae973722..6a255806a 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -183,8 +183,10 @@ create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> case to_trace(Trace) of - {ok, TraceRec} -> insert_new_trace(TraceRec); - {error, Reason} -> {error, Reason} + {ok, TraceRec} -> + insert_new_trace(TraceRec); + {error, Reason} -> + {error, Reason} end; false -> {error, @@ -392,9 +394,16 @@ start_trace(Trace) -> type = Type, filter = Filter, start_at = Start, - payload_encode = PayloadEncode + payload_encode = PayloadEncode, + formatter = Formatter } = Trace, - Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode}, + Who = #{ + name => Name, + type => Type, + filter => Filter, + payload_encode => PayloadEncode, + formatter => Formatter + }, emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> @@ -559,6 +568,8 @@ to_trace(#{end_at := EndAt} = Trace, Rec) -> {ok, _Sec} -> {error, "end_at time has already passed"} end; +to_trace(#{formatter := Formatter} = Trace, Rec) -> + to_trace(maps:remove(formatter, Trace), Rec#?TRACE{formatter = Formatter}); to_trace(_, Rec) -> {ok, Rec}. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index c69809052..d80fa70ec 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -183,6 +183,13 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> filters(#{type := ruleid, filter := Filter, name := Name}) -> [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. +formatter(#{type := _Type, payload_encode := PayloadEncode, formatter := json}) -> + {emqx_trace_json_formatter, #{ + single_line => true, + max_size => unlimited, + depth => unlimited, + payload_encode => PayloadEncode + }}; formatter(#{type := _Type, payload_encode := PayloadEncode}) -> {emqx_trace_formatter, #{ %% template is for ?SLOG message not ?TRACE. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl new file mode 100644 index 000000000..b0548ac04 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -0,0 +1,113 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_json_formatter). + +-include("emqx_mqtt.hrl"). + +-export([format/2]). + +%% logger_formatter:config/0 is not exported. +-type config() :: map(). + +%%%----------------------------------------------------------------- +%%% Callback Function +%%%----------------------------------------------------------------- + +-spec format(LogEvent, Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: config(). +format( + LogMap, + #{payload_encode := PEncode} +) -> + Time = emqx_utils_calendar:now_to_rfc3339(microsecond), + LogMap1 = LogMap#{time => Time}, + [format_log_map(LogMap1, PEncode), "\n"]. + +%%%----------------------------------------------------------------- +%%% Helper Functions +%%%----------------------------------------------------------------- + +format_log_map(Map, PEncode) -> + KeyValuePairs = format_key_value_pairs(maps:to_list(Map), PEncode, []), + ["{", KeyValuePairs, "}"]. + +format_key_value_pairs([], _PEncode, Acc) -> + lists:join(",", Acc); +format_key_value_pairs([{payload, Value} | Rest], PEncode, Acc) -> + FormattedPayload = format_payload(Value, PEncode), + FormattedPayloadEscaped = escape(FormattedPayload), + Pair = ["\"payload\": \"", FormattedPayloadEscaped, "\""], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]); +format_key_value_pairs([{packet, Value} | Rest], PEncode, Acc) -> + Formatted = format_packet(Value, PEncode), + FormattedEscaped = escape(Formatted), + Pair = ["\"packet\": \"", FormattedEscaped, "\""], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]); +format_key_value_pairs([{Key, Value} | Rest], PEncode, Acc) -> + FormattedKey = format_key(Key), + FormattedValue = format_value(Value, PEncode), + Pair = ["\"", FormattedKey, "\":", FormattedValue], + format_key_value_pairs(Rest, PEncode, [Pair | Acc]). + +format_key(Term) -> + %% Keys must be strings + String = emqx_logger_textfmt:try_format_unicode(Term), + escape(String). + +format_value(Map, PEncode) when is_map(Map) -> + format_log_map(Map, PEncode); +format_value(V, _PEncode) when is_integer(V) -> + integer_to_list(V); +format_value(V, _PEncode) when is_float(V) -> + float_to_list(V, [{decimals, 2}]); +format_value(true, _PEncode) -> + "true"; +format_value(false, _PEncode) -> + "false"; +format_value(V, _PEncode) -> + String = emqx_logger_textfmt:try_format_unicode(V), + ["\"", escape(String), "\""]. + +escape(IOList) -> + Bin = iolist_to_binary(IOList), + List = binary_to_list(Bin), + escape_list(List). + +escape_list([]) -> + []; +escape_list([$\n | Rest]) -> + %% 92 is backslash + [92, $n | escape_list(Rest)]; +escape_list([$" | Rest]) -> + [92, $" | escape_list(Rest)]; +escape_list([92 | Rest]) -> + [92, 92 | escape_list(Rest)]; +escape_list([X | Rest]) -> + [X | escape_list(Rest)]. + +format_packet(undefined, _) -> ""; +format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). + +format_payload(undefined, _) -> + ""; +format_payload(_, hidden) -> + "******"; +format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> + unicode:characters_to_list(Payload); +format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); +format_payload(<> = Payload, Type) -> + emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 19edc229d..f0efe9f85 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -314,6 +314,15 @@ fields(trace) -> example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], required => false } + )}, + {formatter, + hoconsc:mk( + hoconsc:union([plain, json]), + #{ + description => ?DESC(trace_log_formatter), + example => plain, + required => false + } )} ]; fields(name) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index ef7b5a191..de74316f4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("stdlib/include/zip.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). %%-------------------------------------------------------------------- %% Setups @@ -169,9 +170,153 @@ t_http_test_rule_trace(_Config) -> {ok, Delete} = request_api(delete, api_path(["trace/", Name])), ?assertEqual(<<>>, Delete), + emqx_trace:clear(), unload(), ok. +t_http_test_json_formatter(_Config) -> + emqx_trace:clear(), + load(), + + Name = <<"testname">>, + Topic = <<"/x/y/z">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, Topic}, + {<<"formatter">>, <<"json">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Trace), + ?assertMatch(#{<<"name">> := Name}, json(Create)), + + {ok, List} = request_api(get, api_path("trace")), + [Data] = json(List), + ?assertEqual(<<"json">>, maps:get(<<"formatter">>, Data)), + + {ok, List1} = request_api(get, api_path("trace")), + [Data1] = json(List1), + ?assertMatch( + #{ + <<"formatter">> := <<"json">> + }, + Data1 + ), + + %% Check that the log is empty + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + %% Trace is empty which results in a not found error + {error, _} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + + %% Start a client and send a message to get info to the log + ClientId = <<"my_client_id">>, + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + %% Normal message + emqtt:publish(Client, Topic, #{}, <<"log_this_message">>, [{qos, 2}]), + %% Escape line breaks + emqtt:publish(Client, Topic, #{}, <<"\nlog\nthis\nmessage">>, [{qos, 2}]), + %% Escape escape character + emqtt:publish(Client, Topic, #{}, <<"\\\nlog\n_\\n_this\nmessage\\">>, [{qos, 2}]), + %% Escape end of string + emqtt:publish(Client, Topic, #{}, <<"\"log_this_message\"">>, [{qos, 2}]), + + %% Manually create some trace messages to test the JSON formatter + + %% String key and value + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, "str" => "str"}), + %% Log Erlang term + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, term => {notjson}}), + %% Log Erlang term key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, {'notjson'} => term}), + %% Log Integer + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, integer => 42}), + %% Log Float + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, float => 1.2}), + %% Log Integer Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 42 => integer}), + %% Log Float Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 1.2 => float}), + %% Log Map Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, #{} => value}), + %% Empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{}}), + %% Non-empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{key => value}}), + %% Bolean values + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, true => true, false => false}), + %% Key value list + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + list => [ + {<<"key">>, <<"value">>}, + {<<"key2">>, <<"value2">>} + ] + }), + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + {ok, [ + _Comment, + #zip_file{ + name = _ZipName, + info = #file_info{size = Size, type = regular, access = read_write} + } + ]} = zip:table(Bin), + ?assert(Size > 0), + {ok, [{_, LogContent}]} = zip:unzip(Bin, [memory]), + LogEntriesTrailing = string:split(LogContent, "\n", all), + LogEntries = lists:droplast(LogEntriesTrailing), + DecodedLogEntries = [ + begin + ct:pal("LOG ENTRY\n~s\n", [JSONEntry]), + emqx_utils_json:decode(JSONEntry) + end + || JSONEntry <- LogEntries + ], + ?assertMatch( + [ + #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}}, + #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}}, + #{ + <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>} + }, + #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}}, + #{<<"meta">> := #{<<"str">> := <<"str">>}}, + #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"integer">> := 42}}, + #{<<"meta">> := #{<<"float">> := 1.2}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"sub">> := #{}}}, + #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}}, + #{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}}, + #{ + <<"meta">> := #{ + <<"list">> := #{ + <<"key">> := <<"value">>, + <<"key2">> := <<"value2">> + } + } + } + | _ + ], + DecodedLogEntries + ), + {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), + ?assertEqual(<<>>, Delete), + + {ok, List2} = request_api(get, api_path("trace")), + ?assertEqual([], json(List2)), + + ok = emqtt:disconnect(Client), + unload(), + emqx_trace:clear(), + ok. + t_create_failed(_Config) -> load(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], diff --git a/rel/i18n/emqx_mgmt_api_trace.hocon b/rel/i18n/emqx_mgmt_api_trace.hocon index ba07d7d53..3b12caf97 100644 --- a/rel/i18n/emqx_mgmt_api_trace.hocon +++ b/rel/i18n/emqx_mgmt_api_trace.hocon @@ -115,4 +115,9 @@ current_trace_offset.desc: current_trace_offset.label: """Offset from the current trace position.""" +trace_log_formatter.desc: +"""The formatter that will be used to format the trace log entries. Set this to plain to format the log entries as plain text (default). Set it to json to format each log entry as a JSON object.""" +trace_log_formatter.label: +"""Trace Log Entry Formatter""" + }