refactor: avoid passing record down to template rendering function
This commit is contained in:
parent
157e2c2535
commit
32493e395c
|
@ -394,8 +394,8 @@ iolist_to_string(IOList) ->
|
||||||
|
|
||||||
%% `emqx_connector_aggreg_delivery` APIs
|
%% `emqx_connector_aggreg_delivery` APIs
|
||||||
|
|
||||||
-spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t().
|
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
|
||||||
init_transfer_state(Buffer, Opts) ->
|
init_transfer_state(BufferMap, Opts) ->
|
||||||
#{
|
#{
|
||||||
bucket := Bucket,
|
bucket := Bucket,
|
||||||
upload_options := UploadOpts,
|
upload_options := UploadOpts,
|
||||||
|
@ -403,11 +403,11 @@ init_transfer_state(Buffer, Opts) ->
|
||||||
uploader_config := UploaderConfig
|
uploader_config := UploaderConfig
|
||||||
} = Opts,
|
} = Opts,
|
||||||
Client = emqx_s3_client:create(Bucket, Config),
|
Client = emqx_s3_client:create(Bucket, Config),
|
||||||
Key = mk_object_key(Buffer, Opts),
|
Key = mk_object_key(BufferMap, Opts),
|
||||||
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
||||||
|
|
||||||
mk_object_key(Buffer, #{action := Name, key := Template}) ->
|
mk_object_key(BufferMap, #{action := Name, key := Template}) ->
|
||||||
emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
|
emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
|
||||||
|
|
||||||
process_append(Writes, Upload0) ->
|
process_append(Writes, Upload0) ->
|
||||||
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
||||||
|
@ -441,22 +441,22 @@ process_terminate(Upload) ->
|
||||||
|
|
||||||
%% `emqx_template` APIs
|
%% `emqx_template` APIs
|
||||||
|
|
||||||
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
|
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
|
||||||
{ok, integer() | string()} | {error, undefined}.
|
{ok, integer() | string()} | {error, undefined}.
|
||||||
lookup([<<"action">>], {Name, _Buffer}) ->
|
lookup([<<"action">>], {Name, _Buffer}) ->
|
||||||
{ok, mk_fs_safe_string(Name)};
|
{ok, mk_fs_safe_string(Name)};
|
||||||
lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
|
lookup(Accessor, {_Name, Buffer = #{}}) ->
|
||||||
lookup_buffer_var(Accessor, Buffer);
|
lookup_buffer_var(Accessor, Buffer);
|
||||||
lookup(_Accessor, _Context) ->
|
lookup(_Accessor, _Context) ->
|
||||||
{error, undefined}.
|
{error, undefined}.
|
||||||
|
|
||||||
lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
|
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
|
||||||
{ok, format_timestamp(Since, Format)};
|
{ok, format_timestamp(Since, Format)};
|
||||||
lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
|
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
|
||||||
{ok, format_timestamp(Until, Format)};
|
{ok, format_timestamp(Until, Format)};
|
||||||
lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
|
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
|
||||||
{ok, Seq};
|
{ok, Seq};
|
||||||
lookup_buffer_var([<<"node">>], #buffer{}) ->
|
lookup_buffer_var([<<"node">>], #{}) ->
|
||||||
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
|
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
|
||||||
lookup_buffer_var(_Binding, _Context) ->
|
lookup_buffer_var(_Binding, _Context) ->
|
||||||
{error, undefined}.
|
{error, undefined}.
|
||||||
|
|
|
@ -13,3 +13,11 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type buffer() :: #buffer{}.
|
-type buffer() :: #buffer{}.
|
||||||
|
|
||||||
|
-type buffer_map() :: #{
|
||||||
|
since := emqx_connector_aggregator:timestamp(),
|
||||||
|
until := emqx_connector_aggregator:timestamp(),
|
||||||
|
seq := non_neg_integer(),
|
||||||
|
filename := file:filename(),
|
||||||
|
max_records := pos_integer() | undefined
|
||||||
|
}.
|
||||||
|
|
|
@ -42,7 +42,7 @@
|
||||||
|
|
||||||
-type transfer_state() :: term().
|
-type transfer_state() :: term().
|
||||||
|
|
||||||
-callback init_transfer_state(buffer(), map()) -> transfer_state().
|
-callback init_transfer_state(buffer_map(), map()) -> transfer_state().
|
||||||
|
|
||||||
-callback process_append(iodata(), transfer_state()) -> transfer_state().
|
-callback process_append(iodata(), transfer_state()) -> transfer_state().
|
||||||
|
|
||||||
|
@ -75,12 +75,13 @@ init_delivery(
|
||||||
callback_module := Mod
|
callback_module := Mod
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
|
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
|
||||||
#delivery{
|
#delivery{
|
||||||
name = Name,
|
name = Name,
|
||||||
callback_module = Mod,
|
callback_module = Mod,
|
||||||
container = mk_container(ContainerOpts),
|
container = mk_container(ContainerOpts),
|
||||||
reader = Reader,
|
reader = Reader,
|
||||||
transfer = Mod:init_transfer_state(Buffer, Opts),
|
transfer = Mod:init_transfer_state(BufferMap, Opts),
|
||||||
empty = true
|
empty = true
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
start_link/2,
|
start_link/2,
|
||||||
push_records/3,
|
push_records/3,
|
||||||
tick/2,
|
tick/2,
|
||||||
take_error/1
|
take_error/1,
|
||||||
|
buffer_to_map/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
@ -72,6 +73,15 @@ tick(Name, Timestamp) ->
|
||||||
take_error(Name) ->
|
take_error(Name) ->
|
||||||
gen_server:call(?SRVREF(Name), take_error).
|
gen_server:call(?SRVREF(Name), take_error).
|
||||||
|
|
||||||
|
buffer_to_map(#buffer{} = Buffer) ->
|
||||||
|
#{
|
||||||
|
since => Buffer#buffer.since,
|
||||||
|
until => Buffer#buffer.until,
|
||||||
|
seq => Buffer#buffer.seq,
|
||||||
|
filename => Buffer#buffer.filename,
|
||||||
|
max_records => Buffer#buffer.max_records
|
||||||
|
}.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
|
write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
|
||||||
|
|
Loading…
Reference in New Issue