refactor(rule trace): templates rendered trace to increase code reuse

* The code for passing the trace context to a sub process has been
  improved to increase code reuse. This code is used when the action
  templates are rendered in a sub process.
* A macro has also been added for the error term that is thrown when the
  action shall be stopped after the templates has been rendered. This is
  also done to reduce code duplication and to reduce the risk of
  introducing bugs due to typos.
* Fix incorrect type spec

Thanks to @zmstone for suggesting these improvements in comments to a PR
(https://github.com/emqx/emqx/pull/12916).
This commit is contained in:
Kjell Winblad 2024-04-25 11:39:20 +02:00
parent 0dbaef4316
commit ef9884cf47
9 changed files with 140 additions and 146 deletions

View File

@ -38,4 +38,10 @@
-define(SHARD, ?COMMON_SHARD).
-define(MAX_SIZE, 30).
-define(EMQX_TRACE_STOP_ACTION(REASON),
{unrecoverable_error, {action_stopped_after_template_rendering, REASON}}
).
-define(EMQX_TRACE_STOP_ACTION_MATCH, ?EMQX_TRACE_STOP_ACTION(_)).
-endif.

View File

@ -29,7 +29,9 @@
unsubscribe/2,
log/3,
log/4,
rendered_action_template/2
rendered_action_template/2,
make_rendered_action_template_trace_context/1,
rendered_action_template_with_ctx/2
]).
-export([
@ -70,6 +72,12 @@
-export_type([ruleid/0]).
-type ruleid() :: binary().
-export_type([rendered_action_template_ctx/0]).
-opaque rendered_action_template_ctx() :: #{
trace_ctx := map(),
action_id := any()
}.
publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when
@ -107,15 +115,56 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
)
),
MsgBin = unicode:characters_to_binary(StopMsg),
error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}});
error(?EMQX_TRACE_STOP_ACTION(MsgBin));
_ ->
ok
end,
TraceResult;
rendered_action_template(_ActionID, _RenderResult) ->
%% We do nothing if we don't get a valid Action ID
%% We do nothing if we don't get a valid Action ID. This can happen when
%% called from connectors that are used for actions as well as authz and
%% authn.
ok.
%% The following two functions are used for connectors that don't do the
%% rendering in the main process (the one that called on_*query). In this case
%% we need to pass the trace context to the sub process that do the rendering
%% so that the result of the rendering can be traced correctly. It is also
%% important to ensure that the error that can be thrown from
%% rendered_action_template_with_ctx is handled in the appropriate way in the
%% sub process.
-spec make_rendered_action_template_trace_context(any()) -> rendered_action_template_ctx().
make_rendered_action_template_trace_context(ActionID) ->
MetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
#{trace_ctx => MetaData, action_id => ActionID}.
-spec rendered_action_template_with_ctx(rendered_action_template_ctx(), Result :: term()) -> term().
rendered_action_template_with_ctx(
#{
trace_ctx := LogMetaData,
action_id := ActionID
},
RenderResult
) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
log(List, Msg, Meta) ->
log(debug, List, Msg, Meta).

View File

@ -9,6 +9,7 @@
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -246,16 +247,15 @@ do_query(
table := Table,
templates := Templates
} = ChannelState,
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
TraceRenderedCTX =
emqx_trace:make_rendered_action_template_trace_context(ChannelId),
Result =
case ensuare_dynamo_keys(Query, ChannelState) of
true ->
ecpool:pick_and_do(
PoolName,
{emqx_bridge_dynamo_connector_client, query, [
Table, QueryTuple, Templates, TraceRenderedFunc
Table, QueryTuple, Templates, TraceRenderedCTX
]},
no_handover
);
@ -264,7 +264,7 @@ do_query(
end,
case Result of
{error, {unrecoverable_error, {action_stopped_after_template_rendering, _}}} = Error ->
{error, ?EMQX_TRACE_STOP_ACTION(_)} = Error ->
Error;
{error, Reason} ->
?tp(
@ -298,22 +298,6 @@ do_query(
Result
end.
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
get_channel_id([{ChannelId, _Req} | _]) ->
ChannelId;
get_channel_id({ChannelId, _Req}) ->

View File

@ -40,8 +40,8 @@ is_connected(Pid, Timeout) ->
{false, Error}
end.
query(Pid, Table, Query, Templates, TraceRenderedFunc) ->
gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedFunc}, infinity).
query(Pid, Table, Query, Templates, TraceRenderedCTX) ->
gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity).
%%--------------------------------------------------------------------
%% @doc
@ -77,8 +77,8 @@ handle_call(is_connected, _From, State) ->
{false, Error}
end,
{reply, IsConnected, State};
handle_call({query, Table, Query, Templates, TraceRenderedFunc}, _From, State) ->
Result = do_query(Table, Query, Templates, TraceRenderedFunc),
handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) ->
Result = do_query(Table, Query, Templates, TraceRenderedCTX),
{reply, Result, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
@ -102,10 +102,13 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) ->
do_query(Table, Query0, Templates, TraceRenderedCTX) ->
try
Query = apply_template(Query0, Templates),
TraceRenderedFun(#{table => Table, query => Query}, TraceRenderedCTX),
emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
table => Table,
query => {fun trace_format_query/1, Query}
}),
execute(Query, Table)
catch
error:{unrecoverable_error, Reason} ->
@ -114,6 +117,14 @@ do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
trace_format_query({Type, Data}) ->
#{type => Type, data => Data};
trace_format_query([_ | _] = Batch) ->
BatchData = [trace_format_query(Q) || Q <- Batch],
#{type => batch, data => BatchData};
trace_format_query(Query) ->
Query.
%% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg),

View File

@ -301,29 +301,11 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
}).
with_egress_client(ActionID, ResourceId, Fun, Args) ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID),
ecpool:pick_and_do(
ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover
ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover
).
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
on_async_result(Callback, Result) ->
apply_callback_function(Callback, handle_send_result(Result)).
@ -343,9 +325,7 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) ->
handle_send_result({ok, Reply}) ->
{error, classify_reply(Reply)};
handle_send_result({error, Reason}) ->
{error, classify_error(Reason)};
handle_send_result({unrecoverable_error, Reason}) ->
{error, {unrecoverable_error, Reason}}.
{error, classify_error(Reason)}.
classify_reply(Reply = #{reason_code := _}) ->
{unrecoverable_error, Reply}.
@ -360,6 +340,8 @@ classify_error({shutdown, _} = Reason) ->
{recoverable_error, Reason};
classify_error(shutdown = Reason) ->
{recoverable_error, Reason};
classify_error({unrecoverable_error, _Reason} = Error) ->
Error;
classify_error(Reason) ->
{unrecoverable_error, Reason}.

View File

@ -29,9 +29,6 @@
-type message() :: emqx_types:message() | map().
-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}.
-type remote_message() :: #mqtt_msg{}.
-type trace_rendered_func() :: {
fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map()
}.
-type egress() :: #{
local => #{
@ -45,37 +42,40 @@
config(#{remote := RC = #{}} = Conf) ->
Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
-spec send(pid(), trace_rendered_func(), message(), egress()) -> ok.
send(Pid, TraceRenderedFunc, MsgIn, Egress) ->
-spec send(pid(), emqx_trace:rendered_action_template_ctx(), message(), egress()) ->
ok | {error, {unrecoverable_error, term()}}.
send(Pid, TraceRenderedCTX, MsgIn, Egress) ->
try
emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc))
emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedCTX))
catch
error:{unrecoverable_error, Reason} ->
{unrecoverable_error, Reason}
{error, {unrecoverable_error, Reason}}
end.
-spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) ->
ok | {ok, pid()}.
send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) ->
-spec send_async(pid(), emqx_trace:rendered_action_template_ctx(), message(), callback(), egress()) ->
{ok, pid()} | {error, {unrecoverable_error, term()}}.
send_async(Pid, TraceRenderedCTX, MsgIn, Callback, Egress) ->
try
ok = emqtt:publish_async(
Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback
Pid, export_msg(MsgIn, Egress, TraceRenderedCTX), _Timeout = infinity, Callback
),
{ok, Pid}
catch
error:{unrecoverable_error, Reason} ->
{unrecoverable_error, Reason}
{error, {unrecoverable_error, Reason}}
end.
export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) ->
to_remote_msg(Msg, Remote, TraceRenderedFunc).
export_msg(Msg, #{remote := Remote}, TraceRenderedCTX) ->
to_remote_msg(Msg, Remote, TraceRenderedCTX).
-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) ->
-spec to_remote_msg(
message(), emqx_bridge_mqtt_msg:msgvars(), emqx_trace:rendered_action_template_ctx()
) ->
remote_message().
to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) ->
to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedCTX) ->
{EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc);
to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) ->
to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedCTX);
to_remote_msg(Msg = #{}, Remote, TraceRenderedCTX) ->
#{
topic := Topic,
payload := Payload,
@ -83,16 +83,13 @@ to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) ->
retain := Retain
} = emqx_bridge_mqtt_msg:render(Msg, Remote),
PubProps = maps:get(pub_props, Msg, #{}),
TraceRenderedFun(
#{
qos => QoS,
retain => Retain,
topic => Topic,
props => PubProps,
payload => Payload
},
TraceRenderedCTX
),
emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
qos => QoS,
retain => Retain,
topic => Topic,
props => PubProps,
payload => Payload
}),
#mqtt_msg{
qos = QoS,
retain = Retain,

View File

@ -9,6 +9,7 @@
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -214,12 +215,10 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
Res = ecpool:pick_and_do(
ResourceID,
{?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]},
{?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedCTX]},
no_handover
),
handle_result(Res);
@ -237,12 +236,10 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
{ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
Res = ecpool:pick_and_do(
ResourceID,
{?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]},
{?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedCTX]},
no_handover
),
handle_result(Res);
@ -250,22 +247,6 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
{error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
end.
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
publish_messages(
Conn,
RabbitMQ,
@ -278,7 +259,7 @@ publish_messages(
publish_confirmation_timeout := PublishConfirmationTimeout
},
Messages,
TraceRenderedFunc
TraceRenderedCTX
) ->
try
publish_messages(
@ -291,10 +272,10 @@ publish_messages(
Messages,
WaitForPublishConfirmations,
PublishConfirmationTimeout,
TraceRenderedFunc
TraceRenderedCTX
)
catch
error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason ->
error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
{error, Reason};
%% if send a message to a non-existent exchange, RabbitMQ client will crash
%% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>}
@ -314,7 +295,7 @@ publish_messages(
Messages,
WaitForPublishConfirmations,
PublishConfirmationTimeout,
{TraceRenderedFun, TraceRenderedFuncCTX}
TraceRenderedCTX
) ->
case maps:find(Conn, RabbitMQ) of
{ok, Channel} ->
@ -330,20 +311,17 @@ publish_messages(
format_data(PayloadTmpl, M)
|| {_, M} <- Messages
],
TraceRenderedFun(
#{
messages => FormattedMsgs,
properties => #{
headers => [],
delivery_mode => DeliveryMode
},
method => #{
exchange => Exchange,
routing_key => RoutingKey
}
emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
messages => FormattedMsgs,
properties => #{
headers => [],
delivery_mode => DeliveryMode
},
TraceRenderedFuncCTX
),
method => #{
exchange => Exchange,
routing_key => RoutingKey
}
}),
lists:foreach(
fun(Msg) ->
amqp_channel:cast(

View File

@ -10,6 +10,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -200,12 +201,10 @@ on_batch_query(
) ->
case maps:find(ChannelId, Channels) of
{ok, #{batch := Tokens, opts := Opts}} ->
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
do_query_job(
InstanceId,
{?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]},
{?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]},
State
);
_ ->
@ -216,22 +215,6 @@ on_batch_query(InstanceId, BatchReq, State) ->
?SLOG(error, LogMeta#{msg => "invalid_request"}),
{error, {unrecoverable_error, invalid_request}}.
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
case
emqx_resource_pool:health_check_workers(
@ -358,13 +341,16 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
execute(Conn, Query, Opts) ->
tdengine:insert(Conn, Query, Opts).
do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) ->
do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) ->
SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
try
TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX),
emqx_trace:rendered_action_template_with_ctx(
TraceRenderedCTX,
#{query => SQL}
),
execute(Conn, SQL, Opts)
catch
error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason ->
error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
{error, Reason}
end.

View File

@ -18,6 +18,7 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("emqx_resource/include/emqx_resource_errors.hrl").
-export([
@ -724,7 +725,7 @@ inc_action_metrics(TraceCtx, Result) ->
do_inc_action_metrics(
#{rule_id := RuleId, action_id := ActId} = TraceContext,
{error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason}
{error, ?EMQX_TRACE_STOP_ACTION(Explanation) = _Reason}
) ->
TraceContext1 = maps:remove(action_id, TraceContext),
trace_action(