fix(trace): several improvements thanks to comments from @zmstone

This commit is contained in:
Kjell Winblad 2024-04-11 16:48:43 +02:00
parent 31142df5cf
commit 9998940aa2
9 changed files with 87 additions and 106 deletions

View File

@ -88,16 +88,18 @@ unsubscribe(Topic, SubOpts) ->
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
rendered_action_template(ActionID, RenderResult) ->
Msg = io_lib:format("action_template_rendered(~s)", [ActionID]),
Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])),
TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult),
case logger:get_process_metadata() of
#{stop_action_after_render := true} ->
%% We throw an unrecoverable error to stop action before the
%% resource is called/modified
StopMsg = io_lib:format(
"action_stopped_after_render(~s): "
StopMsg = lists:flatten(
io_lib:format(
"action_stopped_after_render(~ts): "
"Action stopped after template render due to test setting.",
[ActionID]
)
),
MsgBin = iolist_to_binary(StopMsg),
error({unrecoverable_error, MsgBin});

View File

@ -371,29 +371,7 @@ on_query(
}
),
NRequest = formalize_request(Method, BasePath, Request),
case NRequest of
{Path, Headers} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout
}
);
{Path, Headers, Body} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout,
body => Body
}
)
end,
trace_rendered_action_template(InstId, Method, NRequest, Timeout),
Worker = resolve_pool_worker(State, KeyOrNum),
Result0 = ehttpc:request(
Worker,
@ -503,29 +481,7 @@ on_query_async(
}
),
NRequest = formalize_request(Method, BasePath, Request),
case NRequest of
{Path, Headers} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout
}
);
{Path, Headers, Body} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout,
body => Body
}
)
end,
trace_rendered_action_template(InstId, Method, NRequest, Timeout),
MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{
attempt => 1,
@ -545,6 +501,31 @@ on_query_async(
),
{ok, Worker}.
trace_rendered_action_template(InstId, Method, NRequest, Timeout) ->
case NRequest of
{Path, Headers} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout
}
);
{Path, Headers, Body} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => emqx_utils_redact:redact_headers(Headers),
timeout => Timeout,
body => Body
}
)
end.
resolve_pool_worker(State, undefined) ->
resolve_pool_worker(State, self());
resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->

View File

@ -2,7 +2,8 @@
{deps, [
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}}
{emqx_utils, {path, "../emqx_utils"}},
{emqx_modules, {path, "../emqx_modules"}}
]}.
{profiles, [

View File

@ -108,15 +108,6 @@ fields("rule_test") ->
fields("rule_apply_test") ->
[
rule_input_message_context(),
{"environment",
sc(
typerefl:map(),
#{
desc =>
?DESC("test_rule_environment"),
default => #{}
}
)},
{"stop_action_after_template_rendering",
sc(
typerefl:boolean(),

View File

@ -17,7 +17,9 @@
%% rule_engine should wait for bridge connector start,
%% it's will check action/connector ref's exist.
emqx_bridge,
emqx_connector
emqx_connector,
%% Needed to start the tracing functionality
emqx_modules
]},
{mod, {emqx_rule_engine_app, []}},
{env, []},

View File

@ -420,13 +420,13 @@ handle_action(RuleId, ActId, Selected, Envs) ->
end.
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
do_handle_action(_RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) ->
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) ->
trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
TraceCtx = do_handle_action_get_trace_context(Action),
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}},
{TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}},
case
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{
reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)
reply_to => ReplyTo, trace_ctx => TraceCtx
})
of
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
@ -437,20 +437,20 @@ do_handle_action(_RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Sele
Result
end;
do_handle_action(
_RuleId,
RuleId,
{bridge_v2, BridgeType, BridgeName} = Action,
Selected,
_Envs
) ->
trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
TraceCtx = do_handle_action_get_trace_context(Action),
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}},
{TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}},
case
emqx_bridge_v2:send_message(
BridgeType,
BridgeName,
Selected,
#{reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)}
#{reply_to => ReplyTo, trace_ctx => TraceCtx}
)
of
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
@ -460,17 +460,31 @@ do_handle_action(
Result ->
Result
end;
do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
trace_action(Action, "call_action_function"),
%% the function can also throw 'out_of_service'
Args = maps:get(args, Action, []),
Result = Mod:Func(Selected, Envs, Args),
TraceCtx = do_handle_action_get_trace_context(Action),
inc_action_metrics(TraceCtx, Result),
{_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
inc_action_metrics(IncCtx, Result),
trace_action(Action, "call_action_function_result", #{result => Result}, debug),
Result.
do_handle_action_get_trace_context(Action) ->
do_handle_action_get_trace_inc_metrics_context(RuleID, Action) ->
case emqx_trace:list() of
[] ->
%% As a performance/memory optimization, we don't create any trace
%% context if there are no trace patterns.
{undefined, #{
rule_id => RuleID,
action_id => Action
}};
_List ->
Ctx = do_handle_action_get_trace_inc_metrics_context_unconditionally(Action),
{maps:remove(action_id, Ctx), Ctx}
end.
do_handle_action_get_trace_inc_metrics_context_unconditionally(Action) ->
Metadata = logger:get_process_metadata(),
StopAfterRender = maps:get(stop_action_after_render, Metadata, false),
case Metadata of

View File

@ -28,7 +28,6 @@ apply_rule(
RuleId,
#{
context := Context,
environment := Env,
stop_action_after_template_rendering := StopAfterRender
}
) ->
@ -39,30 +38,32 @@ apply_rule(
true ->
%% test if the topic matches the topic filters in the rule
case emqx_topic:match_any(InTopic, EventTopics) of
true -> do_apply_matched_rule(Rule, Context, Env, StopAfterRender);
false -> {error, nomatch}
true ->
do_apply_matched_rule(
Rule,
Context,
StopAfterRender
);
false ->
{error, nomatch}
end;
false ->
case lists:member(InTopic, EventTopics) of
true ->
%% the rule is for both publish and events, test it directly
do_apply_matched_rule(Rule, Context, Env, StopAfterRender);
do_apply_matched_rule(Rule, Context, StopAfterRender);
false ->
{error, nomatch}
end
end.
do_apply_matched_rule(Rule, Context, Env, StopAfterRender) ->
do_apply_matched_rule(Rule, Context, StopAfterRender) ->
update_process_trace_metadata(StopAfterRender),
Env1 =
case Env of
M when map_size(M) =:= 0 ->
%% Use the default environment if no environment is provided
default_apply_rule_environment();
_ ->
Env
end,
ApplyRuleRes = emqx_rule_runtime:apply_rule(Rule, Context, Env1),
ApplyRuleRes = emqx_rule_runtime:apply_rule(
Rule,
Context,
apply_rule_environment()
),
reset_trace_process_metadata(StopAfterRender),
ApplyRuleRes.
@ -80,16 +81,11 @@ reset_trace_process_metadata(true = _StopAfterRender) ->
reset_trace_process_metadata(false = _StopAfterRender) ->
ok.
default_apply_rule_environment() ->
#{
headers => #{
protocol => mqtt,
username => undefined,
peerhost => {127, 0, 0, 1},
proto_ver => 5,
properties => #{}
}
}.
%% At the time of writing the environment passed to the apply rule function is
%% not used at all for normal actions. When it is used for custom functions it
%% is first merged with the context so there does not seem to be any need to
%% set this to anything else then the empty map.
apply_rule_environment() -> #{}.
-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}.
test(#{sql := Sql, context := Context}) ->

View File

@ -158,13 +158,13 @@ init_per_suite(Config) ->
emqx_rule_funcs_demo:module_info(),
application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps(
[emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge],
[emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge],
fun set_special_configs/1
),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]),
ok.
set_special_configs(emqx_auth) ->

View File

@ -66,12 +66,6 @@ test_context.desc:
test_context.label:
"""Event Conetxt"""
test_rule_environment.desc:
"""The environment that will be passed to the rule when it is applied. A default environment will be used if no environment is given."""
test_rule_environment.label:
"""Event Environment"""
stop_action_after_template_render.desc:
"""Set this to true if the action should be stopped after its template has been rendered (default is true)."""