diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 3bac17cfb..fa488fab9 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -27,7 +27,7 @@ -behaviour(emqx_connector_aggreg_delivery). -export([ - init_upload_state/2, + init_transfer_state/2, process_append/2, process_write/1, process_complete/1, @@ -394,8 +394,8 @@ iolist_to_string(IOList) -> %% `emqx_connector_aggreg_delivery` APIs --spec init_upload_state(buffer(), map()) -> emqx_s3_upload:t(). -init_upload_state(Buffer, Opts) -> +-spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t(). +init_transfer_state(Buffer, Opts) -> #{ bucket := Bucket, upload_options := UploadOpts, diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index ed74311b5..6b62bc15c 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -29,7 +29,7 @@ callback_module :: module(), container :: emqx_connector_aggreg_csv:container(), reader :: emqx_connector_aggreg_buffer:reader(), - upload :: impl_specific_upload_state(), + transfer :: transfer_state(), empty :: boolean() }). @@ -40,16 +40,15 @@ any() => term() }. --type impl_specific_upload_state() :: term(). +-type transfer_state() :: term(). --callback init_upload_state(buffer(), map()) -> impl_specific_upload_state(). +-callback init_transfer_state(buffer(), map()) -> transfer_state(). --callback process_append(iodata(), impl_specific_upload_state()) -> - {ok, impl_specific_upload_state()}. +-callback process_append(iodata(), transfer_state()) -> {ok, transfer_state()}. --callback process_write(impl_specific_upload_state()) -> impl_specific_upload_state(). +-callback process_write(transfer_state()) -> transfer_state(). --callback process_complete(impl_specific_upload_state()) -> {ok, term()}. +-callback process_complete(transfer_state()) -> {ok, term()}. %% @@ -81,7 +80,7 @@ init_delivery( callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, - upload = Mod:init_upload_state(Buffer, Opts), + transfer = Mod:init_transfer_state(Buffer, Opts), empty = true }. @@ -131,31 +130,31 @@ process_append_records( Delivery = #delivery{ callback_module = Mod, container = Container0, - upload = Upload0 + transfer = Transfer0 } ) -> {Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0), - {ok, Upload} = Mod:process_append(Writes, Upload0), + {ok, Transfer} = Mod:process_append(Writes, Transfer0), Delivery#delivery{ container = Container, - upload = Upload, + transfer = Transfer, empty = false }. -process_write(Delivery = #delivery{callback_module = Mod, upload = Upload0}) -> - Upload = Mod:process_write(Upload0), - Delivery#delivery{upload = Upload}. +process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) -> + Transfer = Mod:process_write(Transfer0), + Delivery#delivery{transfer = Transfer}. process_complete(#delivery{name = Name, empty = true}) -> - ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => empty}), + ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}), exit({shutdown, {skipped, empty}}); process_complete(#delivery{ - name = Name, callback_module = Mod, container = Container, upload = Upload0 + name = Name, callback_module = Mod, container = Container, transfer = Transfer0 }) -> Trailer = emqx_connector_aggreg_csv:close(Container), - {ok, Upload} = Mod:process_append(Trailer, Upload0), - {ok, Completed} = Mod:process_complete(Upload), - ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => Completed}), + {ok, Transfer} = Mod:process_append(Trailer, Transfer0), + {ok, Completed} = Mod:process_complete(Transfer), + ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}), ok. %% @@ -172,12 +171,12 @@ system_continue(Parent, Debug, Delivery) -> loop(Delivery, Parent, Debug). -spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _. -system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, upload = Upload}) -> - Mod:process_terminate(Upload). +system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, transfer = Transfer}) -> + Mod:process_terminate(Transfer). -spec format_status(normal, Args :: [term()]) -> _StateFormatted. format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) -> #delivery{callback_module = Mod} = Delivery, Delivery#delivery{ - upload = Mod:process_format_status(Delivery#delivery.upload) + transfer = Mod:process_format_status(Delivery#delivery.transfer) }.