diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 92c0bb561..66381f4c7 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -229,6 +229,13 @@ best_effort_json_obj(Map, Config) -> do_format_msg("~p", [Map], Config) end. +json_value(true, _Config) -> + true; +json_value(false, _Config) -> + false; +json_value(V, Config) -> + json(V, Config). + json(A, _) when is_atom(A) -> atom_to_binary(A, utf8); json(I, _) when is_integer(I) -> I; json(F, _) when is_float(F) -> F; @@ -317,7 +324,7 @@ json_kv(K0, V, Config) -> K = json_key(K0), case is_map(V) of true -> {K, best_effort_json_obj(V, Config)}; - false -> {K, json(V, Config)} + false -> {K, json_value(V, Config)} end. json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8)); diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 91de65b39..47e1d01b1 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -31,7 +31,8 @@ log/4, rendered_action_template/2, make_rendered_action_template_trace_context/1, - rendered_action_template_with_ctx/2 + rendered_action_template_with_ctx/2, + is_rule_trace_active/0 ]). -export([ @@ -96,6 +97,16 @@ unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> + do_rendered_action_template(ActionID, RenderResult); +rendered_action_template(#{mod := _, func := _} = ActionID, RenderResult) -> + do_rendered_action_template(ActionID, RenderResult); +rendered_action_template(_ActionID, _RenderResult) -> + %% We do nothing if we don't get a valid Action ID. This can happen when + %% called from connectors that are used for actions as well as authz and + %% authn. + ok. + +do_rendered_action_template(ActionID, RenderResult) -> TraceResult = ?TRACE( "QUERY_RENDER", "action_template_rendered", @@ -108,23 +119,25 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> #{stop_action_after_render := true} -> %% We throw an unrecoverable error to stop action before the %% resource is called/modified - StopMsg = lists:flatten( + ActionIDStr = + case ActionID of + Bin when is_binary(Bin) -> + Bin; + Term -> + ActionIDFormatted = io_lib:format("~tw", [Term]), + unicode:characters_to_binary(ActionIDFormatted) + end, + StopMsg = io_lib:format( "Action ~ts stopped after template rendering due to test setting.", - [ActionID] - ) - ), + [ActionIDStr] + ), MsgBin = unicode:characters_to_binary(StopMsg), error(?EMQX_TRACE_STOP_ACTION(MsgBin)); _ -> ok end, - TraceResult; -rendered_action_template(_ActionID, _RenderResult) -> - %% We do nothing if we don't get a valid Action ID. This can happen when - %% called from connectors that are used for actions as well as authz and - %% authn. - ok. + TraceResult. %% The following two functions are used for connectors that don't do the %% rendering in the main process (the one that called on_*query). In this case @@ -165,6 +178,16 @@ rendered_action_template_with_ctx( logger:set_process_metadata(OldMetaData) end. +is_rule_trace_active() -> + case logger:get_process_metadata() of + #{rule_id := RID} when is_binary(RID) -> + true; + #{rule_ids := RIDs} when map_size(RIDs) > 0 -> + true; + _ -> + false + end. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 8f748ed9f..6cd0e1684 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -57,12 +57,46 @@ prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) -> _:_ -> {K, V} end; -prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when - is_integer(Status), is_list(Headers), is_binary(Body) +prepare_key_value(K, {ok, {Formatter, V}}, PEncode) when is_function(Formatter, 1) -> + %% Unwrap + prepare_key_value(K, {Formatter, V}, PEncode); +prepare_key_value(host, {I1, I2, I3, I4}, _PEncode) when + is_integer(I1), + is_integer(I2), + is_integer(I3), + is_integer(I4) -> - %% This is unlikely anything else then info about a HTTP request so we make - %% it more structured - prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode); + %% We assume this is an IP address + {host, + unicode:characters_to_binary([ + integer_to_binary(I1), + <<".">>, + integer_to_binary(I2), + <<".">>, + integer_to_binary(I3), + <<".">>, + integer_to_binary(I4) + ])}; +prepare_key_value(K, {ok, StatusCode, Headers}, PEncode) when + is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers) +-> + prepare_key_value(K, {ok, StatusCode, Headers, <<"">>}, PEncode); +prepare_key_value(K, {ok, StatusCode, Headers, Body}, PEncode) when + is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers) +-> + %% We assume this is that response of an HTTP request + prepare_key_value( + K, + #{ + result => ok, + response => #{ + status => StatusCode, + headers => Headers, + body => Body + } + }, + PEncode + ); prepare_key_value(payload = K, V, PEncode) -> NewV = try @@ -137,6 +171,8 @@ format_map_set_to_list(Map) -> ], lists:sort(Items). +format_action_info(#{mod := _Mod, func := _Func} = FuncCall) -> + FuncCall; format_action_info(V) -> [<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]), #{ diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index ef9c6c70d..3f87d88ed 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -232,6 +232,7 @@ on_start( port => Port, connect_timeout => ConnectTimeout, base_path => BasePath, + scheme => Scheme, request => preprocess_request(maps:get(request, Config, undefined)) }, case start_pool(InstId, PoolOpts) of @@ -359,7 +360,7 @@ on_query(InstId, {Method, Request, Timeout}, State) -> on_query( InstId, {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, - #{base_path := BasePath, host := Host} = State + #{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State ) -> ?TRACE( "QUERY", @@ -373,7 +374,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -413,6 +414,24 @@ on_query( Result end. +maybe_trace_format_result(Res) -> + %% If rule tracing is active, then we know that the connector is used by an + %% action and that the result is used for tracing only. This is why we can + %% add a function to lazily format the trace entry. + case emqx_trace:is_rule_trace_active() of + true -> + {ok, {fun trace_format_result/1, Res}}; + false -> + Res + end. + +trace_format_result({ok, Status, Headers, Body}) -> + #{status => Status, headers => Headers, body => Body}; +trace_format_result({ok, Status, Headers}) -> + #{status => Status, headers => Headers}; +trace_format_result(Result) -> + Result. + %% BridgeV1 entrypoint on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> case maps:get(request, State, undefined) of @@ -469,7 +488,7 @@ on_query_async( InstId, {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{base_path := BasePath, host := Host} = State + #{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State ) -> Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( @@ -483,7 +502,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -492,7 +511,8 @@ on_query_async( key_or_num => KeyOrNum, method => Method, request => NRequest, - timeout => Timeout + timeout => Timeout, + trace_metadata => logger:get_process_metadata() }, ok = ehttpc:request_async( Worker, @@ -503,17 +523,19 @@ on_query_async( ), {ok, Worker}. -trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> +trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout) -> case NRequest of {Path, Headers} -> emqx_trace:rendered_action_template( ActionId, #{ host => Host, + port => Port, path => Path, method => Method, headers => {fun emqx_utils_redact:redact_headers/1, Headers}, - timeout => Timeout + timeout => Timeout, + url => {fun render_url/1, {Scheme, Host, Port, Path}} } ); {Path, Headers, Body} -> @@ -521,15 +543,33 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> ActionId, #{ host => Host, + port => Port, path => Path, method => Method, headers => {fun emqx_utils_redact:redact_headers/1, Headers}, timeout => Timeout, - body => {fun log_format_body/1, Body} + body => {fun log_format_body/1, Body}, + url => {fun render_url/1, {Scheme, Host, Port, Path}} } ) end. +render_url({Scheme, Host, Port, Path}) -> + SchemeStr = + case Scheme of + http -> + <<"http://">>; + https -> + <<"https://">> + end, + unicode:characters_to_binary([ + SchemeStr, + Host, + <<":">>, + erlang:integer_to_binary(Port), + Path + ]). + log_format_body(Body) -> unicode:characters_to_binary(Body). @@ -807,9 +847,15 @@ to_bin(Str) when is_list(Str) -> to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -reply_delegator(Context, ReplyFunAndArgs, Result0) -> +reply_delegator( + #{trace_metadata := TraceMetadata} = Context, + ReplyFunAndArgs, + Result0 +) -> spawn(fun() -> + logger:set_process_metadata(TraceMetadata), Result = transform_result(Result0), + logger:unset_process_metadata(), maybe_retry(Result, Context, ReplyFunAndArgs) end). @@ -831,9 +877,9 @@ transform_result(Result) -> {error, _Reason} -> Result; {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> - Result; + maybe_trace_format_result(Result); {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> - Result; + maybe_trace_format_result(Result); {ok, _TooManyRequests = StatusCode = 429, Headers} -> {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}}; {ok, _ServiceUnavailable = StatusCode = 503, Headers} -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 3f7c4897c..78805de80 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -125,11 +125,27 @@ on_query( redis_bridge_connector_send_done, #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result} ), - Result; + maybe_trace_format_result(Result); Error -> Error end. +maybe_trace_format_result(Res) -> + %% If rule tracing is active, then we know that the connector is used by an + %% action and that the result is used for tracing only. This is why we can + %% add a function to lazily format the trace entry. + case emqx_trace:is_rule_trace_active() of + true -> + {ok, {fun trace_format_result/1, Res}}; + false -> + Res + end. + +trace_format_result({ok, Msg}) -> + #{result => ok, message => Msg}; +trace_format_result(Res) -> + Res. + on_batch_query( InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt} ) -> @@ -156,7 +172,7 @@ on_batch_query( result => Result } ), - Result; + maybe_trace_format_result(Result); Error -> Error end. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index d135a087a..dbe57fbe8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -320,7 +320,7 @@ run_simple_upload( emqx_trace:rendered_action_template(ChannelID, #{ bucket => Bucket, key => Key, - content => Content + content => {fun unicode:characters_to_binary/1, Content} }), case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of ok -> diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 761c9c0f6..36b1f57de 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -693,6 +693,11 @@ handle_result({error, Error}) -> TranslatedError = translate_to_log_context(Error), {error, {unrecoverable_error, export_error(TranslatedError)}}; handle_result(Res) -> + maybe_trace_format_result(Res). + +trace_format_result({ok, Cnt}) when is_integer(Cnt) -> + #{result => ok, affected_rows => Cnt}; +trace_format_result(Res) -> Res. handle_batch_result([{ok, Count} | Rest], Acc) -> @@ -701,7 +706,18 @@ handle_batch_result([{error, Error} | _Rest], _Acc) -> TranslatedError = translate_to_log_context(Error), {error, {unrecoverable_error, export_error(TranslatedError)}}; handle_batch_result([], Acc) -> - {ok, Acc}. + maybe_trace_format_result({ok, Acc}). + +maybe_trace_format_result(Res) -> + %% If rule tracing is active, then we know that the connector is used by an + %% action and that the result is used for tracing only. This is why we can + %% add a function to lazily format the trace entry. + case emqx_trace:is_rule_trace_active() of + true -> + {ok, {fun trace_format_result/1, Res}}; + false -> + Res + end. translate_to_log_context({error, Reason}) -> translate_to_log_context(Reason); diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 4d57d4c2a..dcd9024ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -104,6 +104,20 @@ pre_process_action_args(_, Args) -> %%-------------------------------------------------------------------- -spec console(map(), map(), map()) -> any(). console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> + case logger:get_process_metadata() of + #{action_id := ActionID} -> + emqx_trace:rendered_action_template( + ActionID, + #{ + selected => Selected, + environment => Envs + } + ); + _ -> + %% We may not have an action ID in the metadata if this is called + %% from a test case or similar + ok + end, ?ULOG( "[rule action] ~ts~n" "\tAction Data: ~p~n" @@ -149,15 +163,24 @@ republish( PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env), MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), PubProps = maps:merge(PubProps0, MQTTProps), + TraceInfo = #{ + flags => Flags, + topic => Topic, + payload => Payload, + pub_props => PubProps + }, + case logger:get_process_metadata() of + #{action_id := ActionID} -> + emqx_trace:rendered_action_template(ActionID, TraceInfo); + _ -> + %% We may not have an action ID in the metadata if this is called + %% from a test case or similar + ok + end, ?TRACE( "RULE", "republish_message", - #{ - flags => Flags, - topic => Topic, - payload => Payload, - pub_props => PubProps - } + TraceInfo ), safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 5ec4bdc6e..1bf1baf3a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -415,6 +415,15 @@ handle_action(RuleId, ActId, Selected, Envs) -> rule_metrics, RuleId, 'actions.failed.out_of_service' ), trace_action(ActId, "out_of_service", #{}, warning); + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> + ?EMQX_TRACE_STOP_ACTION(Explanation) = Reason, + trace_action( + ActId, + "action_stopped_after_template_rendering", + #{reason => Explanation} + ), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); Err:Reason:ST -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'), @@ -475,7 +484,18 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) - trace_action(Action, "call_action_function"), %% the function can also throw 'out_of_service' Args = maps:get(args, Action, []), - Result = Mod:Func(Selected, Envs, Args), + PrevProcessMetadata = + case logger:get_process_metadata() of + undefined -> #{}; + D -> D + end, + Result = + try + logger:update_process_metadata(#{action_id => Action}), + Mod:Func(Selected, Envs, Args) + after + logger:set_process_metadata(PrevProcessMetadata) + end, {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), inc_action_metrics(IncCtx, Result), Result.