refactor: move common templating logic to `emqx_connector_aggreg_buffer_ctx`

This commit is contained in:
Thales Macedo Garitezi 2024-05-27 11:53:20 -03:00
parent c916c83c7c
commit af99829a21
4 changed files with 82 additions and 57 deletions

View File

@ -339,9 +339,9 @@ do_list_blobs(Worker, Container) ->
%% `emqx_connector_aggreg_delivery' API
%%------------------------------------------------------------------------------
-spec init_transfer_state(emqx_connector_aggreg_delivery:buffer_map(), transfer_opts()) ->
-spec init_transfer_state(buffer(), transfer_opts()) ->
transfer_state().
init_transfer_state(BufferMap, Opts) ->
init_transfer_state(Buffer, Opts) ->
#{
upload_options := #{
action := ActionName,
@ -351,7 +351,7 @@ init_transfer_state(BufferMap, Opts) ->
pool := Pool
}
} = Opts,
Blob = mk_blob_name_key(BufferMap, ActionName, BlobTemplate),
Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate),
#{
blob => Blob,
buffer => [],
@ -363,8 +363,8 @@ init_transfer_state(BufferMap, Opts) ->
started => false
}.
mk_blob_name_key(BufferMap, ActionName, BlobTemplate) ->
emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, BufferMap}}).
mk_blob_name_key(Buffer, ActionName, BlobTemplate) ->
emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, Buffer}}).
-spec process_append(iodata(), transfer_state()) ->
transfer_state().
@ -455,34 +455,26 @@ process_complete(TransferState) ->
%% `emqx_template' API
%%------------------------------------------------------------------------------
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {ActionName, _BufferMap}) ->
lookup([<<"action">>], {ActionName, _Buffer}) ->
{ok, mk_fs_safe_string(ActionName)};
lookup(Accessor, {_ActionName, BufferMap = #{}}) ->
lookup_buffer_var(Accessor, BufferMap);
lookup([<<"node">>], {_ActionName, _Buffer}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup(Accessor, {_ActionName, Buffer}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
lookup_buffer_var(Accessor, Buffer) ->
case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of
{ok, String} when is_list(String) ->
{ok, mk_fs_safe_string(String)};
{ok, Value} ->
{ok, Value};
{error, Reason} ->
{error, Reason}
end.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).

View File

@ -407,8 +407,8 @@ iolist_to_string(IOList) ->
%% `emqx_connector_aggreg_delivery` APIs
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
init_transfer_state(BufferMap, Opts) ->
-spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t().
init_transfer_state(Buffer, Opts) ->
#{
bucket := Bucket,
upload_options := UploadOpts,
@ -416,11 +416,11 @@ init_transfer_state(BufferMap, Opts) ->
uploader_config := UploaderConfig
} = Opts,
Client = emqx_s3_client:create(Bucket, Config),
Key = mk_object_key(BufferMap, Opts),
Key = mk_object_key(Buffer, Opts),
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).
mk_object_key(Buffer, #{action := AggregId, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {AggregId, Buffer}}).
process_append(Writes, Upload0) ->
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
@ -454,34 +454,26 @@ process_terminate(Upload) ->
%% `emqx_template` APIs
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
{ok, mk_fs_safe_string(Name)};
lookup(Accessor, {_AggregId, Buffer = #{}}) ->
lookup([<<"node">>], {_AggregId, _Buffer}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup(Accessor, {_AggregId, Buffer}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
lookup_buffer_var(Accessor, Buffer) ->
case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of
{ok, String} when is_list(String) ->
{ok, mk_fs_safe_string(String)};
{ok, Value} ->
{ok, Value};
{error, Reason} ->
{error, Reason}
end.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_connector_aggreg_buffer_ctx).
-behaviour(emqx_template).
-include("emqx_connector_aggregator.hrl").
%% `emqx_template' API
-export([lookup/2]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_template' API
%%------------------------------------------------------------------------------
-spec lookup(emqx_template:accessor(), buffer()) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"datetime">>, Format], #buffer{since = Since}) ->
{ok, format_timestamp(Since, Format)};
lookup([<<"datetime_until">>, Format], #buffer{until = Until}) ->
{ok, format_timestamp(Until, Format)};
lookup([<<"sequence">>], #buffer{seq = Seq}) ->
{ok, Seq};
lookup(_Binding, _Context) ->
{error, undefined}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]);
format_timestamp(Timestamp, <<"rfc3339">>) ->
calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.

View File

@ -48,7 +48,7 @@
%% @doc Initialize the transfer state, such as blob storage path, transfer options, client
%% credentials, etc. .
-callback init_transfer_state(buffer_map(), map()) -> transfer_state().
-callback init_transfer_state(buffer(), map()) -> transfer_state().
%% @doc Append data to the transfer before sending. Usually should not fail.
-callback process_append(iodata(), transfer_state()) -> transfer_state().
@ -86,13 +86,12 @@ init_delivery(
callback_module := Mod
}
) ->
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
#delivery{
id = Id,
callback_module = Mod,
container = mk_container(ContainerOpts),
reader = Reader,
transfer = Mod:init_transfer_state(BufferMap, Opts),
transfer = Mod:init_transfer_state(Buffer, Opts),
empty = true
}.