Merge pull request #12863 from kjellwinblad/kjell/new_trace_log_formatter/EMQX-12025

feat(emqx_trace): add JSON trace log entry formatter
This commit is contained in:
Kjell Winblad 2024-04-22 06:25:18 +02:00 committed by GitHub
commit 6e99f01ecd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 465 additions and 78 deletions

View File

@ -30,7 +30,7 @@
| '_', | '_',
enable = true :: boolean() | '_', enable = true :: boolean() | '_',
payload_encode = text :: hex | text | hidden | '_', payload_encode = text :: hex | text | hidden | '_',
extra = #{} :: map() | '_', extra = #{formatter => text} :: #{formatter => text | json} | '_',
start_at :: integer() | undefined | '_', start_at :: integer() | undefined | '_',
end_at :: integer() | undefined | '_' end_at :: integer() | undefined | '_'
}). }).

View File

@ -88,8 +88,14 @@ unsubscribe(Topic, SubOpts) ->
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
rendered_action_template(ActionID, RenderResult) -> rendered_action_template(ActionID, RenderResult) ->
Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])), TraceResult = ?TRACE(
TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult), "QUERY_RENDER",
"action_template_rendered",
#{
result => RenderResult,
action_id => ActionID
}
),
case logger:get_process_metadata() of case logger:get_process_metadata() of
#{stop_action_after_render := true} -> #{stop_action_after_render := true} ->
%% We throw an unrecoverable error to stop action before the %% We throw an unrecoverable error to stop action before the
@ -183,8 +189,10 @@ create(Trace) ->
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
true -> true ->
case to_trace(Trace) of case to_trace(Trace) of
{ok, TraceRec} -> insert_new_trace(TraceRec); {ok, TraceRec} ->
{error, Reason} -> {error, Reason} insert_new_trace(TraceRec);
{error, Reason} ->
{error, Reason}
end; end;
false -> false ->
{error, {error,
@ -246,7 +254,11 @@ format(Traces) ->
lists:map( lists:map(
fun(Trace0 = #?TRACE{}) -> fun(Trace0 = #?TRACE{}) ->
[_ | Values] = tuple_to_list(Trace0), [_ | Values] = tuple_to_list(Trace0),
maps:from_list(lists:zip(Fields, Values)) Map0 = maps:from_list(lists:zip(Fields, Values)),
Extra = maps:get(extra, Map0, #{}),
Formatter = maps:get(formatter, Extra, text),
Map1 = Map0#{formatter => Formatter},
maps:remove(extra, Map1)
end, end,
Traces Traces
). ).
@ -392,9 +404,17 @@ start_trace(Trace) ->
type = Type, type = Type,
filter = Filter, filter = Filter,
start_at = Start, start_at = Start,
payload_encode = PayloadEncode payload_encode = PayloadEncode,
extra = Extra
} = Trace, } = Trace,
Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode}, Formatter = maps:get(formatter, Extra, text),
Who = #{
name => Name,
type => Type,
filter => Filter,
payload_encode => PayloadEncode,
formatter => Formatter
},
emqx_trace_handler:install(Who, debug, log_file(Name, Start)). emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
stop_trace(Finished, Started) -> stop_trace(Finished, Started) ->
@ -559,6 +579,12 @@ to_trace(#{end_at := EndAt} = Trace, Rec) ->
{ok, _Sec} -> {ok, _Sec} ->
{error, "end_at time has already passed"} {error, "end_at time has already passed"}
end; end;
to_trace(#{formatter := Formatter} = Trace, Rec) ->
Extra = Rec#?TRACE.extra,
to_trace(
maps:remove(formatter, Trace),
Rec#?TRACE{extra = Extra#{formatter => Formatter}}
);
to_trace(_, Rec) -> to_trace(_, Rec) ->
{ok, Rec}. {ok, Rec}.

View File

@ -27,6 +27,7 @@
install/3, install/3,
install/4, install/4,
install/5, install/5,
install/6,
uninstall/1, uninstall/1,
uninstall/2 uninstall/2
]). ]).
@ -46,7 +47,8 @@
name := binary(), name := binary(),
type := clientid | topic | ip_address, type := clientid | topic | ip_address,
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address(), filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address(),
payload_encode := text | hidden | hex payload_encode := text | hidden | hex,
formatter => json | text
}. }.
-define(CONFIG(_LogFile_), #{ -define(CONFIG(_LogFile_), #{
@ -69,17 +71,29 @@
Type :: clientid | topic | ip_address, Type :: clientid | topic | ip_address,
Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all, Level :: logger:level() | all,
LogFilePath :: string() LogFilePath :: string(),
Formatter :: text | json
) -> ok | {error, term()}. ) -> ok | {error, term()}.
install(Name, Type, Filter, Level, LogFile) -> install(Name, Type, Filter, Level, LogFile, Formatter) ->
Who = #{ Who = #{
type => Type, type => Type,
filter => ensure_bin(Filter), filter => ensure_bin(Filter),
name => ensure_bin(Name), name => ensure_bin(Name),
payload_encode => payload_encode() payload_encode => payload_encode(),
formatter => Formatter
}, },
install(Who, Level, LogFile). install(Who, Level, LogFile).
-spec install(
Name :: binary() | list(),
Type :: clientid | topic | ip_address,
Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
Level :: logger:level() | all,
LogFilePath :: string()
) -> ok | {error, term()}.
install(Name, Type, Filter, Level, LogFile) ->
install(Name, Type, Filter, Level, LogFile, text).
-spec install( -spec install(
Type :: clientid | topic | ip_address, Type :: clientid | topic | ip_address,
Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
@ -183,6 +197,10 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
filters(#{type := ruleid, filter := Filter, name := Name}) -> filters(#{type := ruleid, filter := Filter, name := Name}) ->
[{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}].
formatter(#{type := _Type, payload_encode := PayloadEncode, formatter := json}) ->
{emqx_trace_json_formatter, #{
payload_encode => PayloadEncode
}};
formatter(#{type := _Type, payload_encode := PayloadEncode}) -> formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
{emqx_trace_formatter, #{ {emqx_trace_formatter, #{
%% template is for ?SLOG message not ?TRACE. %% template is for ?SLOG message not ?TRACE.

View File

@ -0,0 +1,130 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace_json_formatter).
-include("emqx_mqtt.hrl").
-export([format/2]).
%% logger_formatter:config/0 is not exported.
-type config() :: map().
%%%-----------------------------------------------------------------
%%% Callback Function
%%%-----------------------------------------------------------------
-spec format(LogEvent, Config) -> unicode:chardata() when
LogEvent :: logger:log_event(),
Config :: config().
format(
LogMap,
#{payload_encode := PEncode}
) ->
%% We just make some basic transformations on the input LogMap and then do
%% an external call to create the JSON text
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
LogMap1 = LogMap#{time => Time},
LogMap2 = prepare_log_map(LogMap1, PEncode),
[emqx_logger_jsonfmt:best_effort_json(LogMap2, [force_utf8]), "\n"].
%%%-----------------------------------------------------------------
%%% Helper Functions
%%%-----------------------------------------------------------------
prepare_log_map(LogMap, PEncode) ->
NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
maps:from_list(NewKeyValuePairs).
prepare_key_value(payload = K, V, PEncode) ->
NewV =
try
format_payload(V, PEncode)
catch
_:_ ->
V
end,
{K, NewV};
prepare_key_value(packet = K, V, PEncode) ->
NewV =
try
format_packet(V, PEncode)
catch
_:_ ->
V
end,
{K, NewV};
prepare_key_value(rule_ids = K, V, _PEncode) ->
NewV =
try
format_map_set_to_list(V)
catch
_:_ ->
V
end,
{K, NewV};
prepare_key_value(client_ids = K, V, _PEncode) ->
NewV =
try
format_map_set_to_list(V)
catch
_:_ ->
V
end,
{K, NewV};
prepare_key_value(action_id = K, V, _PEncode) ->
try
{action_info, format_action_info(V)}
catch
_:_ ->
{K, V}
end;
prepare_key_value(K, V, PEncode) when is_map(V) ->
{K, prepare_log_map(V, PEncode)};
prepare_key_value(K, V, _PEncode) ->
{K, V}.
format_packet(undefined, _) -> "";
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).
format_payload(undefined, _) ->
"";
format_payload(_, hidden) ->
"******";
format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
unicode:characters_to_list(Payload);
format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type).
format_map_set_to_list(Map) ->
Items = [
begin
%% Assert that it is really a map set
true = V,
%% Assert that the keys have the expected type
true = is_binary(K),
K
end
|| {K, V} <- maps:to_list(Map)
],
lists:sort(Items).
format_action_info(V) ->
[<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]),
#{
type => Type,
name => Name
}.

View File

@ -96,7 +96,7 @@ t_base_create_delete(_Config) ->
start_at => Now, start_at => Now,
end_at => Now + 30 * 60, end_at => Now + 30 * 60,
payload_encode => text, payload_encode => text,
extra => #{} formatter => text
} }
], ],
?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])), ?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])),
@ -511,4 +511,13 @@ build_old_trace_data() ->
reload() -> reload() ->
catch ok = gen_server:stop(emqx_trace), catch ok = gen_server:stop(emqx_trace),
{ok, _Pid} = emqx_trace:start_link(). case emqx_trace:start_link() of
{ok, _Pid} = Res ->
Res;
NotOKRes ->
ct:pal(
"emqx_trace:start_link() gave result: ~p\n"
"(perhaps it is already started)",
[NotOKRes]
)
end.

View File

@ -316,7 +316,7 @@ on_query(InstId, {send_message, Msg}, State) ->
ClientId = maps:get(clientid, Msg, undefined), ClientId = maps:get(clientid, Msg, undefined),
on_query( on_query(
InstId, InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, {undefined, ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State State
) )
end; end;
@ -346,19 +346,19 @@ on_query(
ClientId = clientid(Msg), ClientId = clientid(Msg),
on_query( on_query(
InstId, InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State State
) )
end; end;
on_query(InstId, {Method, Request}, State) -> on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); on_query(InstId, {undefined, undefined, Method, Request, 5000, _Retry = 2}, State);
on_query(InstId, {Method, Request, Timeout}, State) -> on_query(InstId, {Method, Request, Timeout}, State) ->
%% TODO: Get retry from State %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State); on_query(InstId, {undefined, undefined, Method, Request, Timeout, _Retry = 2}, State);
on_query( on_query(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout, Retry}, {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
#{base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
?TRACE( ?TRACE(
@ -368,11 +368,12 @@ on_query(
request => redact_request(Request), request => redact_request(Request),
note => ?READACT_REQUEST_NOTE, note => ?READACT_REQUEST_NOTE,
connector => InstId, connector => InstId,
action_id => ActionId,
state => redact(State) state => redact(State)
} }
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
trace_rendered_action_template(InstId, Method, NRequest, Timeout), trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
Result0 = ehttpc:request( Result0 = ehttpc:request(
Worker, Worker,
@ -429,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
ClientId = maps:get(clientid, Msg, undefined), ClientId = maps:get(clientid, Msg, undefined),
on_query_async( on_query_async(
InstId, InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout}, {undefined, ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
State State
) )
@ -459,14 +460,14 @@ on_query_async(
ClientId = clientid(Msg), ClientId = clientid(Msg),
on_query_async( on_query_async(
InstId, InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout}, {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
State State
) )
end; end;
on_query_async( on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {ActionId, KeyOrNum, Method, Request, Timeout},
ReplyFunAndArgs, ReplyFunAndArgs,
#{base_path := BasePath} = State #{base_path := BasePath} = State
) -> ) ->
@ -482,7 +483,7 @@ on_query_async(
} }
), ),
NRequest = formalize_request(Method, BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
trace_rendered_action_template(InstId, Method, NRequest, Timeout), trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
MaxAttempts = maps:get(max_attempts, State, 3), MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{ Context = #{
attempt => 1, attempt => 1,
@ -502,11 +503,11 @@ on_query_async(
), ),
{ok, Worker}. {ok, Worker}.
trace_rendered_action_template(InstId, Method, NRequest, Timeout) -> trace_rendered_action_template(ActionId, Method, NRequest, Timeout) ->
case NRequest of case NRequest of
{Path, Headers} -> {Path, Headers} ->
emqx_trace:rendered_action_template( emqx_trace:rendered_action_template(
InstId, ActionId,
#{ #{
path => Path, path => Path,
method => Method, method => Method,
@ -516,7 +517,7 @@ trace_rendered_action_template(InstId, Method, NRequest, Timeout) ->
); );
{Path, Headers, Body} -> {Path, Headers, Body} ->
emqx_trace:rendered_action_template( emqx_trace:rendered_action_template(
InstId, ActionId,
#{ #{
path => Path, path => Path,
method => Method, method => Method,

View File

@ -418,9 +418,8 @@ t_send_get_trace_messages(Config) ->
begin begin
Bin = read_rule_trace_file(TraceName, Now), Bin = read_rule_trace_file(TraceName, Now),
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
end end

View File

@ -314,6 +314,15 @@ fields(trace) ->
example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
required => false required => false
} }
)},
{formatter,
hoconsc:mk(
hoconsc:union([text, json]),
#{
description => ?DESC(trace_log_formatter),
example => text,
required => false
}
)} )}
]; ];
fields(name) -> fields(name) ->

View File

@ -507,21 +507,24 @@ trace(["list"]) ->
) )
end; end;
trace(["stop", Operation, Filter0]) -> trace(["stop", Operation, Filter0]) ->
case trace_type(Operation, Filter0) of case trace_type(Operation, Filter0, text) of
{ok, Type, Filter} -> trace_off(Type, Filter); {ok, Type, Filter, _} -> trace_off(Type, Filter);
error -> trace([]) error -> trace([])
end; end;
trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile]) ->
trace(["start", Operation, ClientId, LogFile, "all"]); trace(["start", Operation, ClientId, LogFile, "all"]);
trace(["start", Operation, Filter0, LogFile, Level]) -> trace(["start", Operation, Filter0, LogFile, Level]) ->
case trace_type(Operation, Filter0) of trace(["start", Operation, Filter0, LogFile, Level, text]);
{ok, Type, Filter} -> trace(["start", Operation, Filter0, LogFile, Level, Formatter0]) ->
case trace_type(Operation, Filter0, Formatter0) of
{ok, Type, Filter, Formatter} ->
trace_on( trace_on(
name(Filter0), name(Filter0),
Type, Type,
Filter, Filter,
list_to_existing_atom(Level), list_to_existing_atom(Level),
LogFile LogFile,
Formatter
); );
error -> error ->
trace([]) trace([])
@ -529,19 +532,22 @@ trace(["start", Operation, Filter0, LogFile, Level]) ->
trace(_) -> trace(_) ->
emqx_ctl:usage([ emqx_ctl:usage([
{"trace list", "List all traces started on local node"}, {"trace list", "List all traces started on local node"},
{"trace start client <ClientId> <File> [<Level>]", "Traces for a client on local node"}, {"trace start client <ClientId> <File> [<Level>] [<Formatter>]",
"Traces for a client on local node (Formatter=text|json)"},
{"trace stop client <ClientId>", "Stop tracing for a client on local node"}, {"trace stop client <ClientId>", "Stop tracing for a client on local node"},
{"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic on local node"}, {"trace start topic <Topic> <File> [<Level>] [<Formatter>]",
"Traces for a topic on local node (Formatter=text|json)"},
{"trace stop topic <Topic> ", "Stop tracing for a topic on local node"}, {"trace stop topic <Topic> ", "Stop tracing for a topic on local node"},
{"trace start ip_address <IP> <File> [<Level>] ", {"trace start ip_address <IP> <File> [<Level>] [<Formatter>]",
"Traces for a client ip on local node"}, "Traces for a client ip on local node (Formatter=text|json)"},
{"trace stop ip_address <IP> ", "Stop tracing for a client ip on local node"}, {"trace stop ip_address <IP> ", "Stop tracing for a client ip on local node"},
{"trace start ruleid <RuleID> <File> [<Level>] ", "Traces for a rule ID on local node"}, {"trace start ruleid <RuleID> <File> [<Level>] [<Formatter>]",
"Traces for a rule ID on local node (Formatter=text|json)"},
{"trace stop ruleid <RuleID> ", "Stop tracing for a rule ID on local node"} {"trace stop ruleid <RuleID> ", "Stop tracing for a rule ID on local node"}
]). ]).
trace_on(Name, Type, Filter, Level, LogFile) -> trace_on(Name, Type, Filter, Level, LogFile, Formatter) ->
case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile, Formatter) of
ok -> ok ->
emqx_trace:check(), emqx_trace:check(),
emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]); emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]);
@ -592,28 +598,33 @@ traces(["delete", Name]) ->
trace_cluster_del(Name); trace_cluster_del(Name);
traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter]) ->
traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]); traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]);
traces(["start", Name, Operation, Filter0, DurationS]) -> traces(["start", Name, Operation, Filter, DurationS]) ->
case trace_type(Operation, Filter0) of traces(["start", Name, Operation, Filter, DurationS, text]);
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); traces(["start", Name, Operation, Filter0, DurationS, Formatter0]) ->
case trace_type(Operation, Filter0, Formatter0) of
{ok, Type, Filter, Formatter} -> trace_cluster_on(Name, Type, Filter, DurationS, Formatter);
error -> traces([]) error -> traces([])
end; end;
traces(_) -> traces(_) ->
emqx_ctl:usage([ emqx_ctl:usage([
{"traces list", "List all cluster traces started"}, {"traces list", "List all cluster traces started"},
{"traces start <Name> client <ClientId> [<Duration>]", "Traces for a client in cluster"}, {"traces start <Name> client <ClientId> [<Duration>] [<Formatter>]",
{"traces start <Name> topic <Topic> [<Duration>]", "Traces for a topic in cluster"}, "Traces for a client in cluster (Formatter=text|json)"},
{"traces start <Name> ip_address <IPAddr> [<Duration>]", {"traces start <Name> topic <Topic> [<Duration>] [<Formatter>]",
"Traces for a topic in cluster (Formatter=text|json)"},
{"traces start <Name> ruleid <RuleID> [<Duration>] [<Formatter>]",
"Traces for a rule ID in cluster (Formatter=text|json)"},
{"traces start <Name> ip_address <IPAddr> [<Duration>] [<Formatter>]",
"Traces for a client IP in cluster\n" "Traces for a client IP in cluster\n"
"Trace will start immediately on all nodes, including the core and replicant,\n" "Trace will start immediately on all nodes, including the core and replicant,\n"
"and will end after <Duration> seconds. The default value for <Duration> is " "and will end after <Duration> seconds. The default value for <Duration> is "
?DEFAULT_TRACE_DURATION ?DEFAULT_TRACE_DURATION
" seconds."}, " seconds. (Formatter=text|json)"},
{"traces start <Name> ruleid <RuleID> [<Duration>]", "Traces for a rule ID in cluster"},
{"traces stop <Name>", "Stop trace in cluster"}, {"traces stop <Name>", "Stop trace in cluster"},
{"traces delete <Name>", "Delete trace in cluster"} {"traces delete <Name>", "Delete trace in cluster"}
]). ]).
trace_cluster_on(Name, Type, Filter, DurationS0) -> trace_cluster_on(Name, Type, Filter, DurationS0, Formatter) ->
Now = emqx_trace:now_second(), Now = emqx_trace:now_second(),
DurationS = list_to_integer(DurationS0), DurationS = list_to_integer(DurationS0),
Trace = #{ Trace = #{
@ -621,7 +632,8 @@ trace_cluster_on(Name, Type, Filter, DurationS0) ->
type => Type, type => Type,
Type => bin(Filter), Type => bin(Filter),
start_at => Now, start_at => Now,
end_at => Now + DurationS end_at => Now + DurationS,
formatter => Formatter
}, },
case emqx_trace:create(Trace) of case emqx_trace:create(Trace) of
{ok, _} -> {ok, _} ->
@ -645,10 +657,12 @@ trace_cluster_off(Name) ->
{error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
end. end.
trace_type("client", ClientId) -> {ok, clientid, bin(ClientId)}; trace_type(Op, Match, "text") -> trace_type(Op, Match, text);
trace_type("topic", Topic) -> {ok, topic, bin(Topic)}; trace_type(Op, Match, "json") -> trace_type(Op, Match, json);
trace_type("ip_address", IP) -> {ok, ip_address, IP}; trace_type("client", ClientId, Formatter) -> {ok, clientid, bin(ClientId), Formatter};
trace_type(_, _) -> error. trace_type("topic", Topic, Formatter) -> {ok, topic, bin(Topic), Formatter};
trace_type("ip_address", IP, Formatter) -> {ok, ip_address, IP, Formatter};
trace_type(_, _, _) -> error.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Listeners Command %% @doc Listeners Command

View File

@ -23,6 +23,7 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-include_lib("stdlib/include/zip.hrl"). -include_lib("stdlib/include/zip.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/logger.hrl").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
@ -169,9 +170,186 @@ t_http_test_rule_trace(_Config) ->
{ok, Delete} = request_api(delete, api_path(["trace/", Name])), {ok, Delete} = request_api(delete, api_path(["trace/", Name])),
?assertEqual(<<>>, Delete), ?assertEqual(<<>>, Delete),
emqx_trace:clear(),
unload(), unload(),
ok. ok.
t_http_test_json_formatter(_Config) ->
emqx_trace:clear(),
load(),
Name = <<"testname">>,
Topic = <<"/x/y/z">>,
Trace = [
{<<"name">>, Name},
{<<"type">>, <<"topic">>},
{<<"topic">>, Topic},
{<<"formatter">>, <<"json">>}
],
{ok, Create} = request_api(post, api_path("trace"), Trace),
?assertMatch(#{<<"name">> := Name}, json(Create)),
{ok, List} = request_api(get, api_path("trace")),
[Data] = json(List),
?assertEqual(<<"json">>, maps:get(<<"formatter">>, Data)),
{ok, List1} = request_api(get, api_path("trace")),
[Data1] = json(List1),
?assertMatch(
#{
<<"formatter">> := <<"json">>
},
Data1
),
%% Check that the log is empty
ok = emqx_trace_handler_SUITE:filesync(Name, topic),
{ok, _Detail} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
%% Trace is empty which results in a not found error
{error, _} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
%% Start a client and send a message to get info to the log
ClientId = <<"my_client_id">>,
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client),
%% Normal message
emqtt:publish(Client, Topic, #{}, <<"log_this_message">>, [{qos, 2}]),
%% Escape line breaks
emqtt:publish(Client, Topic, #{}, <<"\nlog\nthis\nmessage">>, [{qos, 2}]),
%% Escape escape character
emqtt:publish(Client, Topic, #{}, <<"\\\nlog\n_\\n_this\nmessage\\">>, [{qos, 2}]),
%% Escape end of string
emqtt:publish(Client, Topic, #{}, <<"\"log_this_message\"">>, [{qos, 2}]),
%% Manually create some trace messages to test the JSON formatter
%% String key and value
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, "str" => "str"}),
%% Log Erlang term
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, term => {notjson}}),
%% Log Erlang term key
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, {'notjson'} => term}),
%% Log Integer
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, integer => 42}),
%% Log Float
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, float => 1.2}),
%% Log Integer Key
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 42 => integer}),
%% Log Float Key
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 1.2 => float}),
%% Log Map Key
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, #{} => value}),
%% Empty submap
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{}}),
%% Non-empty submap
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{key => value}}),
%% Bolean values
?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, true => true, false => false}),
%% Key value list
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
list => [
{<<"key">>, <<"value">>},
{<<"key2">>, <<"value2">>}
]
}),
%% We do special formatting for client_ids and rule_ids
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
client_ids => maps:from_keys([<<"a">>, <<"b">>, <<"c">>], true)
}),
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
rule_ids => maps:from_keys([<<"a">>, <<"b">>, <<"c">>], true)
}),
%% action_id should be rendered as action_info
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
action_id =>
<<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">>
}),
ok = emqx_trace_handler_SUITE:filesync(Name, topic),
{ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
{ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
{ok, [
_Comment,
#zip_file{
name = _ZipName,
info = #file_info{size = Size, type = regular, access = read_write}
}
]} = zip:table(Bin),
?assert(Size > 0),
{ok, [{_, LogContent}]} = zip:unzip(Bin, [memory]),
LogEntriesTrailing = string:split(LogContent, "\n", all),
LogEntries = lists:droplast(LogEntriesTrailing),
DecodedLogEntries = [
begin
ct:pal("LOG ENTRY\n~s\n", [JSONEntry]),
emqx_utils_json:decode(JSONEntry)
end
|| JSONEntry <- LogEntries
],
?assertMatch(
[
#{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}},
#{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}},
#{
<<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>}
},
#{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}},
#{<<"meta">> := #{<<"str">> := <<"str">>}},
#{<<"meta">> := #{<<"term">> := <<"{notjson}">>}},
#{<<"meta">> := <<_/binary>>},
#{<<"meta">> := #{<<"integer">> := 42}},
#{<<"meta">> := #{<<"float">> := 1.2}},
#{<<"meta">> := <<_/binary>>},
#{<<"meta">> := <<_/binary>>},
#{<<"meta">> := <<_/binary>>},
#{<<"meta">> := #{<<"sub">> := #{}}},
#{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}},
#{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}},
#{
<<"meta">> := #{
<<"list">> := #{
<<"key">> := <<"value">>,
<<"key2">> := <<"value2">>
}
}
},
#{
<<"meta">> := #{
<<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>]
}
},
#{
<<"meta">> := #{
<<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>]
}
},
#{
<<"meta">> := #{
<<"action_info">> := #{
<<"type">> := <<"http">>,
<<"name">> := <<"emqx_bridge_http_test_lib">>
}
}
}
| _
],
DecodedLogEntries
),
{ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
?assertEqual(<<>>, Delete),
{ok, List2} = request_api(get, api_path("trace")),
?assertEqual([], json(List2)),
ok = emqtt:disconnect(Client),
unload(),
emqx_trace:clear(),
ok.
t_create_failed(_Config) -> t_create_failed(_Config) ->
load(), load(),
Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}],

View File

@ -1147,12 +1147,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
{ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} -> {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
{error, {unrecoverable_error, unhealthy_target}}; {error, {unrecoverable_error, unhealthy_target}};
{ok, _Group, Resource} -> {ok, _Group, Resource} ->
set_rule_id_trace_meta_data(Query), QueryResult =
QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource), try
%% do_call_query does not throw an exception as the call to the set_rule_id_trace_meta_data(Query),
%% resource is wrapped in a try catch expression so we will always do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource)
%% unset the trace meta data after
unset_rule_id_trace_meta_data(), unset_rule_id_trace_meta_data()
end,
QueryResult; QueryResult;
{error, not_found} -> {error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")

View File

@ -132,7 +132,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
reason => Error, reason => Error,
stacktrace => StkTrace stacktrace => StkTrace
}, },
warning error
), ),
{error, {Error, StkTrace}} {error, {Error, StkTrace}}
after after
@ -176,18 +176,18 @@ do_apply_rule(
{ok, ColumnsAndSelected, FinalCollection} -> {ok, ColumnsAndSelected, FinalCollection} ->
case FinalCollection of case FinalCollection of
[] -> [] ->
trace_rule_sql("FOREACH_yielded_no_result"), trace_rule_sql("SQL_yielded_no_result"),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
_ -> _ ->
trace_rule_sql( trace_rule_sql(
"FOREACH_yielded_result", #{result => FinalCollection}, debug "SQL_yielded_result", #{result => FinalCollection}, debug
), ),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
end, end,
NewEnvs = maps:merge(ColumnsAndSelected, Envs), NewEnvs = maps:merge(ColumnsAndSelected, Envs),
{ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]}; {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
false -> false ->
trace_rule_sql("FOREACH_yielded_no_result_no_match"), trace_rule_sql("SQL_yielded_no_result"),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch} {error, nomatch}
end; end;
@ -204,11 +204,11 @@ do_apply_rule(
) -> ) ->
case evaluate_select(Fields, Columns, Conditions) of case evaluate_select(Fields, Columns, Conditions) of
{ok, Selected} -> {ok, Selected} ->
trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug), trace_rule_sql("SQL_yielded_result", #{result => Selected}, debug),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
{ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
false -> false ->
trace_rule_sql("SELECT_yielded_no_result_no_match"), trace_rule_sql("SQL_yielded_no_result"),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch} {error, nomatch}
end. end.
@ -392,10 +392,8 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
handle_action(RuleId, ActId, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
trace_action(ActId, "activating_action"),
try try
Result = do_handle_action(RuleId, ActId, Selected, Envs), Result = do_handle_action(RuleId, ActId, Selected, Envs),
trace_action(ActId, "action_activated", #{result => Result}),
Result Result
catch catch
throw:out_of_service -> throw:out_of_service ->
@ -467,7 +465,6 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -
Result = Mod:Func(Selected, Envs, Args), Result = Mod:Func(Selected, Envs, Args),
{_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
inc_action_metrics(IncCtx, Result), inc_action_metrics(IncCtx, Result),
trace_action(Action, "call_action_function_result", #{result => Result}, debug),
Result. Result.
do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> do_handle_action_get_trace_inc_metrics_context(RuleID, Action) ->

View File

@ -36,7 +36,8 @@ init_per_suite(Config) ->
emqx_connector, emqx_connector,
emqx_bridge, emqx_bridge,
emqx_bridge_http, emqx_bridge_http,
emqx_rule_engine emqx_rule_engine,
emqx_modules
], ],
%% I don't know why we need to stop the apps and then start them but if we %% I don't know why we need to stop the apps and then start them but if we
%% don't do this and other suites run before this suite the test cases will %% don't do this and other suites run before this suite the test cases will
@ -128,9 +129,8 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
Bin = read_rule_trace_file(TraceName, TraceType, Now), Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG:~n~s", [Bin]), io:format("THELOG:~n~s", [Bin]),
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
end end
@ -171,10 +171,9 @@ create_trace(TraceName, TraceType, TraceValue) ->
type => TraceType, type => TraceType,
TraceType => TraceValue, TraceType => TraceValue,
start_at => Start, start_at => Start,
end_at => End end_at => End,
formatter => json
}, },
emqx_trace_SUITE:reload(),
ok = emqx_trace:clear(),
{ok, _} = emqx_trace:create(Trace). {ok, _} = emqx_trace:create(Trace).
t_apply_rule_test_batch_separation_stop_after_render(_Config) -> t_apply_rule_test_batch_separation_stop_after_render(_Config) ->

View File

@ -30,11 +30,11 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), application:load(emqx_conf),
ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_modules]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_modules]),
ok. ok.
t_ctx_pub(_) -> t_ctx_pub(_) ->

View File

@ -0,0 +1 @@
You can now format trace log entries as JSON objects by setting the formatter parameter to "json" when creating the trace pattern.

View File

@ -115,4 +115,9 @@ current_trace_offset.desc:
current_trace_offset.label: current_trace_offset.label:
"""Offset from the current trace position.""" """Offset from the current trace position."""
trace_log_formatter.desc:
"""The formatter that will be used to format the trace log entries. Set this to text to format the log entries as plain text (default). Set it to json to format each log entry as a JSON object."""
trace_log_formatter.label:
"""Trace Log Entry Formatter"""
} }