diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 88b161033..dd0362e5d 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ChannelState = #{mode := direct} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); ChannelState = #{mode := aggregated} -> - run_aggregated_upload(InstId, [Data], ChannelState); + run_aggregated_upload(InstId, Tag, [Data], ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> case maps:get(Tag, Channels, undefined) of ChannelState = #{mode := aggregated} -> Records = [Data0 | [Data || {_, Data} <- Rest]], - run_aggregated_upload(InstId, Records, ChannelState); + run_aggregated_upload(InstId, Tag, Records, ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -362,8 +362,15 @@ run_simple_upload( {error, map_error(Reason)} end. -run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> +run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), + emqx_trace:rendered_action_template(ChannelID, #{ + mode => aggregated, + records => #emqx_trace_format_func_data{ + function = fun render_records/1, + data = Records + } + }), case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), @@ -372,6 +379,13 @@ run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> {error, {unrecoverable_error, Reason}} end. +render_records(Records) -> + try + [unicode:characters_to_binary(R) || R <- Records] + catch + _:_ -> Records + end. + map_error({socket_error, _} = Reason) -> {recoverable_error, Reason}; map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->