From 0dbaef431671ada97cc4a9030a8a3e42e791a7fa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 17:03:04 +0200 Subject: [PATCH] feat: add stop after render and after render trace to tdengine action --- .../src/emqx_bridge_tdengine_connector.erl | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 7bb342ed1..383ceabf7 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -32,7 +32,7 @@ -export([connector_examples/1]). --export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). +-export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -186,6 +186,7 @@ on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) -> case maps:find(ChannelId, Channels) of {ok, #{insert := Tokens, opts := Opts}} -> Query = emqx_placeholder:proc_tmpl(Tokens, Data), + emqx_trace:rendered_action_template(ChannelId, #{query => Query}), do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State); _ -> {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}} @@ -199,9 +200,12 @@ on_batch_query( ) -> case maps:find(ChannelId, Channels) of {ok, #{batch := Tokens, opts := Opts}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, do_query_job( InstanceId, - {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]}, State ); _ -> @@ -212,6 +216,22 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( @@ -338,9 +358,15 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). -do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> +do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) -> SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), - execute(Conn, SQL, Opts). + try + TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX), + execute(Conn, SQL, Opts) + catch + error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + {error, Reason} + end. aggregate_query(BatchTks, BatchReqs, Acc) -> lists:foldl(