feat: add stop after render and after render trace to tdengine action
This commit is contained in:
parent
d6ceeb3b30
commit
0dbaef4316
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
-export([connector_examples/1]).
|
-export([connector_examples/1]).
|
||||||
|
|
||||||
-export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]).
|
-export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]).
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
@ -186,6 +186,7 @@ on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) ->
|
||||||
case maps:find(ChannelId, Channels) of
|
case maps:find(ChannelId, Channels) of
|
||||||
{ok, #{insert := Tokens, opts := Opts}} ->
|
{ok, #{insert := Tokens, opts := Opts}} ->
|
||||||
Query = emqx_placeholder:proc_tmpl(Tokens, Data),
|
Query = emqx_placeholder:proc_tmpl(Tokens, Data),
|
||||||
|
emqx_trace:rendered_action_template(ChannelId, #{query => Query}),
|
||||||
do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State);
|
do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State);
|
||||||
_ ->
|
_ ->
|
||||||
{error, {unrecoverable_error, {invalid_channel_id, InstanceId}}}
|
{error, {unrecoverable_error, {invalid_channel_id, InstanceId}}}
|
||||||
|
@ -199,9 +200,12 @@ on_batch_query(
|
||||||
) ->
|
) ->
|
||||||
case maps:find(ChannelId, Channels) of
|
case maps:find(ChannelId, Channels) of
|
||||||
{ok, #{batch := Tokens, opts := Opts}} ->
|
{ok, #{batch := Tokens, opts := Opts}} ->
|
||||||
|
LogMetaData = logger:get_process_metadata(),
|
||||||
|
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
|
||||||
|
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
|
||||||
do_query_job(
|
do_query_job(
|
||||||
InstanceId,
|
InstanceId,
|
||||||
{?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]},
|
{?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]},
|
||||||
State
|
State
|
||||||
);
|
);
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -212,6 +216,22 @@ on_batch_query(InstanceId, BatchReq, State) ->
|
||||||
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
||||||
{error, {unrecoverable_error, invalid_request}}.
|
{error, {unrecoverable_error, invalid_request}}.
|
||||||
|
|
||||||
|
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
|
||||||
|
OldMetaData =
|
||||||
|
case logger:get_process_metadata() of
|
||||||
|
undefined -> #{};
|
||||||
|
M -> M
|
||||||
|
end,
|
||||||
|
try
|
||||||
|
logger:set_process_metadata(LogMetaData),
|
||||||
|
emqx_trace:rendered_action_template(
|
||||||
|
ActionID,
|
||||||
|
RenderResult
|
||||||
|
)
|
||||||
|
after
|
||||||
|
logger:set_process_metadata(OldMetaData)
|
||||||
|
end.
|
||||||
|
|
||||||
on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
|
on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
|
||||||
case
|
case
|
||||||
emqx_resource_pool:health_check_workers(
|
emqx_resource_pool:health_check_workers(
|
||||||
|
@ -338,9 +358,15 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
|
||||||
execute(Conn, Query, Opts) ->
|
execute(Conn, Query, Opts) ->
|
||||||
tdengine:insert(Conn, Query, Opts).
|
tdengine:insert(Conn, Query, Opts).
|
||||||
|
|
||||||
do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) ->
|
||||||
SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
|
SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
|
||||||
execute(Conn, SQL, Opts).
|
try
|
||||||
|
TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX),
|
||||||
|
execute(Conn, SQL, Opts)
|
||||||
|
catch
|
||||||
|
error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
aggregate_query(BatchTks, BatchReqs, Acc) ->
|
aggregate_query(BatchTks, BatchReqs, Acc) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
|
|
Loading…
Reference in New Issue