diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 408644128..4e182f300 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -88,16 +88,18 @@ unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). rendered_action_template(ActionID, RenderResult) -> - Msg = io_lib:format("action_template_rendered(~s)", [ActionID]), + Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])), TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult), case logger:get_process_metadata() of #{stop_action_after_render := true} -> %% We throw an unrecoverable error to stop action before the %% resource is called/modified - StopMsg = io_lib:format( - "action_stopped_after_render(~s): " - "Action stopped after template render due to test setting.", - [ActionID] + StopMsg = lists:flatten( + io_lib:format( + "action_stopped_after_render(~ts): " + "Action stopped after template render due to test setting.", + [ActionID] + ) ), MsgBin = iolist_to_binary(StopMsg), error({unrecoverable_error, MsgBin}); diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 46b3d5e1f..19b7ef875 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -371,29 +371,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), - case NRequest of - {Path, Headers} -> - emqx_trace:rendered_action_template( - InstId, - #{ - path => Path, - method => Method, - headers => Headers, - timeout => Timeout - } - ); - {Path, Headers, Body} -> - emqx_trace:rendered_action_template( - InstId, - #{ - path => Path, - method => Method, - headers => Headers, - timeout => Timeout, - body => Body - } - ) - end, + trace_rendered_action_template(InstId, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -503,29 +481,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - case NRequest of - {Path, Headers} -> - emqx_trace:rendered_action_template( - InstId, - #{ - path => Path, - method => Method, - headers => Headers, - timeout => Timeout - } - ); - {Path, Headers, Body} -> - emqx_trace:rendered_action_template( - InstId, - #{ - path => Path, - method => Method, - headers => Headers, - timeout => Timeout, - body => Body - } - ) - end, + trace_rendered_action_template(InstId, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -545,6 +501,31 @@ on_query_async( ), {ok, Worker}. +trace_rendered_action_template(InstId, Method, NRequest, Timeout) -> + case NRequest of + {Path, Headers} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => Headers, + timeout => Timeout + } + ); + {Path, Headers, Body} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => emqx_utils_redact:redact_headers(Headers), + timeout => Timeout, + body => Body + } + ) + end. + resolve_pool_worker(State, undefined) -> resolve_pool_worker(State, self()); resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> diff --git a/apps/emqx_rule_engine/rebar.config b/apps/emqx_rule_engine/rebar.config index d51bffa20..0f00f15c6 100644 --- a/apps/emqx_rule_engine/rebar.config +++ b/apps/emqx_rule_engine/rebar.config @@ -2,7 +2,8 @@ {deps, [ {emqx, {path, "../emqx"}}, - {emqx_utils, {path, "../emqx_utils"}} + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_modules, {path, "../emqx_modules"}} ]}. {profiles, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 5e19b33ee..f4685222c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -108,15 +108,6 @@ fields("rule_test") -> fields("rule_apply_test") -> [ rule_input_message_context(), - {"environment", - sc( - typerefl:map(), - #{ - desc => - ?DESC("test_rule_environment"), - default => #{} - } - )}, {"stop_action_after_template_rendering", sc( typerefl:boolean(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1768141ae..1fed922dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -17,7 +17,9 @@ %% rule_engine should wait for bridge connector start, %% it's will check action/connector ref's exist. emqx_bridge, - emqx_connector + emqx_connector, + %% Needed to start the tracing functionality + emqx_modules ]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f90b5a974..d4cde213d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -420,13 +420,13 @@ handle_action(RuleId, ActId, Selected, Envs) -> end. -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target). -do_handle_action(_RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) -> +do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) -> trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), - TraceCtx = do_handle_action_get_trace_context(Action), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}}, + {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}}, case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{ - reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx) + reply_to => ReplyTo, trace_ctx => TraceCtx }) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> @@ -437,20 +437,20 @@ do_handle_action(_RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Sele Result end; do_handle_action( - _RuleId, + RuleId, {bridge_v2, BridgeType, BridgeName} = Action, Selected, _Envs ) -> trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), - TraceCtx = do_handle_action_get_trace_context(Action), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}}, + {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}}, case emqx_bridge_v2:send_message( BridgeType, BridgeName, Selected, - #{reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)} + #{reply_to => ReplyTo, trace_ctx => TraceCtx} ) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> @@ -460,17 +460,31 @@ do_handle_action( Result -> Result end; -do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -> +do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -> trace_action(Action, "call_action_function"), %% the function can also throw 'out_of_service' Args = maps:get(args, Action, []), Result = Mod:Func(Selected, Envs, Args), - TraceCtx = do_handle_action_get_trace_context(Action), - inc_action_metrics(TraceCtx, Result), + {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + inc_action_metrics(IncCtx, Result), trace_action(Action, "call_action_function_result", #{result => Result}, debug), Result. -do_handle_action_get_trace_context(Action) -> +do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> + case emqx_trace:list() of + [] -> + %% As a performance/memory optimization, we don't create any trace + %% context if there are no trace patterns. + {undefined, #{ + rule_id => RuleID, + action_id => Action + }}; + _List -> + Ctx = do_handle_action_get_trace_inc_metrics_context_unconditionally(Action), + {maps:remove(action_id, Ctx), Ctx} + end. + +do_handle_action_get_trace_inc_metrics_context_unconditionally(Action) -> Metadata = logger:get_process_metadata(), StopAfterRender = maps:get(stop_action_after_render, Metadata, false), case Metadata of diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 342a8d9f9..83f29eef3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -28,7 +28,6 @@ apply_rule( RuleId, #{ context := Context, - environment := Env, stop_action_after_template_rendering := StopAfterRender } ) -> @@ -39,30 +38,32 @@ apply_rule( true -> %% test if the topic matches the topic filters in the rule case emqx_topic:match_any(InTopic, EventTopics) of - true -> do_apply_matched_rule(Rule, Context, Env, StopAfterRender); - false -> {error, nomatch} + true -> + do_apply_matched_rule( + Rule, + Context, + StopAfterRender + ); + false -> + {error, nomatch} end; false -> case lists:member(InTopic, EventTopics) of true -> %% the rule is for both publish and events, test it directly - do_apply_matched_rule(Rule, Context, Env, StopAfterRender); + do_apply_matched_rule(Rule, Context, StopAfterRender); false -> {error, nomatch} end end. -do_apply_matched_rule(Rule, Context, Env, StopAfterRender) -> +do_apply_matched_rule(Rule, Context, StopAfterRender) -> update_process_trace_metadata(StopAfterRender), - Env1 = - case Env of - M when map_size(M) =:= 0 -> - %% Use the default environment if no environment is provided - default_apply_rule_environment(); - _ -> - Env - end, - ApplyRuleRes = emqx_rule_runtime:apply_rule(Rule, Context, Env1), + ApplyRuleRes = emqx_rule_runtime:apply_rule( + Rule, + Context, + apply_rule_environment() + ), reset_trace_process_metadata(StopAfterRender), ApplyRuleRes. @@ -80,16 +81,11 @@ reset_trace_process_metadata(true = _StopAfterRender) -> reset_trace_process_metadata(false = _StopAfterRender) -> ok. -default_apply_rule_environment() -> - #{ - headers => #{ - protocol => mqtt, - username => undefined, - peerhost => {127, 0, 0, 1}, - proto_ver => 5, - properties => #{} - } - }. +%% At the time of writing the environment passed to the apply rule function is +%% not used at all for normal actions. When it is used for custom functions it +%% is first merged with the context so there does not seem to be any need to +%% set this to anything else then the empty map. +apply_rule_environment() -> #{}. -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}. test(#{sql := Sql, context := Context}) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 5040d15b3..b0ca00a0e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -158,13 +158,13 @@ init_per_suite(Config) -> emqx_rule_funcs_demo:module_info(), application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge], + [emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge], fun set_special_configs/1 ), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]), ok. set_special_configs(emqx_auth) -> diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index 25535d0ca..68c6a560d 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -66,12 +66,6 @@ test_context.desc: test_context.label: """Event Conetxt""" -test_rule_environment.desc: -"""The environment that will be passed to the rule when it is applied. A default environment will be used if no environment is given.""" - -test_rule_environment.label: -"""Event Environment""" - stop_action_after_template_render.desc: """Set this to true if the action should be stopped after its template has been rendered (default is true)."""