fix: add action rendered trace for s3 in aggregated mode
Fixes: https://emqx.atlassian.net/browse/EMQX-12429
This commit is contained in:
parent
26c988fe11
commit
e10c87b825
|
@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels})
|
||||||
ChannelState = #{mode := direct} ->
|
ChannelState = #{mode := direct} ->
|
||||||
run_simple_upload(InstId, Tag, Data, ChannelState, Config);
|
run_simple_upload(InstId, Tag, Data, ChannelState, Config);
|
||||||
ChannelState = #{mode := aggregated} ->
|
ChannelState = #{mode := aggregated} ->
|
||||||
run_aggregated_upload(InstId, [Data], ChannelState);
|
run_aggregated_upload(InstId, Tag, [Data], ChannelState);
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
||||||
end.
|
end.
|
||||||
|
@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
|
||||||
case maps:get(Tag, Channels, undefined) of
|
case maps:get(Tag, Channels, undefined) of
|
||||||
ChannelState = #{mode := aggregated} ->
|
ChannelState = #{mode := aggregated} ->
|
||||||
Records = [Data0 | [Data || {_, Data} <- Rest]],
|
Records = [Data0 | [Data || {_, Data} <- Rest]],
|
||||||
run_aggregated_upload(InstId, Records, ChannelState);
|
run_aggregated_upload(InstId, Tag, Records, ChannelState);
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
||||||
end.
|
end.
|
||||||
|
@ -362,8 +362,15 @@ run_simple_upload(
|
||||||
{error, map_error(Reason)}
|
{error, map_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
|
run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
|
||||||
Timestamp = erlang:system_time(second),
|
Timestamp = erlang:system_time(second),
|
||||||
|
emqx_trace:rendered_action_template(ChannelID, #{
|
||||||
|
mode => aggregated,
|
||||||
|
records => #emqx_trace_format_func_data{
|
||||||
|
function = fun render_records/1,
|
||||||
|
data = Records
|
||||||
|
}
|
||||||
|
}),
|
||||||
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
|
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
|
||||||
ok ->
|
ok ->
|
||||||
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
|
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
|
||||||
|
@ -372,6 +379,13 @@ run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
|
||||||
{error, {unrecoverable_error, Reason}}
|
{error, {unrecoverable_error, Reason}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
render_records(Records) ->
|
||||||
|
try
|
||||||
|
[unicode:characters_to_binary(R) || R <- Records]
|
||||||
|
catch
|
||||||
|
_:_ -> Records
|
||||||
|
end.
|
||||||
|
|
||||||
map_error({socket_error, _} = Reason) ->
|
map_error({socket_error, _} = Reason) ->
|
||||||
{recoverable_error, Reason};
|
{recoverable_error, Reason};
|
||||||
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
|
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
|
||||||
|
|
Loading…
Reference in New Issue