diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index a3edc717e..f9d3af478 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -394,8 +394,8 @@ iolist_to_string(IOList) -> %% `emqx_connector_aggreg_delivery` APIs --spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t(). -init_transfer_state(Buffer, Opts) -> +-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). +init_transfer_state(BufferMap, Opts) -> #{ bucket := Bucket, upload_options := UploadOpts, @@ -403,11 +403,11 @@ init_transfer_state(Buffer, Opts) -> uploader_config := UploaderConfig } = Opts, Client = emqx_s3_client:create(Bucket, Config), - Key = mk_object_key(Buffer, Opts), + Key = mk_object_key(BufferMap, Opts), emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). -mk_object_key(Buffer, #{action := Name, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). +mk_object_key(BufferMap, #{action := Name, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}). process_append(Writes, Upload0) -> {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), @@ -441,22 +441,22 @@ process_terminate(Upload) -> %% `emqx_template` APIs --spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> +-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> {ok, integer() | string()} | {error, undefined}. lookup([<<"action">>], {Name, _Buffer}) -> {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_Name, Buffer = #buffer{}}) -> +lookup(Accessor, {_Name, Buffer = #{}}) -> lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. -lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) -> +lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) -> {ok, format_timestamp(Since, Format)}; -lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) -> +lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) -> {ok, format_timestamp(Until, Format)}; -lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) -> +lookup_buffer_var([<<"sequence">>], #{seq := Seq}) -> {ok, Seq}; -lookup_buffer_var([<<"node">>], #buffer{}) -> +lookup_buffer_var([<<"node">>], #{}) -> {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; lookup_buffer_var(_Binding, _Context) -> {error, undefined}. diff --git a/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl b/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl index eeef351f2..07d4a414f 100644 --- a/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl +++ b/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl @@ -13,3 +13,11 @@ }). -type buffer() :: #buffer{}. + +-type buffer_map() :: #{ + since := emqx_connector_aggregator:timestamp(), + until := emqx_connector_aggregator:timestamp(), + seq := non_neg_integer(), + filename := file:filename(), + max_records := pos_integer() | undefined +}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index 5f946dd7c..70f54a385 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -42,7 +42,7 @@ -type transfer_state() :: term(). --callback init_transfer_state(buffer(), map()) -> transfer_state(). +-callback init_transfer_state(buffer_map(), map()) -> transfer_state(). -callback process_append(iodata(), transfer_state()) -> transfer_state(). @@ -75,12 +75,13 @@ init_delivery( callback_module := Mod } ) -> + BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer), #delivery{ name = Name, callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, - transfer = Mod:init_transfer_state(Buffer, Opts), + transfer = Mod:init_transfer_state(BufferMap, Opts), empty = true }. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl index 01b330646..f3936fd54 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl @@ -16,7 +16,8 @@ start_link/2, push_records/3, tick/2, - take_error/1 + take_error/1, + buffer_to_map/1 ]). -behaviour(gen_server). @@ -72,6 +73,15 @@ tick(Name, Timestamp) -> take_error(Name) -> gen_server:call(?SRVREF(Name), take_error). +buffer_to_map(#buffer{} = Buffer) -> + #{ + since => Buffer#buffer.since, + until => Buffer#buffer.until, + seq => Buffer#buffer.seq, + filename => Buffer#buffer.filename, + max_records => Buffer#buffer.max_records + }. + %% write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->