diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 27dd8b6c8..7630fbc56 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -35,6 +35,11 @@ end_at :: integer() | undefined | '_' }). +-record(emqx_trace_format_func_data, { + function :: fun((any()) -> any()), + data :: any() +}). + -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 92c0bb561..856b6111c 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -229,7 +229,7 @@ best_effort_json_obj(Map, Config) -> do_format_msg("~p", [Map], Config) end. -json(A, _) when is_atom(A) -> atom_to_binary(A, utf8); +json(A, _) when is_atom(A) -> A; json(I, _) when is_integer(I) -> I; json(F, _) when is_float(F) -> F; json(P, C) when is_pid(P) -> json(pid_to_list(P), C); diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 91de65b39..3e8f36890 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -31,7 +31,8 @@ log/4, rendered_action_template/2, make_rendered_action_template_trace_context/1, - rendered_action_template_with_ctx/2 + rendered_action_template_with_ctx/2, + is_rule_trace_active/0 ]). -export([ @@ -96,6 +97,16 @@ unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> + do_rendered_action_template(ActionID, RenderResult); +rendered_action_template(#{mod := _, func := _} = ActionID, RenderResult) -> + do_rendered_action_template(ActionID, RenderResult); +rendered_action_template(_ActionID, _RenderResult) -> + %% We do nothing if we don't get a valid Action ID. This can happen when + %% called from connectors that are used for actions as well as authz and + %% authn. + ok. + +do_rendered_action_template(ActionID, RenderResult) -> TraceResult = ?TRACE( "QUERY_RENDER", "action_template_rendered", @@ -108,23 +119,25 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> #{stop_action_after_render := true} -> %% We throw an unrecoverable error to stop action before the %% resource is called/modified - StopMsg = lists:flatten( + ActionIDStr = + case ActionID of + Bin when is_binary(Bin) -> + Bin; + Term -> + ActionIDFormatted = io_lib:format("~tw", [Term]), + unicode:characters_to_binary(ActionIDFormatted) + end, + StopMsg = io_lib:format( "Action ~ts stopped after template rendering due to test setting.", - [ActionID] - ) - ), + [ActionIDStr] + ), MsgBin = unicode:characters_to_binary(StopMsg), error(?EMQX_TRACE_STOP_ACTION(MsgBin)); _ -> ok end, - TraceResult; -rendered_action_template(_ActionID, _RenderResult) -> - %% We do nothing if we don't get a valid Action ID. This can happen when - %% called from connectors that are used for actions as well as authz and - %% authn. - ok. + TraceResult. %% The following two functions are used for connectors that don't do the %% rendering in the main process (the one that called on_*query). In this case @@ -165,6 +178,16 @@ rendered_action_template_with_ctx( logger:set_process_metadata(OldMetaData) end. +is_rule_trace_active() -> + case logger:get_process_metadata() of + #{rule_id := RID} when is_binary(RID) -> + true; + #{rule_ids := RIDs} when map_size(RIDs) > 0 -> + true; + _ -> + false + end. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). @@ -382,7 +405,14 @@ code_change(_, State, _Extra) -> {ok, State}. insert_new_trace(Trace) -> - transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]). + case transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]) of + {error, _} = Error -> + Error; + Res -> + %% We call this to ensure the trace is active when we return + check(), + Res + end. update_trace(Traces) -> Now = now_second(), diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index 6cbcc0510..1e40c6849 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -15,9 +15,11 @@ %%-------------------------------------------------------------------- -module(emqx_trace_formatter). -include("emqx_mqtt.hrl"). +-include("emqx_trace.hrl"). -export([format/2]). -export([format_meta_map/1]). +-export([evaluate_lazy_values/1]). %% logger_formatter:config/0 is not exported. -type config() :: map(). @@ -28,18 +30,35 @@ LogEvent :: logger:log_event(), Config :: config(). format( - #{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg}, + #{level := debug, meta := Meta0 = #{trace_tag := Tag}, msg := Msg}, #{payload_encode := PEncode} ) -> + Meta1 = evaluate_lazy_values(Meta0), Time = emqx_utils_calendar:now_to_rfc3339(microsecond), - ClientId = to_iolist(maps:get(clientid, Meta, "")), - Peername = maps:get(peername, Meta, ""), - MetaBin = format_meta(Meta, PEncode), + ClientId = to_iolist(maps:get(clientid, Meta1, "")), + Peername = maps:get(peername, Meta1, ""), + MetaBin = format_meta(Meta1, PEncode), Msg1 = to_iolist(Msg), Tag1 = to_iolist(Tag), [Time, " [", Tag1, "] ", ClientId, "@", Peername, " msg: ", Msg1, ", ", MetaBin, "\n"]; format(Event, Config) -> - emqx_logger_textfmt:format(Event, Config). + emqx_logger_textfmt:format(evaluate_lazy_values(Event), Config). + +evaluate_lazy_values(Map) when is_map(Map) -> + maps:map(fun evaluate_lazy_values_kv/2, Map); +evaluate_lazy_values(V) -> + V. + +evaluate_lazy_values_kv(_K, #emqx_trace_format_func_data{function = Formatter, data = V}) -> + try + NewV = Formatter(V), + evaluate_lazy_values(NewV) + catch + _:_ -> + V + end; +evaluate_lazy_values_kv(_K, V) -> + evaluate_lazy_values(V). format_meta_map(Meta) -> Encode = emqx_trace_handler:payload_encode(), diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 8f748ed9f..f07dc8c83 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -16,6 +16,7 @@ -module(emqx_trace_json_formatter). -include("emqx_mqtt.hrl"). +-include("emqx_trace.hrl"). -export([format/2]). @@ -30,15 +31,16 @@ LogEvent :: logger:log_event(), Config :: config(). format( - LogMap, + LogMap0, #{payload_encode := PEncode} ) -> + LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0), %% We just make some basic transformations on the input LogMap and then do %% an external call to create the JSON text Time = emqx_utils_calendar:now_to_rfc3339(microsecond), - LogMap1 = LogMap#{time => Time}, - LogMap2 = prepare_log_map(LogMap1, PEncode), - [emqx_logger_jsonfmt:best_effort_json(LogMap2, [force_utf8]), "\n"]. + LogMap2 = LogMap1#{time => Time}, + LogMap3 = prepare_log_map(LogMap2, PEncode), + [emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"]. %%%----------------------------------------------------------------- %%% Helper Functions @@ -48,21 +50,26 @@ prepare_log_map(LogMap, PEncode) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], maps:from_list(NewKeyValuePairs). -prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) -> - %% A cusom formatter is provided with the value - try - NewV = Formatter(V), - prepare_key_value(K, NewV, PEncode) - catch - _:_ -> - {K, V} - end; -prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when - is_integer(Status), is_list(Headers), is_binary(Body) +prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when + is_integer(I1), + is_integer(I2), + is_integer(I3), + is_integer(I4) -> - %% This is unlikely anything else then info about a HTTP request so we make - %% it more structured - prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode); + %% We assume this is an IP address + {host, unicode:characters_to_binary(inet:ntoa(IP))}; +prepare_key_value(host, {I1, I2, I3, I4, I5, I6, I7, I8} = IP, _PEncode) when + is_integer(I1), + is_integer(I2), + is_integer(I3), + is_integer(I4), + is_integer(I5), + is_integer(I6), + is_integer(I7), + is_integer(I8) +-> + %% We assume this is an IP address + {host, unicode:characters_to_binary(inet:ntoa(IP))}; prepare_key_value(payload = K, V, PEncode) -> NewV = try @@ -137,6 +144,8 @@ format_map_set_to_list(Map) -> ], lists:sort(Items). +format_action_info(#{mod := _Mod, func := _Func} = FuncCall) -> + FuncCall; format_action_info(V) -> [<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]), #{ diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 66fbb5d79..2803c9e53 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -252,11 +252,14 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) -> create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> BridgeName = ?config(bridge_name, Config), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + create_rule_and_action(BridgeId, RuleTopic, Opts). + +create_rule_and_action(Action, RuleTopic, Opts) -> SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>), Params = #{ enable => true, sql => SQL, - actions => [BridgeId] + actions => [Action] }, Path = emqx_mgmt_api_test_util:api_path(["rules"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index f257ae389..cc37c8dd0 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -27,6 +27,8 @@ -export([execute/2]). -endif. +-include_lib("emqx/include/emqx_trace.hrl"). + %%%=================================================================== %%% API %%%=================================================================== @@ -107,7 +109,10 @@ do_query(Table, Query0, Templates, TraceRenderedCTX) -> Query = apply_template(Query0, Templates), emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ table => Table, - query => {fun trace_format_query/1, Query} + query => #emqx_trace_format_func_data{ + function = fun trace_format_query/1, + data = Query + } }), execute(Query, Table) catch diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index ef9c6c70d..f639596b6 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -20,6 +20,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -behaviour(emqx_resource). @@ -35,7 +36,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([reply_delegator/3]). @@ -232,6 +234,7 @@ on_start( port => Port, connect_timeout => ConnectTimeout, base_path => BasePath, + scheme => Scheme, request => preprocess_request(maps:get(request, Config, undefined)) }, case start_pool(InstId, PoolOpts) of @@ -359,7 +362,7 @@ on_query(InstId, {Method, Request, Timeout}, State) -> on_query( InstId, {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, - #{base_path := BasePath, host := Host} = State + #{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State ) -> ?TRACE( "QUERY", @@ -373,7 +376,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -469,7 +472,7 @@ on_query_async( InstId, {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{base_path := BasePath, host := Host} = State + #{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State ) -> Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( @@ -483,7 +486,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -492,7 +495,8 @@ on_query_async( key_or_num => KeyOrNum, method => Method, request => NRequest, - timeout => Timeout + timeout => Timeout, + trace_metadata => logger:get_process_metadata() }, ok = ehttpc:request_async( Worker, @@ -503,17 +507,25 @@ on_query_async( ), {ok, Worker}. -trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> +trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout) -> case NRequest of {Path, Headers} -> emqx_trace:rendered_action_template( ActionId, #{ host => Host, + port => Port, path => Path, method => Method, - headers => {fun emqx_utils_redact:redact_headers/1, Headers}, - timeout => Timeout + headers => #emqx_trace_format_func_data{ + function = fun emqx_utils_redact:redact_headers/1, + data = Headers + }, + timeout => Timeout, + url => #emqx_trace_format_func_data{ + function = fun render_url/1, + data = {Scheme, Host, Port, Path} + } } ); {Path, Headers, Body} -> @@ -521,15 +533,42 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> ActionId, #{ host => Host, + port => Port, path => Path, method => Method, - headers => {fun emqx_utils_redact:redact_headers/1, Headers}, + headers => #emqx_trace_format_func_data{ + function = fun emqx_utils_redact:redact_headers/1, + data = Headers + }, timeout => Timeout, - body => {fun log_format_body/1, Body} + body => #emqx_trace_format_func_data{ + function = fun log_format_body/1, + data = Body + }, + url => #emqx_trace_format_func_data{ + function = fun render_url/1, + data = {Scheme, Host, Port, Path} + } } ) end. +render_url({Scheme, Host, Port, Path}) -> + SchemeStr = + case Scheme of + http -> + <<"http://">>; + https -> + <<"https://">> + end, + unicode:characters_to_binary([ + SchemeStr, + Host, + <<":">>, + erlang:integer_to_binary(Port), + Path + ]). + log_format_body(Body) -> unicode:characters_to_binary(Body). @@ -605,6 +644,26 @@ on_get_channel_status( %% XXX: Reuse the connector status on_get_status(InstId, State). +on_format_query_result({ok, Status, Headers, Body}) -> + #{ + result => ok, + response => #{ + status => Status, + headers => Headers, + body => Body + } + }; +on_format_query_result({ok, Status, Headers}) -> + #{ + result => ok, + response => #{ + status => Status, + headers => Headers + } + }; +on_format_query_result(Result) -> + Result. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -807,9 +866,15 @@ to_bin(Str) when is_list(Str) -> to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -reply_delegator(Context, ReplyFunAndArgs, Result0) -> +reply_delegator( + #{trace_metadata := TraceMetadata} = Context, + ReplyFunAndArgs, + Result0 +) -> spawn(fun() -> + logger:set_process_metadata(TraceMetadata), Result = transform_result(Result0), + logger:unset_process_metadata(), maybe_retry(Result, Context, ReplyFunAndArgs) end). diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 3f7c4897c..28f7c6f8e 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -20,7 +20,8 @@ on_query/3, on_batch_query/3, on_get_status/2, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). %% ------------------------------------------------------------------------------------------------- @@ -161,6 +162,11 @@ on_batch_query( Error end. +on_format_query_result({ok, Msg}) -> + #{result => ok, message => Msg}; +on_format_query_result(Res) -> + Res. + %% ------------------------------------------------------------------------------------------------- %% private helpers %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index d135a087a..83d5c47a9 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -7,6 +7,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include("emqx_bridge_s3.hrl"). -behaviour(emqx_resource). @@ -320,7 +321,10 @@ run_simple_upload( emqx_trace:rendered_action_template(ChannelID, #{ bucket => Bucket, key => Key, - content => Content + content => #emqx_trace_format_func_data{ + function = fun unicode:characters_to_binary/1, + data = Content + } }), case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of ok -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index c5f5c475d..4daf1c51a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -290,54 +290,125 @@ t_http_test_json_formatter(_Config) -> end || JSONEntry <- LogEntries ], + ListIterFun = + fun + ListIterFunRec([]) -> + ok; + ListIterFunRec([Item | Rest]) -> + receive + From -> + From ! {list_iter_item, Item} + end, + ListIterFunRec(Rest) + end, + ListIter = spawn_link(fun() -> ListIterFun(DecodedLogEntries) end), + NextFun = + fun() -> + ListIter ! self(), + receive + {list_iter_item, Item} -> + Item + end + end, ?assertMatch( - [ - #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}}, - #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}}, - #{ - <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>} - }, - #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}}, - #{<<"meta">> := #{<<"str">> := <<"str">>}}, - #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}}, - #{<<"meta">> := <<_/binary>>}, - #{<<"meta">> := #{<<"integer">> := 42}}, - #{<<"meta">> := #{<<"float">> := 1.2}}, - #{<<"meta">> := <<_/binary>>}, - #{<<"meta">> := <<_/binary>>}, - #{<<"meta">> := <<_/binary>>}, - #{<<"meta">> := #{<<"sub">> := #{}}}, - #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}}, - #{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}}, - #{ - <<"meta">> := #{ - <<"list">> := #{ - <<"key">> := <<"value">>, - <<"key2">> := <<"value2">> - } - } - }, - #{ - <<"meta">> := #{ - <<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>] - } - }, - #{ - <<"meta">> := #{ - <<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>] - } - }, - #{ - <<"meta">> := #{ - <<"action_info">> := #{ - <<"type">> := <<"http">>, - <<"name">> := <<"emqx_bridge_http_test_lib">> - } + #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}}, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>} + }, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"str">> := <<"str">>}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := <<_/binary>>}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"integer">> := 42}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"float">> := 1.2}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := <<_/binary>>}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := <<_/binary>>}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := <<_/binary>>}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"sub">> := #{}}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}}, + NextFun() + ), + ?assertMatch( + #{<<"meta">> := #{<<"true">> := true, <<"false">> := false}}, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"list">> := #{ + <<"key">> := <<"value">>, + <<"key2">> := <<"value2">> } } - | _ - ], - DecodedLogEntries + }, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>] + } + }, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>] + } + }, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"action_info">> := #{ + <<"type">> := <<"http">>, + <<"name">> := <<"emqx_bridge_http_test_lib">> + } + } + }, + NextFun() ), {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), ?assertEqual(<<>>, Delete), @@ -495,7 +566,7 @@ create_trace(Name, Type, TypeValue, Start) -> ?block_until(#{?snk_kind := update_trace_done}) end, fun(Trace) -> - ?assertMatch([#{}], ?of_kind(update_trace_done, Trace)) + ?assertMatch([#{} | _], ?of_kind(update_trace_done, Trace)) end ). diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 761c9c0f6..ad674a07c 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -38,7 +38,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([connect/1]). @@ -695,6 +696,11 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. +on_format_query_result({ok, Cnt}) when is_integer(Cnt) -> + #{result => ok, affected_rows => Cnt}; +on_format_query_result(Res) -> + Res. + handle_batch_result([{ok, Count} | Rest], Acc) -> handle_batch_result(Rest, Acc + Count); handle_batch_result([{error, Error} | _Rest], _Acc) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index eba0d33af..8340bf589 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -86,7 +86,9 @@ forget_allocated_resources/1, deallocate_resource/2, %% Get channel config from resource - call_get_channel_config/3 + call_get_channel_config/3, + % Call the format query result function + call_format_query_result/2 ]). %% Direct calls to the callback module @@ -154,7 +156,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - query_mode/1 + query_mode/1, + on_format_query_result/1 ]). %% when calling emqx_resource:start/1 @@ -230,6 +233,14 @@ ResId :: term() ) -> [term()]. +%% When given the result of a on_*query call this function should return a +%% version of the result that is suitable for JSON trace logging. This +%% typically means converting Erlang tuples to maps with appropriate names for +%% the values in the tuple. +-callback on_format_query_result( + QueryResult :: term() +) -> term(). + -define(SAFE_CALL(EXPR), (fun() -> try @@ -551,6 +562,14 @@ call_get_channel_config(ResId, ChannelId, Mod) -> <<"on_get_channels callback function not available for resource id", ResId/binary>>} end. +call_format_query_result(Mod, Result) -> + case erlang:function_exported(Mod, on_format_query_result, 1) of + true -> + Mod:on_format_query_result(Result); + false -> + Result + end. + -spec call_stop(resource_id(), module(), resource_state()) -> term(). call_stop(ResId, Mod, ResourceState) -> ?SAFE_CALL(begin diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 4d57d4c2a..dcd9024ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -104,6 +104,20 @@ pre_process_action_args(_, Args) -> %%-------------------------------------------------------------------- -spec console(map(), map(), map()) -> any(). console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> + case logger:get_process_metadata() of + #{action_id := ActionID} -> + emqx_trace:rendered_action_template( + ActionID, + #{ + selected => Selected, + environment => Envs + } + ); + _ -> + %% We may not have an action ID in the metadata if this is called + %% from a test case or similar + ok + end, ?ULOG( "[rule action] ~ts~n" "\tAction Data: ~p~n" @@ -149,15 +163,24 @@ republish( PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env), MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), PubProps = maps:merge(PubProps0, MQTTProps), + TraceInfo = #{ + flags => Flags, + topic => Topic, + payload => Payload, + pub_props => PubProps + }, + case logger:get_process_metadata() of + #{action_id := ActionID} -> + emqx_trace:rendered_action_template(ActionID, TraceInfo); + _ -> + %% We may not have an action ID in the metadata if this is called + %% from a test case or similar + ok + end, ?TRACE( "RULE", "republish_message", - #{ - flags => Flags, - topic => Topic, - payload => Payload, - pub_props => PubProps - } + TraceInfo ), safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index d203dd915..3dd8048d7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -274,6 +274,7 @@ schema("/rules/:id/test") -> responses => #{ 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 412 => error_schema('NOT_MATCH', "SQL Not Match"), + 404 => error_schema('RULE_NOT_FOUND', "The rule could not be found"), 200 => <<"Rule Applied">> } } @@ -419,11 +420,13 @@ param_path_id() -> begin case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of {ok, Result} -> - {200, Result}; + {200, emqx_logger_jsonfmt:best_effort_json_obj(Result)}; {error, {parse_error, Reason}} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; + {error, rule_not_found} -> + {404, #{code => 'RULE_NOT_FOUND', message => <<"The rule could not be found">>}}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 5ec4bdc6e..41be864c5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -415,6 +415,15 @@ handle_action(RuleId, ActId, Selected, Envs) -> rule_metrics, RuleId, 'actions.failed.out_of_service' ), trace_action(ActId, "out_of_service", #{}, warning); + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> + ?EMQX_TRACE_STOP_ACTION(Explanation) = Reason, + trace_action( + ActId, + "action_stopped_after_template_rendering", + #{reason => Explanation} + ), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); Err:Reason:ST -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'), @@ -475,7 +484,18 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) - trace_action(Action, "call_action_function"), %% the function can also throw 'out_of_service' Args = maps:get(args, Action, []), - Result = Mod:Func(Selected, Envs, Args), + PrevProcessMetadata = + case logger:get_process_metadata() of + undefined -> #{}; + D -> D + end, + Result = + try + logger:update_process_metadata(#{action_id => Action}), + Mod:Func(Selected, Envs, Args) + after + logger:set_process_metadata(PrevProcessMetadata) + end, {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), inc_action_metrics(IncCtx, Result), Result. @@ -747,21 +767,49 @@ do_inc_action_metrics( {error, {unrecoverable_error, _} = Reason} ) -> TraceContext1 = maps:remove(action_id, TraceContext), - trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext1)), + FormatterRes = #emqx_trace_format_func_data{ + function = fun trace_formatted_result/1, + data = {ActId, Reason} + }, + trace_action(ActId, "action_failed", maps:merge(#{reason => FormatterRes}, TraceContext1)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) -> TraceContext1 = maps:remove(action_id, TraceContext), + FormatterRes = #emqx_trace_format_func_data{ + function = fun trace_formatted_result/1, + data = {ActId, R} + }, case is_ok_result(R) of false -> - trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext1)), + trace_action( + ActId, + "action_failed", + maps:merge(#{reason => FormatterRes}, TraceContext1) + ), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); true -> - trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext1)), + trace_action( + ActId, + "action_success", + maps:merge(#{result => FormatterRes}, TraceContext1) + ), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') end. +trace_formatted_result({{bridge_v2, Type, _Name}, R}) -> + ConnectorType = emqx_action_info:action_type_to_connector_type(Type), + ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), + emqx_resource:call_format_query_result(ResourceModule, R); +trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) -> + BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), + ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type), + ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), + emqx_resource:call_format_query_result(ResourceModule, R); +trace_formatted_result({_, R}) -> + R. + is_ok_result(ok) -> true; is_ok_result({async_return, R}) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 83f29eef3..6d393c24a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -26,12 +26,22 @@ apply_rule( RuleId, + Parameters +) -> + case emqx_rule_engine:get_rule(RuleId) of + {ok, Rule} -> + do_apply_rule(Rule, Parameters); + not_found -> + {error, rule_not_found} + end. + +do_apply_rule( + Rule, #{ context := Context, stop_action_after_template_rendering := StopAfterRender } ) -> - {ok, Rule} = emqx_rule_engine:get_rule(RuleId), InTopic = get_in_topic(Context), EventTopics = maps:get(from, Rule, []), case lists:all(fun is_publish_topic/1, EventTopics) of diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 52fa1a2e5..c11b40b23 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -26,7 +26,24 @@ -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + emqx_common_test_helpers:all(?MODULE), + {group, republish}, + {group, console_print} + ]. + +groups() -> + [ + {republish, [], basic_tests()}, + {console_print, [], basic_tests()} + ]. + +basic_tests() -> + [ + t_basic_apply_rule_trace_ruleid, + t_basic_apply_rule_trace_clientid, + t_basic_apply_rule_trace_ruleid_stop_after_render + ]. init_per_suite(Config) -> application:load(emqx_conf), @@ -50,6 +67,12 @@ init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite(), [{apps, Apps} | Config]. +init_per_group(GroupName, Config) -> + [{group_name, GroupName} | Config]. + +end_per_group(_GroupName, Config) -> + Config. + end_per_suite(Config) -> Apps = ?config(apps, Config), emqx_mgmt_api_test_util:end_suite(), @@ -67,28 +90,58 @@ end_per_testcase(_TestCase, _Config) -> ok. t_basic_apply_rule_trace_ruleid(Config) -> - basic_apply_rule_test_helper(Config, ruleid, false). + basic_apply_rule_test_helper(get_action(Config), ruleid, false). t_basic_apply_rule_trace_clientid(Config) -> - basic_apply_rule_test_helper(Config, clientid, false). + basic_apply_rule_test_helper(get_action(Config), clientid, false). t_basic_apply_rule_trace_ruleid_stop_after_render(Config) -> - basic_apply_rule_test_helper(Config, ruleid, true). + basic_apply_rule_test_helper(get_action(Config), ruleid, true). -basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> +get_action(Config) -> + case ?config(group_name, Config) of + republish -> + republish_action(); + console_print -> + console_print_action(); + _ -> + make_http_bridge(Config) + end. + +make_http_bridge(Config) -> HTTPServerConfig = ?config(http_server, Config), emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig), #{status := connected} = emqx_bridge_v2:health_check( http, emqx_bridge_http_test_lib:bridge_name() ), + BridgeName = ?config(bridge_name, Config), + emqx_bridge_resource:bridge_id(http, BridgeName). + +republish_action() -> + #{ + <<"args">> => + #{ + <<"mqtt_properties">> => #{}, + <<"payload">> => <<"MY PL">>, + <<"qos">> => 0, + <<"retain">> => false, + <<"topic">> => <<"rule_apply_test_SUITE">>, + <<"user_properties">> => <<>> + }, + <<"function">> => <<"republish">> + }. + +console_print_action() -> + #{<<"function">> => <<"console">>}. + +basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> %% Create Rule RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, {ok, #{<<"id">> := RuleId}} = - emqx_bridge_testlib:create_rule_and_action_http( - http, + emqx_bridge_testlib:create_rule_and_action( + Action, RuleTopic, - Config, #{sql => SQL} ), ClientId = <<"c_emqx">>, @@ -117,10 +170,7 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> <<"context">> => Context, <<"stop_action_after_template_rendering">> => StopAfterRender }, - emqx_trace:check(), - ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType), Now = erlang:system_time(second) - 10, - {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)), ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)), ?retry( _Interval0 = 200, @@ -130,9 +180,14 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> io:format("THELOG:~n~s", [Bin]), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) + case Action of + A when is_binary(A) -> + ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])); + _ -> + ?assertNotEqual(nomatch, binary:match(Bin, [<<"call_action_function">>])) + end, + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])) end ), case StopAfterRender of @@ -155,7 +210,8 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> begin Bin = read_rule_trace_file(TraceName, TraceType, Now), io:format("THELOG3:~n~s", [Bin]), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])) + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])), + do_final_log_check(Action, Bin) end ) end, @@ -176,6 +232,51 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> emqx_trace:delete(TraceName), ok. +do_final_log_check(Action, Bin0) when is_binary(Action) -> + %% The last line in the Bin should be the action_success entry + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + %% Check that lazy formatting of the action result works correctly + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := + #{ + <<"action_info">> := + #{ + <<"name">> := <<"emqx_bridge_http_test_lib">>, + <<"type">> := <<"http">> + }, + <<"clientid">> := <<"c_emqx">>, + <<"result">> := + #{ + <<"response">> := + #{ + <<"body">> := <<"hello">>, + <<"headers">> := + #{ + <<"content-type">> := <<"text/plain">>, + <<"date">> := _, + <<"server">> := _ + }, + <<"status">> := 200 + }, + <<"result">> := <<"ok">> + }, + <<"rule_id">> := _, + <<"rule_trigger_time">> := _, + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_success">>, + <<"time">> := _ + }, + LastEntryJSON + ); +do_final_log_check(_, _) -> + ok. + create_trace(TraceName, TraceType, TraceValue) -> Now = erlang:system_time(second) - 10, Start = Now, @@ -239,8 +340,6 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> SQL ), create_trace(Name, ruleid, RuleID), - emqx_trace:check(), - ok = emqx_trace_handler_SUITE:filesync(Name, ruleid), Now = erlang:system_time(second) - 10, %% Stop ParmsStopAfterRender = apply_rule_parms(true, Name),