diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 476794223..d1e70b184 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -30,7 +30,7 @@ | '_', enable = true :: boolean() | '_', payload_encode = text :: hex | text | hidden | '_', - extra = #{} :: map() | '_', + extra = #{formatter => text} :: #{formatter => text | json} | '_', start_at :: integer() | undefined | '_', end_at :: integer() | undefined | '_' }). diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 4ae973722..7bbe59b2b 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -88,8 +88,14 @@ unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). rendered_action_template(ActionID, RenderResult) -> - Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])), - TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult), + TraceResult = ?TRACE( + "QUERY_RENDER", + "action_template_rendered", + #{ + result => RenderResult, + action_id => ActionID + } + ), case logger:get_process_metadata() of #{stop_action_after_render := true} -> %% We throw an unrecoverable error to stop action before the @@ -183,8 +189,10 @@ create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> case to_trace(Trace) of - {ok, TraceRec} -> insert_new_trace(TraceRec); - {error, Reason} -> {error, Reason} + {ok, TraceRec} -> + insert_new_trace(TraceRec); + {error, Reason} -> + {error, Reason} end; false -> {error, @@ -246,7 +254,11 @@ format(Traces) -> lists:map( fun(Trace0 = #?TRACE{}) -> [_ | Values] = tuple_to_list(Trace0), - maps:from_list(lists:zip(Fields, Values)) + Map0 = maps:from_list(lists:zip(Fields, Values)), + Extra = maps:get(extra, Map0, #{}), + Formatter = maps:get(formatter, Extra, text), + Map1 = Map0#{formatter => Formatter}, + maps:remove(extra, Map1) end, Traces ). @@ -392,9 +404,17 @@ start_trace(Trace) -> type = Type, filter = Filter, start_at = Start, - payload_encode = PayloadEncode + payload_encode = PayloadEncode, + extra = Extra } = Trace, - Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode}, + Formatter = maps:get(formatter, Extra, text), + Who = #{ + name => Name, + type => Type, + filter => Filter, + payload_encode => PayloadEncode, + formatter => Formatter + }, emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> @@ -559,6 +579,12 @@ to_trace(#{end_at := EndAt} = Trace, Rec) -> {ok, _Sec} -> {error, "end_at time has already passed"} end; +to_trace(#{formatter := Formatter} = Trace, Rec) -> + Extra = Rec#?TRACE.extra, + to_trace( + maps:remove(formatter, Trace), + Rec#?TRACE{extra = Extra#{formatter => Formatter}} + ); to_trace(_, Rec) -> {ok, Rec}. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index c69809052..8179f8c0a 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -27,6 +27,7 @@ install/3, install/4, install/5, + install/6, uninstall/1, uninstall/2 ]). @@ -46,7 +47,8 @@ name := binary(), type := clientid | topic | ip_address, filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address(), - payload_encode := text | hidden | hex + payload_encode := text | hidden | hex, + formatter => json | text }. -define(CONFIG(_LogFile_), #{ @@ -69,17 +71,29 @@ Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Level :: logger:level() | all, - LogFilePath :: string() + LogFilePath :: string(), + Formatter :: text | json ) -> ok | {error, term()}. -install(Name, Type, Filter, Level, LogFile) -> +install(Name, Type, Filter, Level, LogFile, Formatter) -> Who = #{ type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name), - payload_encode => payload_encode() + payload_encode => payload_encode(), + formatter => Formatter }, install(Who, Level, LogFile). +-spec install( + Name :: binary() | list(), + Type :: clientid | topic | ip_address, + Filter :: emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string() +) -> ok | {error, term()}. +install(Name, Type, Filter, Level, LogFile) -> + install(Name, Type, Filter, Level, LogFile, text). + -spec install( Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), @@ -183,6 +197,10 @@ filters(#{type := ip_address, filter := Filter, name := Name}) -> filters(#{type := ruleid, filter := Filter, name := Name}) -> [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. +formatter(#{type := _Type, payload_encode := PayloadEncode, formatter := json}) -> + {emqx_trace_json_formatter, #{ + payload_encode => PayloadEncode + }}; formatter(#{type := _Type, payload_encode := PayloadEncode}) -> {emqx_trace_formatter, #{ %% template is for ?SLOG message not ?TRACE. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl new file mode 100644 index 000000000..35b09b9b0 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -0,0 +1,130 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_json_formatter). + +-include("emqx_mqtt.hrl"). + +-export([format/2]). + +%% logger_formatter:config/0 is not exported. +-type config() :: map(). + +%%%----------------------------------------------------------------- +%%% Callback Function +%%%----------------------------------------------------------------- + +-spec format(LogEvent, Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: config(). +format( + LogMap, + #{payload_encode := PEncode} +) -> + %% We just make some basic transformations on the input LogMap and then do + %% an external call to create the JSON text + Time = emqx_utils_calendar:now_to_rfc3339(microsecond), + LogMap1 = LogMap#{time => Time}, + LogMap2 = prepare_log_map(LogMap1, PEncode), + [emqx_logger_jsonfmt:best_effort_json(LogMap2, [force_utf8]), "\n"]. + +%%%----------------------------------------------------------------- +%%% Helper Functions +%%%----------------------------------------------------------------- + +prepare_log_map(LogMap, PEncode) -> + NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], + maps:from_list(NewKeyValuePairs). + +prepare_key_value(payload = K, V, PEncode) -> + NewV = + try + format_payload(V, PEncode) + catch + _:_ -> + V + end, + {K, NewV}; +prepare_key_value(packet = K, V, PEncode) -> + NewV = + try + format_packet(V, PEncode) + catch + _:_ -> + V + end, + {K, NewV}; +prepare_key_value(rule_ids = K, V, _PEncode) -> + NewV = + try + format_map_set_to_list(V) + catch + _:_ -> + V + end, + {K, NewV}; +prepare_key_value(client_ids = K, V, _PEncode) -> + NewV = + try + format_map_set_to_list(V) + catch + _:_ -> + V + end, + {K, NewV}; +prepare_key_value(action_id = K, V, _PEncode) -> + try + {action_info, format_action_info(V)} + catch + _:_ -> + {K, V} + end; +prepare_key_value(K, V, PEncode) when is_map(V) -> + {K, prepare_log_map(V, PEncode)}; +prepare_key_value(K, V, _PEncode) -> + {K, V}. + +format_packet(undefined, _) -> ""; +format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). + +format_payload(undefined, _) -> + ""; +format_payload(_, hidden) -> + "******"; +format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> + unicode:characters_to_list(Payload); +format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); +format_payload(<> = Payload, Type) -> + emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). + +format_map_set_to_list(Map) -> + Items = [ + begin + %% Assert that it is really a map set + true = V, + %% Assert that the keys have the expected type + true = is_binary(K), + K + end + || {K, V} <- maps:to_list(Map) + ], + lists:sort(Items). + +format_action_info(V) -> + [<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]), + #{ + type => Type, + name => Name + }. diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 1652c11b9..ad2991445 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -96,7 +96,7 @@ t_base_create_delete(_Config) -> start_at => Now, end_at => Now + 30 * 60, payload_encode => text, - extra => #{} + formatter => text } ], ?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])), @@ -511,4 +511,13 @@ build_old_trace_data() -> reload() -> catch ok = gen_server:stop(emqx_trace), - {ok, _Pid} = emqx_trace:start_link(). + case emqx_trace:start_link() of + {ok, _Pid} = Res -> + Res; + NotOKRes -> + ct:pal( + "emqx_trace:start_link() gave result: ~p\n" + "(perhaps it is already started)", + [NotOKRes] + ) + end. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 927c1c9c9..9be7457e1 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -316,7 +316,7 @@ on_query(InstId, {send_message, Msg}, State) -> ClientId = maps:get(clientid, Msg, undefined), on_query( InstId, - {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, + {undefined, ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, State ) end; @@ -346,19 +346,19 @@ on_query( ClientId = clientid(Msg), on_query( InstId, - {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, + {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, State ) end; on_query(InstId, {Method, Request}, State) -> %% TODO: Get retry from State - on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); + on_query(InstId, {undefined, undefined, Method, Request, 5000, _Retry = 2}, State); on_query(InstId, {Method, Request, Timeout}, State) -> %% TODO: Get retry from State - on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State); + on_query(InstId, {undefined, undefined, Method, Request, Timeout, _Retry = 2}, State); on_query( InstId, - {KeyOrNum, Method, Request, Timeout, Retry}, + {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, #{base_path := BasePath} = State ) -> ?TRACE( @@ -368,11 +368,12 @@ on_query( request => redact_request(Request), note => ?READACT_REQUEST_NOTE, connector => InstId, + action_id => ActionId, state => redact(State) } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(InstId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -429,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> ClientId = maps:get(clientid, Msg, undefined), on_query_async( InstId, - {ClientId, Method, {Path, Headers, Body}, Timeout}, + {undefined, ClientId, Method, {Path, Headers, Body}, Timeout}, ReplyFunAndArgs, State ) @@ -459,14 +460,14 @@ on_query_async( ClientId = clientid(Msg), on_query_async( InstId, - {ClientId, Method, {Path, Headers, Body}, Timeout}, + {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout}, ReplyFunAndArgs, State ) end; on_query_async( InstId, - {KeyOrNum, Method, Request, Timeout}, + {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, #{base_path := BasePath} = State ) -> @@ -482,7 +483,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(InstId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -502,11 +503,11 @@ on_query_async( ), {ok, Worker}. -trace_rendered_action_template(InstId, Method, NRequest, Timeout) -> +trace_rendered_action_template(ActionId, Method, NRequest, Timeout) -> case NRequest of {Path, Headers} -> emqx_trace:rendered_action_template( - InstId, + ActionId, #{ path => Path, method => Method, @@ -516,7 +517,7 @@ trace_rendered_action_template(InstId, Method, NRequest, Timeout) -> ); {Path, Headers, Body} -> emqx_trace:rendered_action_template( - InstId, + ActionId, #{ path => Path, method => Method, diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 73f6359ab..9d215d815 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -418,9 +418,8 @@ t_send_get_trace_messages(Config) -> begin Bin = read_rule_trace_file(TraceName, Now), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) end diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 19edc229d..e5ccde4f2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -314,6 +314,15 @@ fields(trace) -> example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}], required => false } + )}, + {formatter, + hoconsc:mk( + hoconsc:union([text, json]), + #{ + description => ?DESC(trace_log_formatter), + example => text, + required => false + } )} ]; fields(name) -> diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 465989f3d..32a24d9bd 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -507,21 +507,24 @@ trace(["list"]) -> ) end; trace(["stop", Operation, Filter0]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_off(Type, Filter); + case trace_type(Operation, Filter0, text) of + {ok, Type, Filter, _} -> trace_off(Type, Filter); error -> trace([]) end; trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); trace(["start", Operation, Filter0, LogFile, Level]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> + trace(["start", Operation, Filter0, LogFile, Level, text]); +trace(["start", Operation, Filter0, LogFile, Level, Formatter0]) -> + case trace_type(Operation, Filter0, Formatter0) of + {ok, Type, Filter, Formatter} -> trace_on( name(Filter0), Type, Filter, list_to_existing_atom(Level), - LogFile + LogFile, + Formatter ); error -> trace([]) @@ -529,19 +532,22 @@ trace(["start", Operation, Filter0, LogFile, Level]) -> trace(_) -> emqx_ctl:usage([ {"trace list", "List all traces started on local node"}, - {"trace start client []", "Traces for a client on local node"}, + {"trace start client [] []", + "Traces for a client on local node (Formatter=text|json)"}, {"trace stop client ", "Stop tracing for a client on local node"}, - {"trace start topic [] ", "Traces for a topic on local node"}, + {"trace start topic [] []", + "Traces for a topic on local node (Formatter=text|json)"}, {"trace stop topic ", "Stop tracing for a topic on local node"}, - {"trace start ip_address [] ", - "Traces for a client ip on local node"}, + {"trace start ip_address [] []", + "Traces for a client ip on local node (Formatter=text|json)"}, {"trace stop ip_address ", "Stop tracing for a client ip on local node"}, - {"trace start ruleid [] ", "Traces for a rule ID on local node"}, + {"trace start ruleid [] []", + "Traces for a rule ID on local node (Formatter=text|json)"}, {"trace stop ruleid ", "Stop tracing for a rule ID on local node"} ]). -trace_on(Name, Type, Filter, Level, LogFile) -> - case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of +trace_on(Name, Type, Filter, Level, LogFile, Formatter) -> + case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile, Formatter) of ok -> emqx_trace:check(), emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]); @@ -592,28 +598,33 @@ traces(["delete", Name]) -> trace_cluster_del(Name); traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]); -traces(["start", Name, Operation, Filter0, DurationS]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); +traces(["start", Name, Operation, Filter, DurationS]) -> + traces(["start", Name, Operation, Filter, DurationS, text]); +traces(["start", Name, Operation, Filter0, DurationS, Formatter0]) -> + case trace_type(Operation, Filter0, Formatter0) of + {ok, Type, Filter, Formatter} -> trace_cluster_on(Name, Type, Filter, DurationS, Formatter); error -> traces([]) end; traces(_) -> emqx_ctl:usage([ {"traces list", "List all cluster traces started"}, - {"traces start client []", "Traces for a client in cluster"}, - {"traces start topic []", "Traces for a topic in cluster"}, - {"traces start ip_address []", + {"traces start client [] []", + "Traces for a client in cluster (Formatter=text|json)"}, + {"traces start topic [] []", + "Traces for a topic in cluster (Formatter=text|json)"}, + {"traces start ruleid [] []", + "Traces for a rule ID in cluster (Formatter=text|json)"}, + {"traces start ip_address [] []", "Traces for a client IP in cluster\n" "Trace will start immediately on all nodes, including the core and replicant,\n" "and will end after seconds. The default value for is " ?DEFAULT_TRACE_DURATION - " seconds."}, - {"traces start ruleid []", "Traces for a rule ID in cluster"}, + " seconds. (Formatter=text|json)"}, {"traces stop ", "Stop trace in cluster"}, {"traces delete ", "Delete trace in cluster"} ]). -trace_cluster_on(Name, Type, Filter, DurationS0) -> +trace_cluster_on(Name, Type, Filter, DurationS0, Formatter) -> Now = emqx_trace:now_second(), DurationS = list_to_integer(DurationS0), Trace = #{ @@ -621,7 +632,8 @@ trace_cluster_on(Name, Type, Filter, DurationS0) -> type => Type, Type => bin(Filter), start_at => Now, - end_at => Now + DurationS + end_at => Now + DurationS, + formatter => Formatter }, case emqx_trace:create(Trace) of {ok, _} -> @@ -645,10 +657,12 @@ trace_cluster_off(Name) -> {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) end. -trace_type("client", ClientId) -> {ok, clientid, bin(ClientId)}; -trace_type("topic", Topic) -> {ok, topic, bin(Topic)}; -trace_type("ip_address", IP) -> {ok, ip_address, IP}; -trace_type(_, _) -> error. +trace_type(Op, Match, "text") -> trace_type(Op, Match, text); +trace_type(Op, Match, "json") -> trace_type(Op, Match, json); +trace_type("client", ClientId, Formatter) -> {ok, clientid, bin(ClientId), Formatter}; +trace_type("topic", Topic, Formatter) -> {ok, topic, bin(Topic), Formatter}; +trace_type("ip_address", IP, Formatter) -> {ok, ip_address, IP, Formatter}; +trace_type(_, _, _) -> error. %%-------------------------------------------------------------------- %% @doc Listeners Command diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index ef7b5a191..c5f5c475d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("stdlib/include/zip.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). %%-------------------------------------------------------------------- %% Setups @@ -169,9 +170,186 @@ t_http_test_rule_trace(_Config) -> {ok, Delete} = request_api(delete, api_path(["trace/", Name])), ?assertEqual(<<>>, Delete), + emqx_trace:clear(), unload(), ok. +t_http_test_json_formatter(_Config) -> + emqx_trace:clear(), + load(), + + Name = <<"testname">>, + Topic = <<"/x/y/z">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, Topic}, + {<<"formatter">>, <<"json">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Trace), + ?assertMatch(#{<<"name">> := Name}, json(Create)), + + {ok, List} = request_api(get, api_path("trace")), + [Data] = json(List), + ?assertEqual(<<"json">>, maps:get(<<"formatter">>, Data)), + + {ok, List1} = request_api(get, api_path("trace")), + [Data1] = json(List1), + ?assertMatch( + #{ + <<"formatter">> := <<"json">> + }, + Data1 + ), + + %% Check that the log is empty + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + %% Trace is empty which results in a not found error + {error, _} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + + %% Start a client and send a message to get info to the log + ClientId = <<"my_client_id">>, + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + %% Normal message + emqtt:publish(Client, Topic, #{}, <<"log_this_message">>, [{qos, 2}]), + %% Escape line breaks + emqtt:publish(Client, Topic, #{}, <<"\nlog\nthis\nmessage">>, [{qos, 2}]), + %% Escape escape character + emqtt:publish(Client, Topic, #{}, <<"\\\nlog\n_\\n_this\nmessage\\">>, [{qos, 2}]), + %% Escape end of string + emqtt:publish(Client, Topic, #{}, <<"\"log_this_message\"">>, [{qos, 2}]), + + %% Manually create some trace messages to test the JSON formatter + + %% String key and value + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, "str" => "str"}), + %% Log Erlang term + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, term => {notjson}}), + %% Log Erlang term key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, {'notjson'} => term}), + %% Log Integer + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, integer => 42}), + %% Log Float + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, float => 1.2}), + %% Log Integer Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 42 => integer}), + %% Log Float Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, 1.2 => float}), + %% Log Map Key + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, #{} => value}), + %% Empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{}}), + %% Non-empty submap + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, sub => #{key => value}}), + %% Bolean values + ?TRACE("CUSTOM", "my_log_msg", #{topic => Topic, true => true, false => false}), + %% Key value list + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + list => [ + {<<"key">>, <<"value">>}, + {<<"key2">>, <<"value2">>} + ] + }), + %% We do special formatting for client_ids and rule_ids + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + client_ids => maps:from_keys([<<"a">>, <<"b">>, <<"c">>], true) + }), + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + rule_ids => maps:from_keys([<<"a">>, <<"b">>, <<"c">>], true) + }), + %% action_id should be rendered as action_info + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + action_id => + <<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">> + }), + ok = emqx_trace_handler_SUITE:filesync(Name, topic), + {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), + {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), + {ok, [ + _Comment, + #zip_file{ + name = _ZipName, + info = #file_info{size = Size, type = regular, access = read_write} + } + ]} = zip:table(Bin), + ?assert(Size > 0), + {ok, [{_, LogContent}]} = zip:unzip(Bin, [memory]), + LogEntriesTrailing = string:split(LogContent, "\n", all), + LogEntries = lists:droplast(LogEntriesTrailing), + DecodedLogEntries = [ + begin + ct:pal("LOG ENTRY\n~s\n", [JSONEntry]), + emqx_utils_json:decode(JSONEntry) + end + || JSONEntry <- LogEntries + ], + ?assertMatch( + [ + #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}}, + #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}}, + #{ + <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>} + }, + #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}}, + #{<<"meta">> := #{<<"str">> := <<"str">>}}, + #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"integer">> := 42}}, + #{<<"meta">> := #{<<"float">> := 1.2}}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := <<_/binary>>}, + #{<<"meta">> := #{<<"sub">> := #{}}}, + #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}}, + #{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}}, + #{ + <<"meta">> := #{ + <<"list">> := #{ + <<"key">> := <<"value">>, + <<"key2">> := <<"value2">> + } + } + }, + #{ + <<"meta">> := #{ + <<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>] + } + }, + #{ + <<"meta">> := #{ + <<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>] + } + }, + #{ + <<"meta">> := #{ + <<"action_info">> := #{ + <<"type">> := <<"http">>, + <<"name">> := <<"emqx_bridge_http_test_lib">> + } + } + } + | _ + ], + DecodedLogEntries + ), + {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), + ?assertEqual(<<>>, Delete), + + {ok, List2} = request_api(get, api_path("trace")), + ?assertEqual([], json(List2)), + + ok = emqtt:disconnect(Client), + unload(), + emqx_trace:clear(), + ok. + t_create_failed(_Config) -> load(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index d1fc3081a..1cbcfe0b8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1147,12 +1147,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} -> {error, {unrecoverable_error, unhealthy_target}}; {ok, _Group, Resource} -> - set_rule_id_trace_meta_data(Query), - QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource), - %% do_call_query does not throw an exception as the call to the - %% resource is wrapped in a try catch expression so we will always - %% unset the trace meta data - unset_rule_id_trace_meta_data(), + QueryResult = + try + set_rule_id_trace_meta_data(Query), + do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource) + after + unset_rule_id_trace_meta_data() + end, QueryResult; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 3dfc5f6c8..3872fb973 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -132,7 +132,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> reason => Error, stacktrace => StkTrace }, - warning + error ), {error, {Error, StkTrace}} after @@ -176,18 +176,18 @@ do_apply_rule( {ok, ColumnsAndSelected, FinalCollection} -> case FinalCollection of [] -> - trace_rule_sql("FOREACH_yielded_no_result"), + trace_rule_sql("SQL_yielded_no_result"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> trace_rule_sql( - "FOREACH_yielded_result", #{result => FinalCollection}, debug + "SQL_yielded_result", #{result => FinalCollection}, debug ), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, NewEnvs = maps:merge(ColumnsAndSelected, Envs), {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]}; false -> - trace_rule_sql("FOREACH_yielded_no_result_no_match"), + trace_rule_sql("SQL_yielded_no_result"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end; @@ -204,11 +204,11 @@ do_apply_rule( ) -> case evaluate_select(Fields, Columns, Conditions) of {ok, Selected} -> - trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug), + trace_rule_sql("SQL_yielded_result", #{result => Selected}, debug), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; false -> - trace_rule_sql("SELECT_yielded_no_result_no_match"), + trace_rule_sql("SQL_yielded_no_result"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end. @@ -392,10 +392,8 @@ handle_action_list(RuleId, Actions, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), - trace_action(ActId, "activating_action"), try Result = do_handle_action(RuleId, ActId, Selected, Envs), - trace_action(ActId, "action_activated", #{result => Result}), Result catch throw:out_of_service -> @@ -467,7 +465,6 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) - Result = Mod:Func(Selected, Envs, Args), {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), inc_action_metrics(IncCtx, Result), - trace_action(Action, "call_action_function_result", #{result => Result}, debug), Result. do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index c875617ce..b1e533d31 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -36,7 +36,8 @@ init_per_suite(Config) -> emqx_connector, emqx_bridge, emqx_bridge_http, - emqx_rule_engine + emqx_rule_engine, + emqx_modules ], %% I don't know why we need to stop the apps and then start them but if we %% don't do this and other suites run before this suite the test cases will @@ -128,9 +129,8 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> Bin = read_rule_trace_file(TraceName, TraceType, Now), io:format("THELOG:~n~s", [Bin]), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) end @@ -171,10 +171,9 @@ create_trace(TraceName, TraceType, TraceValue) -> type => TraceType, TraceType => TraceValue, start_at => Start, - end_at => End + end_at => End, + formatter => json }, - emqx_trace_SUITE:reload(), - ok = emqx_trace:clear(), {ok, _} = emqx_trace:create(Trace). t_apply_rule_test_batch_separation_stop_after_render(_Config) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl index 7f74cc7d7..8b47669da 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl @@ -30,11 +30,11 @@ all() -> init_per_suite(Config) -> application:load(emqx_conf), ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_modules]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_modules]), ok. t_ctx_pub(_) -> diff --git a/changes/ce/feat-12863.en.md b/changes/ce/feat-12863.en.md new file mode 100644 index 000000000..45bebfbd6 --- /dev/null +++ b/changes/ce/feat-12863.en.md @@ -0,0 +1 @@ +You can now format trace log entries as JSON objects by setting the formatter parameter to "json" when creating the trace pattern. diff --git a/rel/i18n/emqx_mgmt_api_trace.hocon b/rel/i18n/emqx_mgmt_api_trace.hocon index ba07d7d53..13d814c21 100644 --- a/rel/i18n/emqx_mgmt_api_trace.hocon +++ b/rel/i18n/emqx_mgmt_api_trace.hocon @@ -115,4 +115,9 @@ current_trace_offset.desc: current_trace_offset.label: """Offset from the current trace position.""" +trace_log_formatter.desc: +"""The formatter that will be used to format the trace log entries. Set this to text to format the log entries as plain text (default). Set it to json to format each log entry as a JSON object.""" +trace_log_formatter.label: +"""Trace Log Entry Formatter""" + }