diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 8151c19b5..408644128 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -28,7 +28,8 @@ subscribe/3, unsubscribe/2, log/3, - log/4 + log/4, + rendered_action_template/2 ]). -export([ @@ -86,6 +87,25 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). +rendered_action_template(ActionID, RenderResult) -> + Msg = io_lib:format("action_template_rendered(~s)", [ActionID]), + TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult), + case logger:get_process_metadata() of + #{stop_action_after_render := true} -> + %% We throw an unrecoverable error to stop action before the + %% resource is called/modified + StopMsg = io_lib:format( + "action_stopped_after_render(~s): " + "Action stopped after template render due to test setting.", + [ActionID] + ), + MsgBin = iolist_to_binary(StopMsg), + error({unrecoverable_error, MsgBin}); + _ -> + ok + end, + TraceResult. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 99222aa00..46b3d5e1f 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -371,6 +371,29 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), + case NRequest of + {Path, Headers} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => Headers, + timeout => Timeout + } + ); + {Path, Headers, Body} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => Headers, + timeout => Timeout, + body => Body + } + ) + end, Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -480,6 +503,29 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), + case NRequest of + {Path, Headers} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => Headers, + timeout => Timeout + } + ); + {Path, Headers, Body} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => Headers, + timeout => Timeout, + body => Body + } + ) + end, MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -661,22 +707,13 @@ process_request_and_action(Request, ActionState, Msg) -> ), BodyTemplate = maps:get(body, ActionState), Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg), - RenderResult = #{ + #{ method => Method, path => Path, body => Body, headers => Headers, request_timeout => maps:get(request_timeout, ActionState) - }, - ?TRACE( - "QUERY_RENDER", - "http_connector_successfully_rendered_request", - #{ - request => Request, - render_result => RenderResult - } - ), - RenderResult. + }. merge_proplist(Proplist1, Proplist2) -> lists:foldl( diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 1204ea5e5..f2c89e8ed 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -471,7 +471,9 @@ do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) Result. do_handle_action_get_trace_context(Action) -> - case logger:get_process_metadata() of + Metadata = logger:get_process_metadata(), + StopAfterRender = maps:get(stop_action_after_render, Metadata, false), + case Metadata of #{ rule_id := RuleID, clientid := ClientID @@ -479,14 +481,16 @@ do_handle_action_get_trace_context(Action) -> #{ rule_id => RuleID, clientid => ClientID, - action_id => Action + action_id => Action, + stop_action_after_render => StopAfterRender }; #{ rule_id := RuleID } -> #{ rule_id => RuleID, - action_id => Action + action_id => Action, + stop_action_after_render => StopAfterRender } end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index e72b0fcd0..fc4d2614f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -67,7 +67,7 @@ do_apply_matched_rule(Rule, Context, Env, StopAfterRender) -> ApplyRuleRes. update_process_trace_metadata(true = _StopAfterRender) -> - logger:update_process_trace_metadata(#{ + logger:update_process_metadata(#{ stop_action_after_render => true }); update_process_trace_metadata(false = _StopAfterRender) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index d9d15dba0..31a462de3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -64,12 +64,15 @@ end_per_testcase(_TestCase, _Config) -> ok. t_basic_apply_rule_trace_ruleid(Config) -> - basic_apply_rule_test_helper(Config, ruleid). + basic_apply_rule_test_helper(Config, ruleid, false). t_basic_apply_rule_trace_clientid(Config) -> - basic_apply_rule_test_helper(Config, clientid). + basic_apply_rule_test_helper(Config, clientid, false). -basic_apply_rule_test_helper(Config, TraceType) -> +t_basic_apply_rule_trace_ruleid_stop_after_render(Config) -> + basic_apply_rule_test_helper(Config, ruleid, true). + +basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> HTTPServerConfig = ?config(http_server, Config), emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig), #{status := connected} = emqx_bridge_v2:health_check( @@ -121,7 +124,8 @@ basic_apply_rule_test_helper(Config, TraceType) -> }, Params = #{ % body => #{ - <<"context">> => Context + <<"context">> => Context, + <<"stop_action_after_template_render">> => StopAfterRender % } }, emqx_trace:check(), @@ -133,279 +137,42 @@ basic_apply_rule_test_helper(Config, TraceType) -> _NAttempts0 = 20, begin Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG:~n~s", [Bin]), ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) end ), + case StopAfterRender of + true -> + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG2:~n~s", [Bin]), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_failed">>])) + end + ); + false -> + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG3:~n~s", [Bin]), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])) + end + ) + end, emqx_trace:delete(TraceName), ok. %% Helper Functions -% t_ctx_pub(_) -> -% SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>, -% Context = #{ -% clientid => <<"c_emqx">>, -% event_type => message_publish, -% payload => <<"{\"msg\": \"hello\"}">>, -% qos => 1, -% topic => <<"t/a">>, -% username => <<"u_emqx">> -% }, -% Expected = Context#{msg => <<"hello">>}, -% do_test(SQL, Context, Expected). - -% t_ctx_sub(_) -> -% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>, -% Context = #{ -% clientid => <<"c_emqx">>, -% event_type => session_subscribed, -% qos => 1, -% topic => <<"t/a">>, -% username => <<"u_emqx">> -% }, - -% do_test(SQL, Context, Context). - -% t_ctx_unsub(_) -> -% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_unsubscribed\"">>, -% Context = #{ -% clientid => <<"c_emqx">>, -% event_type => session_unsubscribed, -% qos => 1, -% topic => <<"t/a">>, -% username => <<"u_emqx">> -% }, -% do_test(SQL, Context, Context). - -% t_ctx_delivered(_) -> -% SQL = -% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_delivered\"">>, -% Context = #{ -% clientid => <<"c_emqx_2">>, -% event_type => message_delivered, -% from_clientid => <<"c_emqx_1">>, -% from_username => <<"u_emqx_1">>, -% payload => <<"{\"msg\": \"hello\"}">>, -% qos => 1, -% topic => <<"t/a">>, -% username => <<"u_emqx_2">> -% }, -% Expected = check_result([from_clientid, from_username, topic, qos], [node, timestamp], Context), -% do_test(SQL, Context, Expected). - -% t_ctx_acked(_) -> -% SQL = -% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_acked\"">>, - -% Context = #{ -% clientid => <<"c_emqx_2">>, -% event_type => message_acked, -% from_clientid => <<"c_emqx_1">>, -% from_username => <<"u_emqx_1">>, -% payload => <<"{\"msg\": \"hello\"}">>, -% qos => 1, -% topic => <<"t/a">>, -% username => <<"u_emqx_2">> -% }, - -% Expected = with_node_timestampe([from_clientid, from_username, topic, qos], Context), - -% do_test(SQL, Context, Expected). - -% t_ctx_droped(_) -> -% SQL = <<"SELECT reason, topic, qos, node, timestamp FROM \"$events/message_dropped\"">>, -% Topic = <<"t/a">>, -% QoS = 1, -% Reason = <<"no_subscribers">>, -% Context = #{ -% clientid => <<"c_emqx">>, -% event_type => message_dropped, -% payload => <<"{\"msg\": \"hello\"}">>, -% qos => QoS, -% reason => Reason, -% topic => Topic, -% username => <<"u_emqx">> -% }, - -% Expected = with_node_timestampe([reason, topic, qos], Context), -% do_test(SQL, Context, Expected). - -% t_ctx_connected(_) -> -% SQL = -% <<"SELECT clientid, username, keepalive, is_bridge FROM \"$events/client_connected\"">>, - -% Context = -% #{ -% clean_start => true, -% clientid => <<"c_emqx">>, -% event_type => client_connected, -% is_bridge => false, -% peername => <<"127.0.0.1:52918">>, -% username => <<"u_emqx">> -% }, -% Expected = check_result([clientid, username, keepalive, is_bridge], [], Context), -% do_test(SQL, Context, Expected). - -% t_ctx_disconnected(_) -> -% SQL = -% <<"SELECT clientid, username, reason, disconnected_at, node FROM \"$events/client_disconnected\"">>, - -% Context = -% #{ -% clientid => <<"c_emqx">>, -% event_type => client_disconnected, -% reason => <<"normal">>, -% username => <<"u_emqx">> -% }, -% Expected = check_result([clientid, username, reason], [disconnected_at, node], Context), -% do_test(SQL, Context, Expected). - -% t_ctx_connack(_) -> -% SQL = -% <<"SELECT clientid, username, reason_code, node FROM \"$events/client_connack\"">>, - -% Context = -% #{ -% clean_start => true, -% clientid => <<"c_emqx">>, -% event_type => client_connack, -% reason_code => <<"sucess">>, -% username => <<"u_emqx">> -% }, -% Expected = check_result([clientid, username, reason_code], [node], Context), -% do_test(SQL, Context, Expected). - -% t_ctx_check_authz_complete(_) -> -% SQL = -% << -% "SELECT clientid, username, topic, action, result,\n" -% "authz_source, node FROM \"$events/client_check_authz_complete\"" -% >>, - -% Context = -% #{ -% action => <<"publish">>, -% clientid => <<"c_emqx">>, -% event_type => client_check_authz_complete, -% result => <<"allow">>, -% topic => <<"t/1">>, -% username => <<"u_emqx">> -% }, -% Expected = check_result( -% [clientid, username, topic, action], -% [authz_source, node, result], -% Context -% ), - -% do_test(SQL, Context, Expected). - -% t_ctx_delivery_dropped(_) -> -% SQL = -% <<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>, - -% Context = -% #{ -% clientid => <<"c_emqx_2">>, -% event_type => delivery_dropped, -% from_clientid => <<"c_emqx_1">>, -% from_username => <<"u_emqx_1">>, -% payload => <<"{\"msg\": \"hello\"}">>, -% qos => 1, -% reason => <<"queue_full">>, -% topic => <<"t/a">>, -% username => <<"u_emqx_2">> -% }, -% Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context), -% do_test(SQL, Context, Expected). - -% t_mongo_date_function_should_return_string_in_test_env(_) -> -% SQL = -% <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>, -% Context = -% #{ -% action => <<"publish">>, -% clientid => <<"c_emqx">>, -% event_type => client_check_authz_complete, -% result => <<"allow">>, -% topic => <<"t/1">>, -% username => <<"u_emqx">> -% }, -% CheckFunction = fun(Result) -> -% MongoDate = maps:get(mongo_date, Result), -% %% Use regex to match the expected string -% MatchResult = re:run(MongoDate, <<"ISODate\\([0-9]{4}-[0-9]{2}-[0-9]{2}T.*\\)">>), -% ?assertMatch({match, _}, MatchResult), -% ok -% end, -% do_test(SQL, Context, CheckFunction). - -% do_test(SQL, Context, Expected0) -> -% Res = emqx_rule_engine_api:'/rule_test'( -% post, -% test_rule_params(SQL, Context) -% ), -% ?assertMatch({200, _}, Res), -% {200, Result0} = Res, -% Result = emqx_utils_maps:unsafe_atom_key_map(Result0), -% case is_function(Expected0) of -% false -> -% Expected = maps:without([event_type], Expected0), -% ?assertMatch(Expected, Result, Expected); -% _ -> -% Expected0(Result) -% end, -% ok. - -% test_rule_params(Sql, Context) -> -% #{ -% body => #{ -% <<"context">> => Context, -% <<"sql">> => Sql -% } -% }. - -% with_node_timestampe(Keys, Context) -> -% check_result(Keys, [node, timestamp], Context). - -% check_result(Keys, Exists, Context) -> -% Log = fun(Format, Args) -> -% lists:flatten(io_lib:format(Format, Args)) -% end, - -% Base = maps:with(Keys, Context), - -% fun(Result) -> -% maps:foreach( -% fun(Key, Value) -> -% ?assertEqual( -% Value, -% maps:get(Key, Result, undefined), -% Log("Key:~p value error~nResult:~p~n", [Key, Result]) -% ) -% end, -% Base -% ), - -% NotExists = fun(Key) -> Log("Key:~p not exists in result:~p~n", [Key, Result]) end, -% lists:foreach( -% fun(Key) -> -% Find = maps:find(Key, Result), -% Formatter = NotExists(Key), -% ?assertMatch({ok, _}, Find, Formatter), -% ?assertNotMatch({ok, undefined}, Find, Formatter), -% ?assertNotMatch({ok, <<"undefined">>}, Find, Formatter) -% end, -% Exists -% ), - -% ?assertEqual(erlang:length(Keys) + erlang:length(Exists), maps:size(Result), Result) -% end. - call_apply_rule_api(RuleId, Params) -> Method = post, Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),