diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 98577d0b9..169e0361b 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -27,6 +27,7 @@ install/3, install/4, install/5, + install/6, uninstall/1, uninstall/2 ]). @@ -70,17 +71,29 @@ Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), Level :: logger:level() | all, - LogFilePath :: string() + LogFilePath :: string(), + Formatter :: plain | json ) -> ok | {error, term()}. -install(Name, Type, Filter, Level, LogFile) -> +install(Name, Type, Filter, Level, LogFile, Formatter) -> Who = #{ type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name), - payload_encode => payload_encode() + payload_encode => payload_encode(), + formatter => Formatter }, install(Who, Level, LogFile). +-spec install( + Name :: binary() | list(), + Type :: clientid | topic | ip_address, + Filter :: emqx_types:clientid() | emqx_types:topic() | string(), + Level :: logger:level() | all, + LogFilePath :: string() +) -> ok | {error, term()}. +install(Name, Type, Filter, Level, LogFile) -> + install(Name, Type, Filter, Level, LogFile, plain). + -spec install( Type :: clientid | topic | ip_address, Filter :: emqx_types:clientid() | emqx_types:topic() | string(), diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index b0548ac04..6f30774a6 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -65,7 +65,7 @@ format_key_value_pairs([{Key, Value} | Rest], PEncode, Acc) -> format_key(Term) -> %% Keys must be strings - String = emqx_logger_textfmt:try_format_unicode(Term), + String = try_format_unicode(Term), escape(String). format_value(Map, PEncode) when is_map(Map) -> @@ -79,9 +79,16 @@ format_value(true, _PEncode) -> format_value(false, _PEncode) -> "false"; format_value(V, _PEncode) -> - String = emqx_logger_textfmt:try_format_unicode(V), + String = try_format_unicode(V), ["\"", escape(String), "\""]. +try_format_unicode(undefined) -> + %% emqx_logger_textfmt:try_format_unicode converts the atom undefined to + %% the atom undefined + "undefined"; +try_format_unicode(V) -> + emqx_logger_textfmt:try_format_unicode(V). + escape(IOList) -> Bin = iolist_to_binary(IOList), List = binary_to_list(Bin), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 465989f3d..01c121e5b 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -507,21 +507,24 @@ trace(["list"]) -> ) end; trace(["stop", Operation, Filter0]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_off(Type, Filter); + case trace_type(Operation, Filter0, plain) of + {ok, Type, Filter, _} -> trace_off(Type, Filter); error -> trace([]) end; trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); trace(["start", Operation, Filter0, LogFile, Level]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> + trace(["start", Operation, Filter0, LogFile, Level, plain]); +trace(["start", Operation, Filter0, LogFile, Level, Formatter0]) -> + case trace_type(Operation, Filter0, Formatter0) of + {ok, Type, Filter, Formatter} -> trace_on( name(Filter0), Type, Filter, list_to_existing_atom(Level), - LogFile + LogFile, + Formatter ); error -> trace([]) @@ -529,19 +532,22 @@ trace(["start", Operation, Filter0, LogFile, Level]) -> trace(_) -> emqx_ctl:usage([ {"trace list", "List all traces started on local node"}, - {"trace start client []", "Traces for a client on local node"}, + {"trace start client [] []", + "Traces for a client on local node (Formatter=plain|json)"}, {"trace stop client ", "Stop tracing for a client on local node"}, - {"trace start topic [] ", "Traces for a topic on local node"}, + {"trace start topic [] []", + "Traces for a topic on local node (Formatter=plain|json)"}, {"trace stop topic ", "Stop tracing for a topic on local node"}, - {"trace start ip_address [] ", - "Traces for a client ip on local node"}, + {"trace start ip_address [] []", + "Traces for a client ip on local node (Formatter=plain|json)"}, {"trace stop ip_address ", "Stop tracing for a client ip on local node"}, - {"trace start ruleid [] ", "Traces for a rule ID on local node"}, + {"trace start ruleid [] []", + "Traces for a rule ID on local node (Formatter=plain|json)"}, {"trace stop ruleid ", "Stop tracing for a rule ID on local node"} ]). -trace_on(Name, Type, Filter, Level, LogFile) -> - case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of +trace_on(Name, Type, Filter, Level, LogFile, Formatter) -> + case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile, Formatter) of ok -> emqx_trace:check(), emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]); @@ -592,28 +598,33 @@ traces(["delete", Name]) -> trace_cluster_del(Name); traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]); -traces(["start", Name, Operation, Filter0, DurationS]) -> - case trace_type(Operation, Filter0) of - {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); +traces(["start", Name, Operation, Filter, DurationS]) -> + traces(["start", Name, Operation, Filter, DurationS, plain]); +traces(["start", Name, Operation, Filter0, DurationS, Formatter0]) -> + case trace_type(Operation, Filter0, Formatter0) of + {ok, Type, Filter, Formatter} -> trace_cluster_on(Name, Type, Filter, DurationS, Formatter); error -> traces([]) end; traces(_) -> emqx_ctl:usage([ {"traces list", "List all cluster traces started"}, - {"traces start client []", "Traces for a client in cluster"}, - {"traces start topic []", "Traces for a topic in cluster"}, - {"traces start ip_address []", + {"traces start client [] []", + "Traces for a client in cluster (Formatter=plain|json)"}, + {"traces start topic [] []", + "Traces for a topic in cluster (Formatter=plain|json)"}, + {"traces start ruleid [] []", + "Traces for a rule ID in cluster (Formatter=plain|json)"}, + {"traces start ip_address [] []", "Traces for a client IP in cluster\n" "Trace will start immediately on all nodes, including the core and replicant,\n" "and will end after seconds. The default value for is " ?DEFAULT_TRACE_DURATION - " seconds."}, - {"traces start ruleid []", "Traces for a rule ID in cluster"}, + " seconds. (Formatter=plain|json)"}, {"traces stop ", "Stop trace in cluster"}, {"traces delete ", "Delete trace in cluster"} ]). -trace_cluster_on(Name, Type, Filter, DurationS0) -> +trace_cluster_on(Name, Type, Filter, DurationS0, Formatter) -> Now = emqx_trace:now_second(), DurationS = list_to_integer(DurationS0), Trace = #{ @@ -621,7 +632,8 @@ trace_cluster_on(Name, Type, Filter, DurationS0) -> type => Type, Type => bin(Filter), start_at => Now, - end_at => Now + DurationS + end_at => Now + DurationS, + formatter => Formatter }, case emqx_trace:create(Trace) of {ok, _} -> @@ -645,10 +657,12 @@ trace_cluster_off(Name) -> {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) end. -trace_type("client", ClientId) -> {ok, clientid, bin(ClientId)}; -trace_type("topic", Topic) -> {ok, topic, bin(Topic)}; -trace_type("ip_address", IP) -> {ok, ip_address, IP}; -trace_type(_, _) -> error. +trace_type(Op, Match, "plain") -> trace_type(Op, Match, plain); +trace_type(Op, Match, "json") -> trace_type(Op, Match, json); +trace_type("client", ClientId, Formatter) -> {ok, clientid, bin(ClientId), Formatter}; +trace_type("topic", Topic, Formatter) -> {ok, topic, bin(Topic), Formatter}; +trace_type("ip_address", IP, Formatter) -> {ok, ip_address, IP, Formatter}; +trace_type(_, _, _) -> error. %%-------------------------------------------------------------------- %% @doc Listeners Command