From ef9884cf47727ef334af65f469779f6345a325d2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 25 Apr 2024 11:39:20 +0200 Subject: [PATCH] refactor(rule trace): templates rendered trace to increase code reuse * The code for passing the trace context to a sub process has been improved to increase code reuse. This code is used when the action templates are rendered in a sub process. * A macro has also been added for the error term that is thrown when the action shall be stopped after the templates has been rendered. This is also done to reduce code duplication and to reduce the risk of introducing bugs due to typos. * Fix incorrect type spec Thanks to @zmstone for suggesting these improvements in comments to a PR (https://github.com/emqx/emqx/pull/12916). --- apps/emqx/include/emqx_trace.hrl | 6 ++ apps/emqx/src/emqx_trace/emqx_trace.erl | 55 ++++++++++++++++- .../src/emqx_bridge_dynamo_connector.erl | 26 ++------ .../emqx_bridge_dynamo_connector_client.erl | 23 +++++-- .../src/emqx_bridge_mqtt_connector.erl | 28 ++------- .../src/emqx_bridge_mqtt_egress.erl | 53 ++++++++-------- .../src/emqx_bridge_rabbitmq_connector.erl | 60 ++++++------------- .../src/emqx_bridge_tdengine_connector.erl | 32 +++------- .../src/emqx_rule_runtime.erl | 3 +- 9 files changed, 140 insertions(+), 146 deletions(-) diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index d1e70b184..27dd8b6c8 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -38,4 +38,10 @@ -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). +-define(EMQX_TRACE_STOP_ACTION(REASON), + {unrecoverable_error, {action_stopped_after_template_rendering, REASON}} +). + +-define(EMQX_TRACE_STOP_ACTION_MATCH, ?EMQX_TRACE_STOP_ACTION(_)). + -endif. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 329e5f696..91de65b39 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -29,7 +29,9 @@ unsubscribe/2, log/3, log/4, - rendered_action_template/2 + rendered_action_template/2, + make_rendered_action_template_trace_context/1, + rendered_action_template_with_ctx/2 ]). -export([ @@ -70,6 +72,12 @@ -export_type([ruleid/0]). -type ruleid() :: binary(). +-export_type([rendered_action_template_ctx/0]). +-opaque rendered_action_template_ctx() :: #{ + trace_ctx := map(), + action_id := any() +}. + publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -107,15 +115,56 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> ) ), MsgBin = unicode:characters_to_binary(StopMsg), - error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}}); + error(?EMQX_TRACE_STOP_ACTION(MsgBin)); _ -> ok end, TraceResult; rendered_action_template(_ActionID, _RenderResult) -> - %% We do nothing if we don't get a valid Action ID + %% We do nothing if we don't get a valid Action ID. This can happen when + %% called from connectors that are used for actions as well as authz and + %% authn. ok. +%% The following two functions are used for connectors that don't do the +%% rendering in the main process (the one that called on_*query). In this case +%% we need to pass the trace context to the sub process that do the rendering +%% so that the result of the rendering can be traced correctly. It is also +%% important to ensure that the error that can be thrown from +%% rendered_action_template_with_ctx is handled in the appropriate way in the +%% sub process. +-spec make_rendered_action_template_trace_context(any()) -> rendered_action_template_ctx(). +make_rendered_action_template_trace_context(ActionID) -> + MetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + #{trace_ctx => MetaData, action_id => ActionID}. + +-spec rendered_action_template_with_ctx(rendered_action_template_ctx(), Result :: term()) -> term(). +rendered_action_template_with_ctx( + #{ + trace_ctx := LogMetaData, + action_id := ActionID + }, + RenderResult +) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index f9a87ccf7..598b3342d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -246,16 +247,15 @@ do_query( table := Table, templates := Templates } = ChannelState, - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = + emqx_trace:make_rendered_action_template_trace_context(ChannelId), Result = case ensuare_dynamo_keys(Query, ChannelState) of true -> ecpool:pick_and_do( PoolName, {emqx_bridge_dynamo_connector_client, query, [ - Table, QueryTuple, Templates, TraceRenderedFunc + Table, QueryTuple, Templates, TraceRenderedCTX ]}, no_handover ); @@ -264,7 +264,7 @@ do_query( end, case Result of - {error, {unrecoverable_error, {action_stopped_after_template_rendering, _}}} = Error -> + {error, ?EMQX_TRACE_STOP_ACTION(_)} = Error -> Error; {error, Reason} -> ?tp( @@ -298,22 +298,6 @@ do_query( Result end. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - get_channel_id([{ChannelId, _Req} | _]) -> ChannelId; get_channel_id({ChannelId, _Req}) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 1d1ad3760..f257ae389 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -40,8 +40,8 @@ is_connected(Pid, Timeout) -> {false, Error} end. -query(Pid, Table, Query, Templates, TraceRenderedFunc) -> - gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedFunc}, infinity). +query(Pid, Table, Query, Templates, TraceRenderedCTX) -> + gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity). %%-------------------------------------------------------------------- %% @doc @@ -77,8 +77,8 @@ handle_call(is_connected, _From, State) -> {false, Error} end, {reply, IsConnected, State}; -handle_call({query, Table, Query, Templates, TraceRenderedFunc}, _From, State) -> - Result = do_query(Table, Query, Templates, TraceRenderedFunc), +handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) -> + Result = do_query(Table, Query, Templates, TraceRenderedCTX), {reply, Result, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -102,10 +102,13 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) -> +do_query(Table, Query0, Templates, TraceRenderedCTX) -> try Query = apply_template(Query0, Templates), - TraceRenderedFun(#{table => Table, query => Query}, TraceRenderedCTX), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + table => Table, + query => {fun trace_format_query/1, Query} + }), execute(Query, Table) catch error:{unrecoverable_error, Reason} -> @@ -114,6 +117,14 @@ do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) -> {error, {unrecoverable_error, {invalid_request, Reason}}} end. +trace_format_query({Type, Data}) -> + #{type => Type, data => Data}; +trace_format_query([_ | _] = Batch) -> + BatchData = [trace_format_query(Q) || Q <- Batch], + #{type => batch, data => BatchData}; +trace_format_query(Query) -> + Query. + %% some simple query commands for authn/authz or test execute({insert_item, Msg}, Table) -> Item = convert_to_item(Msg), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index d3ffd6c92..f133bf334 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -301,29 +301,11 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> }). with_egress_client(ActionID, ResourceId, Fun, Args) -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID), ecpool:pick_and_do( - ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover + ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover ). -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -343,9 +325,7 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> handle_send_result({ok, Reply}) -> {error, classify_reply(Reply)}; handle_send_result({error, Reason}) -> - {error, classify_error(Reason)}; -handle_send_result({unrecoverable_error, Reason}) -> - {error, {unrecoverable_error, Reason}}. + {error, classify_error(Reason)}. classify_reply(Reply = #{reason_code := _}) -> {unrecoverable_error, Reply}. @@ -360,6 +340,8 @@ classify_error({shutdown, _} = Reason) -> {recoverable_error, Reason}; classify_error(shutdown = Reason) -> {recoverable_error, Reason}; +classify_error({unrecoverable_error, _Reason} = Error) -> + Error; classify_error(Reason) -> {unrecoverable_error, Reason}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 80d38bc78..a4a0b0d37 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -29,9 +29,6 @@ -type message() :: emqx_types:message() | map(). -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type remote_message() :: #mqtt_msg{}. --type trace_rendered_func() :: { - fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map() -}. -type egress() :: #{ local => #{ @@ -45,37 +42,40 @@ config(#{remote := RC = #{}} = Conf) -> Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. --spec send(pid(), trace_rendered_func(), message(), egress()) -> ok. -send(Pid, TraceRenderedFunc, MsgIn, Egress) -> +-spec send(pid(), emqx_trace:rendered_action_template_ctx(), message(), egress()) -> + ok | {error, {unrecoverable_error, term()}}. +send(Pid, TraceRenderedCTX, MsgIn, Egress) -> try - emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc)) + emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedCTX)) catch error:{unrecoverable_error, Reason} -> - {unrecoverable_error, Reason} + {error, {unrecoverable_error, Reason}} end. --spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) -> - ok | {ok, pid()}. -send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) -> +-spec send_async(pid(), emqx_trace:rendered_action_template_ctx(), message(), callback(), egress()) -> + {ok, pid()} | {error, {unrecoverable_error, term()}}. +send_async(Pid, TraceRenderedCTX, MsgIn, Callback, Egress) -> try ok = emqtt:publish_async( - Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback + Pid, export_msg(MsgIn, Egress, TraceRenderedCTX), _Timeout = infinity, Callback ), {ok, Pid} catch error:{unrecoverable_error, Reason} -> - {unrecoverable_error, Reason} + {error, {unrecoverable_error, Reason}} end. -export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) -> - to_remote_msg(Msg, Remote, TraceRenderedFunc). +export_msg(Msg, #{remote := Remote}, TraceRenderedCTX) -> + to_remote_msg(Msg, Remote, TraceRenderedCTX). --spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) -> +-spec to_remote_msg( + message(), emqx_bridge_mqtt_msg:msgvars(), emqx_trace:rendered_action_template_ctx() +) -> remote_message(). -to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) -> +to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedCTX) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc); -to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedCTX); +to_remote_msg(Msg = #{}, Remote, TraceRenderedCTX) -> #{ topic := Topic, payload := Payload, @@ -83,16 +83,13 @@ to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> retain := Retain } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), - TraceRenderedFun( - #{ - qos => QoS, - retain => Retain, - topic => Topic, - props => PubProps, - payload => Payload - }, - TraceRenderedCTX - ), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + qos => QoS, + retain => Retain, + topic => Topic, + props => PubProps, + payload => Payload + }), #mqtt_msg{ qos = QoS, retain = Retain, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 1637743b5..dacb47a57 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -214,12 +215,10 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -237,12 +236,10 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -250,22 +247,6 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} end. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - publish_messages( Conn, RabbitMQ, @@ -278,7 +259,7 @@ publish_messages( publish_confirmation_timeout := PublishConfirmationTimeout }, Messages, - TraceRenderedFunc + TraceRenderedCTX ) -> try publish_messages( @@ -291,10 +272,10 @@ publish_messages( Messages, WaitForPublishConfirmations, PublishConfirmationTimeout, - TraceRenderedFunc + TraceRenderedCTX ) catch - error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> {error, Reason}; %% if send a message to a non-existent exchange, RabbitMQ client will crash %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} @@ -314,7 +295,7 @@ publish_messages( Messages, WaitForPublishConfirmations, PublishConfirmationTimeout, - {TraceRenderedFun, TraceRenderedFuncCTX} + TraceRenderedCTX ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> @@ -330,20 +311,17 @@ publish_messages( format_data(PayloadTmpl, M) || {_, M} <- Messages ], - TraceRenderedFun( - #{ - messages => FormattedMsgs, - properties => #{ - headers => [], - delivery_mode => DeliveryMode - }, - method => #{ - exchange => Exchange, - routing_key => RoutingKey - } + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + messages => FormattedMsgs, + properties => #{ + headers => [], + delivery_mode => DeliveryMode }, - TraceRenderedFuncCTX - ), + method => #{ + exchange => Exchange, + routing_key => RoutingKey + } + }), lists:foreach( fun(Msg) -> amqp_channel:cast( diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 383ceabf7..67b0e77bc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -10,6 +10,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -200,12 +201,10 @@ on_batch_query( ) -> case maps:find(ChannelId, Channels) of {ok, #{batch := Tokens, opts := Opts}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), do_query_job( InstanceId, - {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]}, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]}, State ); _ -> @@ -216,22 +215,6 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( @@ -358,13 +341,16 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). -do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) -> +do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) -> SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), try - TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX), + emqx_trace:rendered_action_template_with_ctx( + TraceRenderedCTX, + #{query => SQL} + ), execute(Conn, SQL, Opts) catch - error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> {error, Reason} end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index e2f01321a..5ec4bdc6e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -18,6 +18,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). -export([ @@ -724,7 +725,7 @@ inc_action_metrics(TraceCtx, Result) -> do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason} + {error, ?EMQX_TRACE_STOP_ACTION(Explanation) = _Reason} ) -> TraceContext1 = maps:remove(action_id, TraceContext), trace_action(