feat: add stop after render and after render trace to dynamo action

This commit is contained in:
Kjell Winblad 2024-04-23 17:58:55 +02:00
parent 120b35ac75
commit 810aa68b02
3 changed files with 39 additions and 9 deletions

View File

@ -246,12 +246,17 @@ do_query(
table := Table,
templates := Templates
} = ChannelState,
LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
Result =
case ensuare_dynamo_keys(Query, ChannelState) of
true ->
ecpool:pick_and_do(
PoolName,
{emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
{emqx_bridge_dynamo_connector_client, query, [
Table, QueryTuple, Templates, TraceRenderedFunc
]},
no_handover
);
_ ->
@ -259,6 +264,8 @@ do_query(
end,
case Result of
{error, {unrecoverable_error, {action_stopped_after_template_rendering, _}}} = Error ->
Error;
{error, Reason} ->
?tp(
dynamo_connector_query_return,
@ -291,6 +298,22 @@ do_query(
Result
end.
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
get_channel_id([{ChannelId, _Req} | _]) ->
ChannelId;
get_channel_id({ChannelId, _Req}) ->

View File

@ -10,7 +10,7 @@
-export([
start_link/1,
is_connected/2,
query/4
query/5
]).
%% gen_server callbacks
@ -40,8 +40,8 @@ is_connected(Pid, Timeout) ->
{false, Error}
end.
query(Pid, Table, Query, Templates) ->
gen_server:call(Pid, {query, Table, Query, Templates}, infinity).
query(Pid, Table, Query, Templates, TraceRenderedFunc) ->
gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedFunc}, infinity).
%%--------------------------------------------------------------------
%% @doc
@ -77,14 +77,14 @@ handle_call(is_connected, _From, State) ->
{false, Error}
end,
{reply, IsConnected, State};
handle_call({query, Table, Query, Templates}, _From, State) ->
Result = do_query(Table, Query, Templates),
handle_call({query, Table, Query, Templates, TraceRenderedFunc}, _From, State) ->
Result = do_query(Table, Query, Templates, TraceRenderedFunc),
{reply, Result, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) ->
Result = do_query(Table, Query, Templates),
Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}),
ReplyFun(Context, Result),
{noreply, State};
handle_cast(_Request, State) ->
@ -102,11 +102,14 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_query(Table, Query0, Templates) ->
do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) ->
try
Query = apply_template(Query0, Templates),
TraceRenderedFun(#{table => Table, query => Query}, TraceRenderedCTX),
execute(Query, Table)
catch
error:{unrecoverable_error, Reason} ->
{error, {unrecoverable_error, Reason}};
_Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.

View File

@ -309,7 +309,11 @@ with_egress_client(ActionID, ResourceId, Fun, Args) ->
).
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData = logger:get_process_metadata(),
OldMetaData =
case logger:get_process_metadata() of
undefined -> #{};
M -> M
end,
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(