feat(emqx_trace): add JSON trace log entry formatter
This commit makes it possible to select the JSON trace log entry formatter when crating a trace pattern. This will make it easier for the dashboard and automatic tests to parse the log entries. Fixes: EMQX-12025 (partly)
This commit is contained in:
parent
2a2fadfbff
commit
a6558740e8
|
@ -32,7 +32,8 @@
|
||||||
payload_encode = text :: hex | text | hidden | '_',
|
payload_encode = text :: hex | text | hidden | '_',
|
||||||
extra = #{} :: map() | '_',
|
extra = #{} :: map() | '_',
|
||||||
start_at :: integer() | undefined | '_',
|
start_at :: integer() | undefined | '_',
|
||||||
end_at :: integer() | undefined | '_'
|
end_at :: integer() | undefined | '_',
|
||||||
|
formatter = plain :: plain | json
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(SHARD, ?COMMON_SHARD).
|
-define(SHARD, ?COMMON_SHARD).
|
||||||
|
|
|
@ -183,8 +183,10 @@ create(Trace) ->
|
||||||
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
||||||
true ->
|
true ->
|
||||||
case to_trace(Trace) of
|
case to_trace(Trace) of
|
||||||
{ok, TraceRec} -> insert_new_trace(TraceRec);
|
{ok, TraceRec} ->
|
||||||
{error, Reason} -> {error, Reason}
|
insert_new_trace(TraceRec);
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
{error,
|
{error,
|
||||||
|
@ -392,9 +394,16 @@ start_trace(Trace) ->
|
||||||
type = Type,
|
type = Type,
|
||||||
filter = Filter,
|
filter = Filter,
|
||||||
start_at = Start,
|
start_at = Start,
|
||||||
payload_encode = PayloadEncode
|
payload_encode = PayloadEncode,
|
||||||
|
formatter = Formatter
|
||||||
} = Trace,
|
} = Trace,
|
||||||
Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode},
|
Who = #{
|
||||||
|
name => Name,
|
||||||
|
type => Type,
|
||||||
|
filter => Filter,
|
||||||
|
payload_encode => PayloadEncode,
|
||||||
|
formatter => Formatter
|
||||||
|
},
|
||||||
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
|
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
|
||||||
|
|
||||||
stop_trace(Finished, Started) ->
|
stop_trace(Finished, Started) ->
|
||||||
|
@ -559,6 +568,8 @@ to_trace(#{end_at := EndAt} = Trace, Rec) ->
|
||||||
{ok, _Sec} ->
|
{ok, _Sec} ->
|
||||||
{error, "end_at time has already passed"}
|
{error, "end_at time has already passed"}
|
||||||
end;
|
end;
|
||||||
|
to_trace(#{formatter := Formatter} = Trace, Rec) ->
|
||||||
|
to_trace(maps:remove(formatter, Trace), Rec#?TRACE{formatter = Formatter});
|
||||||
to_trace(_, Rec) ->
|
to_trace(_, Rec) ->
|
||||||
{ok, Rec}.
|
{ok, Rec}.
|
||||||
|
|
||||||
|
|
|
@ -183,6 +183,13 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
||||||
filters(#{type := ruleid, filter := Filter, name := Name}) ->
|
filters(#{type := ruleid, filter := Filter, name := Name}) ->
|
||||||
[{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}].
|
[{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}].
|
||||||
|
|
||||||
|
formatter(#{type := _Type, payload_encode := PayloadEncode, formatter := json}) ->
|
||||||
|
{emqx_trace_json_formatter, #{
|
||||||
|
single_line => true,
|
||||||
|
max_size => unlimited,
|
||||||
|
depth => unlimited,
|
||||||
|
payload_encode => PayloadEncode
|
||||||
|
}};
|
||||||
formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
|
formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
|
||||||
{emqx_trace_formatter, #{
|
{emqx_trace_formatter, #{
|
||||||
%% template is for ?SLOG message not ?TRACE.
|
%% template is for ?SLOG message not ?TRACE.
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_trace_json_formatter).
|
||||||
|
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
-export([format/2]).
|
||||||
|
|
||||||
|
%% logger_formatter:config/0 is not exported.
|
||||||
|
-type config() :: map().
|
||||||
|
|
||||||
|
%%%-----------------------------------------------------------------
|
||||||
|
%%% Callback Function
|
||||||
|
%%%-----------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec format(LogEvent, Config) -> unicode:chardata() when
|
||||||
|
LogEvent :: logger:log_event(),
|
||||||
|
Config :: config().
|
||||||
|
format(
|
||||||
|
LogMap,
|
||||||
|
#{payload_encode := PEncode}
|
||||||
|
) ->
|
||||||
|
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
|
||||||
|
LogMap1 = LogMap#{time => Time},
|
||||||
|
[format_log_map(LogMap1, PEncode), "\n"].
|
||||||
|
|
||||||
|
%%%-----------------------------------------------------------------
|
||||||
|
%%% Helper Functions
|
||||||
|
%%%-----------------------------------------------------------------
|
||||||
|
|
||||||
|
format_log_map(Map, PEncode) ->
|
||||||
|
KeyValuePairs = format_key_value_pairs(maps:to_list(Map), PEncode, []),
|
||||||
|
["{", KeyValuePairs, "}"].
|
||||||
|
|
||||||
|
format_key_value_pairs([], _PEncode, Acc) ->
|
||||||
|
lists:join(",", Acc);
|
||||||
|
format_key_value_pairs([{payload, Value} | Rest], PEncode, Acc) ->
|
||||||
|
FormattedPayload = format_payload(Value, PEncode),
|
||||||
|
FormattedPayloadEscaped = escape(FormattedPayload),
|
||||||
|
Pair = ["\"payload\": \"", FormattedPayloadEscaped, "\""],
|
||||||
|
format_key_value_pairs(Rest, PEncode, [Pair | Acc]);
|
||||||
|
format_key_value_pairs([{packet, Value} | Rest], PEncode, Acc) ->
|
||||||
|
Formatted = format_packet(Value, PEncode),
|
||||||
|
FormattedEscaped = escape(Formatted),
|
||||||
|
Pair = ["\"packet\": \"", FormattedEscaped, "\""],
|
||||||
|
format_key_value_pairs(Rest, PEncode, [Pair | Acc]);
|
||||||
|
format_key_value_pairs([{Key, Value} | Rest], PEncode, Acc) ->
|
||||||
|
FormattedKey = format_key(Key),
|
||||||
|
FormattedValue = format_value(Value, PEncode),
|
||||||
|
Pair = ["\"", FormattedKey, "\":", FormattedValue],
|
||||||
|
format_key_value_pairs(Rest, PEncode, [Pair | Acc]).
|
||||||
|
|
||||||
|
format_key(Term) ->
|
||||||
|
%% Keys must be strings
|
||||||
|
String = emqx_logger_textfmt:try_format_unicode(Term),
|
||||||
|
escape(String).
|
||||||
|
|
||||||
|
format_value(Map, PEncode) when is_map(Map) ->
|
||||||
|
format_log_map(Map, PEncode);
|
||||||
|
format_value(V, _PEncode) when is_integer(V) ->
|
||||||
|
integer_to_list(V);
|
||||||
|
format_value(V, _PEncode) when is_float(V) ->
|
||||||
|
float_to_list(V, [{decimals, 2}]);
|
||||||
|
format_value(true, _PEncode) ->
|
||||||
|
"true";
|
||||||
|
format_value(false, _PEncode) ->
|
||||||
|
"false";
|
||||||
|
format_value(V, _PEncode) ->
|
||||||
|
String = emqx_logger_textfmt:try_format_unicode(V),
|
||||||
|
["\"", escape(String), "\""].
|
||||||
|
|
||||||
|
escape(IOList) ->
|
||||||
|
Bin = iolist_to_binary(IOList),
|
||||||
|
List = binary_to_list(Bin),
|
||||||
|
escape_list(List).
|
||||||
|
|
||||||
|
escape_list([]) ->
|
||||||
|
[];
|
||||||
|
escape_list([$\n | Rest]) ->
|
||||||
|
%% 92 is backslash
|
||||||
|
[92, $n | escape_list(Rest)];
|
||||||
|
escape_list([$" | Rest]) ->
|
||||||
|
[92, $" | escape_list(Rest)];
|
||||||
|
escape_list([92 | Rest]) ->
|
||||||
|
[92, 92 | escape_list(Rest)];
|
||||||
|
escape_list([X | Rest]) ->
|
||||||
|
[X | escape_list(Rest)].
|
||||||
|
|
||||||
|
format_packet(undefined, _) -> "";
|
||||||
|
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).
|
||||||
|
|
||||||
|
format_payload(undefined, _) ->
|
||||||
|
"";
|
||||||
|
format_payload(_, hidden) ->
|
||||||
|
"******";
|
||||||
|
format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
|
||||||
|
unicode:characters_to_list(Payload);
|
||||||
|
format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
|
||||||
|
format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
|
||||||
|
emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type).
|
|
@ -314,6 +314,15 @@ fields(trace) ->
|
||||||
example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
|
example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{formatter,
|
||||||
|
hoconsc:mk(
|
||||||
|
hoconsc:union([plain, json]),
|
||||||
|
#{
|
||||||
|
description => ?DESC(trace_log_formatter),
|
||||||
|
example => plain,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(name) ->
|
fields(name) ->
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
-include_lib("stdlib/include/zip.hrl").
|
-include_lib("stdlib/include/zip.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Setups
|
%% Setups
|
||||||
|
@ -169,9 +170,153 @@ t_http_test_rule_trace(_Config) ->
|
||||||
{ok, Delete} = request_api(delete, api_path(["trace/", Name])),
|
{ok, Delete} = request_api(delete, api_path(["trace/", Name])),
|
||||||
?assertEqual(<<>>, Delete),
|
?assertEqual(<<>>, Delete),
|
||||||
|
|
||||||
|
emqx_trace:clear(),
|
||||||
unload(),
|
unload(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_http_test_json_formatter(_Config) ->
|
||||||
|
emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
|
||||||
|
Name = <<"testname">>,
|
||||||
|
Topic = <<"/x/y/z">>,
|
||||||
|
Trace = [
|
||||||
|
{<<"name">>, Name},
|
||||||
|
{<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, Topic},
|
||||||
|
{<<"formatter">>, <<"json">>}
|
||||||
|
],
|
||||||
|
|
||||||
|
{ok, Create} = request_api(post, api_path("trace"), Trace),
|
||||||
|
?assertMatch(#{<<"name">> := Name}, json(Create)),
|
||||||
|
|
||||||
|
{ok, List} = request_api(get, api_path("trace")),
|
||||||
|
[Data] = json(List),
|
||||||
|
?assertEqual(<<"json">>, maps:get(<<"formatter">>, Data)),
|
||||||
|
|
||||||
|
{ok, List1} = request_api(get, api_path("trace")),
|
||||||
|
[Data1] = json(List1),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"formatter">> := <<"json">>
|
||||||
|
},
|
||||||
|
Data1
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Check that the log is empty
|
||||||
|
ok = emqx_trace_handler_SUITE:filesync(Name, topic),
|
||||||
|
{ok, _Detail} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
|
||||||
|
%% Trace is empty which results in a not found error
|
||||||
|
{error, _} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
|
||||||
|
|
||||||
|
%% Start a client and send a message to get info to the log
|
||||||
|
ClientId = <<"my_client_id">>,
|
||||||
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
%% Normal message
|
||||||
|
emqtt:publish(Client, Topic, #{}, <<"log_this_message">>, [{qos, 2}]),
|
||||||
|
%% Escape line breaks
|
||||||
|
emqtt:publish(Client, Topic, #{}, <<"\nlog\nthis\nmessage">>, [{qos, 2}]),
|
||||||
|
%% Escape escape character
|
||||||
|
emqtt:publish(Client, Topic, #{}, <<"\\\nlog\n_\\n_this\nmessage\\">>, [{qos, 2}]),
|
||||||
|
%% Escape end of string
|
||||||
|
emqtt:publish(Client, Topic, #{}, <<"\"log_this_message\"">>, [{qos, 2}]),
|
||||||
|
|
||||||
|
%% Manually create some trace messages to test the JSON formatter
|
||||||
|
|
||||||
|
%% String key and value
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, "str" => "str"}),
|
||||||
|
%% Log Erlang term
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, term => {notjson}}),
|
||||||
|
%% Log Erlang term key
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, {'notjson'} => term}),
|
||||||
|
%% Log Integer
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, integer => 42}),
|
||||||
|
%% Log Float
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, float => 1.2}),
|
||||||
|
%% Log Integer Key
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 42 => integer}),
|
||||||
|
%% Log Float Key
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 1.2 => float}),
|
||||||
|
%% Log Map Key
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, #{} => value}),
|
||||||
|
%% Empty submap
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{}}),
|
||||||
|
%% Non-empty submap
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{key => value}}),
|
||||||
|
%% Bolean values
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, true => true, false => false}),
|
||||||
|
%% Key value list
|
||||||
|
?TRACE("CUSTOM", "my_log_msg", #{
|
||||||
|
topic => Topic,
|
||||||
|
list => [
|
||||||
|
{<<"key">>, <<"value">>},
|
||||||
|
{<<"key2">>, <<"value2">>}
|
||||||
|
]
|
||||||
|
}),
|
||||||
|
ok = emqx_trace_handler_SUITE:filesync(Name, topic),
|
||||||
|
{ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
|
||||||
|
{ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
|
||||||
|
{ok, [
|
||||||
|
_Comment,
|
||||||
|
#zip_file{
|
||||||
|
name = _ZipName,
|
||||||
|
info = #file_info{size = Size, type = regular, access = read_write}
|
||||||
|
}
|
||||||
|
]} = zip:table(Bin),
|
||||||
|
?assert(Size > 0),
|
||||||
|
{ok, [{_, LogContent}]} = zip:unzip(Bin, [memory]),
|
||||||
|
LogEntriesTrailing = string:split(LogContent, "\n", all),
|
||||||
|
LogEntries = lists:droplast(LogEntriesTrailing),
|
||||||
|
DecodedLogEntries = [
|
||||||
|
begin
|
||||||
|
ct:pal("LOG ENTRY\n~s\n", [JSONEntry]),
|
||||||
|
emqx_utils_json:decode(JSONEntry)
|
||||||
|
end
|
||||||
|
|| JSONEntry <- LogEntries
|
||||||
|
],
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}},
|
||||||
|
#{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}},
|
||||||
|
#{
|
||||||
|
<<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>}
|
||||||
|
},
|
||||||
|
#{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}},
|
||||||
|
#{<<"meta">> := #{<<"str">> := <<"str">>}},
|
||||||
|
#{<<"meta">> := #{<<"term">> := <<"{notjson}">>}},
|
||||||
|
#{<<"meta">> := <<_/binary>>},
|
||||||
|
#{<<"meta">> := #{<<"integer">> := 42}},
|
||||||
|
#{<<"meta">> := #{<<"float">> := 1.2}},
|
||||||
|
#{<<"meta">> := <<_/binary>>},
|
||||||
|
#{<<"meta">> := <<_/binary>>},
|
||||||
|
#{<<"meta">> := <<_/binary>>},
|
||||||
|
#{<<"meta">> := #{<<"sub">> := #{}}},
|
||||||
|
#{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}},
|
||||||
|
#{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}},
|
||||||
|
#{
|
||||||
|
<<"meta">> := #{
|
||||||
|
<<"list">> := #{
|
||||||
|
<<"key">> := <<"value">>,
|
||||||
|
<<"key2">> := <<"value2">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
| _
|
||||||
|
],
|
||||||
|
DecodedLogEntries
|
||||||
|
),
|
||||||
|
{ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
|
||||||
|
?assertEqual(<<>>, Delete),
|
||||||
|
|
||||||
|
{ok, List2} = request_api(get, api_path("trace")),
|
||||||
|
?assertEqual([], json(List2)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(Client),
|
||||||
|
unload(),
|
||||||
|
emqx_trace:clear(),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_create_failed(_Config) ->
|
t_create_failed(_Config) ->
|
||||||
load(),
|
load(),
|
||||||
Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}],
|
Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}],
|
||||||
|
|
|
@ -115,4 +115,9 @@ current_trace_offset.desc:
|
||||||
current_trace_offset.label:
|
current_trace_offset.label:
|
||||||
"""Offset from the current trace position."""
|
"""Offset from the current trace position."""
|
||||||
|
|
||||||
|
trace_log_formatter.desc:
|
||||||
|
"""The formatter that will be used to format the trace log entries. Set this to plain to format the log entries as plain text (default). Set it to json to format each log entry as a JSON object."""
|
||||||
|
trace_log_formatter.label:
|
||||||
|
"""Trace Log Entry Formatter"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue