feat: add stop after render and after render trace to mqtt action

This commit is contained in:
Kjell Winblad 2024-04-23 10:36:05 +02:00
parent 5ca90ccced
commit 1f676ce035
2 changed files with 64 additions and 20 deletions

View File

@ -264,7 +264,7 @@ on_query(
), ),
Channels = maps:get(installed_channels, State), Channels = maps:get(installed_channels, State),
ChannelConfig = maps:get(ChannelId, Channels), ChannelConfig = maps:get(ChannelId, Channels),
handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig])); handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig]));
on_query(ResourceId, {_ChannelId, Msg}, #{}) -> on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "forwarding_unavailable", msg => "forwarding_unavailable",
@ -283,7 +283,7 @@ on_query_async(
Callback = {fun on_async_result/2, [CallbackIn]}, Callback = {fun on_async_result/2, [CallbackIn]},
Channels = maps:get(installed_channels, State), Channels = maps:get(installed_channels, State),
ChannelConfig = maps:get(ChannelId, Channels), ChannelConfig = maps:get(ChannelId, Channels),
Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]), Result = with_egress_client(ChannelId, PoolName, send_async, [Msg, Callback, ChannelConfig]),
case Result of case Result of
ok -> ok ->
ok; ok;
@ -300,8 +300,25 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
reason => "Egress is not configured" reason => "Egress is not configured"
}). }).
with_egress_client(ResourceId, Fun, Args) -> with_egress_client(ActionID, ResourceId, Fun, Args) ->
ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). LogMetaData = logger:get_process_metadata(),
TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID},
TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext},
ecpool:pick_and_do(
ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover
).
trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) ->
OldMetaData = logger:get_process_metadata(),
try
logger:set_process_metadata(LogMetaData),
emqx_trace:rendered_action_template(
ActionID,
RenderResult
)
after
logger:set_process_metadata(OldMetaData)
end.
on_async_result(Callback, Result) -> on_async_result(Callback, Result) ->
apply_callback_function(Callback, handle_send_result(Result)). apply_callback_function(Callback, handle_send_result(Result)).
@ -322,7 +339,9 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) ->
handle_send_result({ok, Reply}) -> handle_send_result({ok, Reply}) ->
{error, classify_reply(Reply)}; {error, classify_reply(Reply)};
handle_send_result({error, Reason}) -> handle_send_result({error, Reason}) ->
{error, classify_error(Reason)}. {error, classify_error(Reason)};
handle_send_result({unrecoverable_error, Reason}) ->
{error, {unrecoverable_error, Reason}}.
classify_reply(Reply = #{reason_code := _}) -> classify_reply(Reply = #{reason_code := _}) ->
{unrecoverable_error, Reply}. {unrecoverable_error, Reply}.

View File

@ -22,13 +22,16 @@
-export([ -export([
config/1, config/1,
send/3, send/4,
send_async/4 send_async/5
]). ]).
-type message() :: emqx_types:message() | map(). -type message() :: emqx_types:message() | map().
-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}.
-type remote_message() :: #mqtt_msg{}. -type remote_message() :: #mqtt_msg{}.
-type trace_rendered_func() :: {
fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map()
}.
-type egress() :: #{ -type egress() :: #{
local => #{ local => #{
@ -42,25 +45,37 @@
config(#{remote := RC = #{}} = Conf) -> config(#{remote := RC = #{}} = Conf) ->
Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
-spec send(pid(), message(), egress()) -> ok. -spec send(pid(), trace_rendered_func(), message(), egress()) -> ok.
send(Pid, MsgIn, Egress) -> send(Pid, TraceRenderedFunc, MsgIn, Egress) ->
emqtt:publish(Pid, export_msg(MsgIn, Egress)). try
emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc))
catch
error:{unrecoverable_error, Reason} ->
{unrecoverable_error, Reason}
end.
-spec send_async(pid(), message(), callback(), egress()) -> -spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) ->
ok | {ok, pid()}. ok | {ok, pid()}.
send_async(Pid, MsgIn, Callback, Egress) -> send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) ->
ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), try
{ok, Pid}. ok = emqtt:publish_async(
Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback
),
{ok, Pid}
catch
error:{unrecoverable_error, Reason} ->
{unrecoverable_error, Reason}
end.
export_msg(Msg, #{remote := Remote}) -> export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) ->
to_remote_msg(Msg, Remote). to_remote_msg(Msg, Remote, TraceRenderedFunc).
-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> -spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) ->
remote_message(). remote_message().
to_remote_msg(#message{flags = Flags} = Msg, Vars) -> to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) ->
{EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc);
to_remote_msg(Msg = #{}, Remote) -> to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) ->
#{ #{
topic := Topic, topic := Topic,
payload := Payload, payload := Payload,
@ -68,6 +83,16 @@ to_remote_msg(Msg = #{}, Remote) ->
retain := Retain retain := Retain
} = emqx_bridge_mqtt_msg:render(Msg, Remote), } = emqx_bridge_mqtt_msg:render(Msg, Remote),
PubProps = maps:get(pub_props, Msg, #{}), PubProps = maps:get(pub_props, Msg, #{}),
TraceRenderedFun(
#{
qos => QoS,
retain => Retain,
topic => Topic,
props => PubProps,
payload => Payload
},
TraceRenderedCTX
),
#mqtt_msg{ #mqtt_msg{
qos = QoS, qos = QoS,
retain = Retain, retain = Retain,