From 157e2c25357cea5e48fff7c133bb81194b56134c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 3 May 2024 17:25:02 -0300 Subject: [PATCH] refactor: make api more generic --- .../src/emqx_bridge_s3_connector.erl | 9 +++++---- .../src/emqx_connector_aggreg_delivery.erl | 17 +++++++++++------ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index fa488fab9..a3edc717e 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -409,18 +409,19 @@ init_transfer_state(Buffer, Opts) -> mk_object_key(Buffer, #{action := Name, key := Template}) -> emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). -process_append(Writes, Upload) -> - emqx_s3_upload:append(Writes, Upload). +process_append(Writes, Upload0) -> + {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), + Upload. process_write(Upload0) -> case emqx_s3_upload:write(Upload0) of {ok, Upload} -> - Upload; + {ok, Upload}; {cont, Upload} -> process_write(Upload); {error, Reason} -> _ = emqx_s3_upload:abort(Upload0), - exit({upload_failed, Reason}) + {error, Reason} end. process_complete(Upload) -> diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index 6b62bc15c..5f946dd7c 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -44,9 +44,9 @@ -callback init_transfer_state(buffer(), map()) -> transfer_state(). --callback process_append(iodata(), transfer_state()) -> {ok, transfer_state()}. +-callback process_append(iodata(), transfer_state()) -> transfer_state(). --callback process_write(transfer_state()) -> transfer_state(). +-callback process_write(transfer_state()) -> {ok, transfer_state()} | {error, term()}. -callback process_complete(transfer_state()) -> {ok, term()}. @@ -134,7 +134,7 @@ process_append_records( } ) -> {Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0), - {ok, Transfer} = Mod:process_append(Writes, Transfer0), + Transfer = Mod:process_append(Writes, Transfer0), Delivery#delivery{ container = Container, transfer = Transfer, @@ -142,8 +142,13 @@ process_append_records( }. process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) -> - Transfer = Mod:process_write(Transfer0), - Delivery#delivery{transfer = Transfer}. + case Mod:process_write(Transfer0) of + {ok, Transfer} -> + Delivery#delivery{transfer = Transfer}; + {error, Reason} -> + %% Todo: handle more gracefully? Retry? + error({transfer_failed, Reason}) + end. process_complete(#delivery{name = Name, empty = true}) -> ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}), @@ -152,7 +157,7 @@ process_complete(#delivery{ name = Name, callback_module = Mod, container = Container, transfer = Transfer0 }) -> Trailer = emqx_connector_aggreg_csv:close(Container), - {ok, Transfer} = Mod:process_append(Trailer, Transfer0), + Transfer = Mod:process_append(Trailer, Transfer0), {ok, Completed} = Mod:process_complete(Transfer), ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}), ok.