From af99829a211c2463cb2b1d8bfa1d454667ca55fd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 May 2024 11:53:20 -0300 Subject: [PATCH] refactor: move common templating logic to `emqx_connector_aggreg_buffer_ctx` --- ...qx_bridge_azure_blob_storage_connector.erl | 48 ++++++++----------- .../src/emqx_bridge_s3_connector.erl | 44 +++++++---------- .../src/emqx_connector_aggreg_buffer_ctx.erl | 42 ++++++++++++++++ .../src/emqx_connector_aggreg_delivery.erl | 5 +- 4 files changed, 82 insertions(+), 57 deletions(-) create mode 100644 apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer_ctx.erl diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl index 8f4a04b80..95d88ab28 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl @@ -339,9 +339,9 @@ do_list_blobs(Worker, Container) -> %% `emqx_connector_aggreg_delivery' API %%------------------------------------------------------------------------------ --spec init_transfer_state(emqx_connector_aggreg_delivery:buffer_map(), transfer_opts()) -> +-spec init_transfer_state(buffer(), transfer_opts()) -> transfer_state(). -init_transfer_state(BufferMap, Opts) -> +init_transfer_state(Buffer, Opts) -> #{ upload_options := #{ action := ActionName, @@ -351,7 +351,7 @@ init_transfer_state(BufferMap, Opts) -> pool := Pool } } = Opts, - Blob = mk_blob_name_key(BufferMap, ActionName, BlobTemplate), + Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate), #{ blob => Blob, buffer => [], @@ -363,8 +363,8 @@ init_transfer_state(BufferMap, Opts) -> started => false }. -mk_blob_name_key(BufferMap, ActionName, BlobTemplate) -> - emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, BufferMap}}). +mk_blob_name_key(Buffer, ActionName, BlobTemplate) -> + emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, Buffer}}). -spec process_append(iodata(), transfer_state()) -> transfer_state(). @@ -455,34 +455,26 @@ process_complete(TransferState) -> %% `emqx_template' API %%------------------------------------------------------------------------------ --spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> +-spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> {ok, integer() | string()} | {error, undefined}. -lookup([<<"action">>], {ActionName, _BufferMap}) -> +lookup([<<"action">>], {ActionName, _Buffer}) -> {ok, mk_fs_safe_string(ActionName)}; -lookup(Accessor, {_ActionName, BufferMap = #{}}) -> - lookup_buffer_var(Accessor, BufferMap); +lookup([<<"node">>], {_ActionName, _Buffer}) -> + {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; +lookup(Accessor, {_ActionName, Buffer}) -> + lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. -lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) -> - {ok, format_timestamp(Since, Format)}; -lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) -> - {ok, format_timestamp(Until, Format)}; -lookup_buffer_var([<<"sequence">>], #{seq := Seq}) -> - {ok, Seq}; -lookup_buffer_var([<<"node">>], #{}) -> - {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; -lookup_buffer_var(_Binding, _Context) -> - {error, undefined}. - -format_timestamp(Timestamp, <<"rfc3339utc">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"rfc3339">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"unix">>) -> - Timestamp. +lookup_buffer_var(Accessor, Buffer) -> + case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of + {ok, String} when is_list(String) -> + {ok, mk_fs_safe_string(String)}; + {ok, Value} -> + {ok, Value}; + {error, Reason} -> + {error, Reason} + end. mk_fs_safe_string(String) -> unicode:characters_to_binary(string:replace(String, ":", "_", all)). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index bc9f37935..69c4609d8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -407,8 +407,8 @@ iolist_to_string(IOList) -> %% `emqx_connector_aggreg_delivery` APIs --spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). -init_transfer_state(BufferMap, Opts) -> +-spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t(). +init_transfer_state(Buffer, Opts) -> #{ bucket := Bucket, upload_options := UploadOpts, @@ -416,11 +416,11 @@ init_transfer_state(BufferMap, Opts) -> uploader_config := UploaderConfig } = Opts, Client = emqx_s3_client:create(Bucket, Config), - Key = mk_object_key(BufferMap, Opts), + Key = mk_object_key(Buffer, Opts), emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). -mk_object_key(BufferMap, #{action := AggregId, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}). +mk_object_key(Buffer, #{action := AggregId, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {AggregId, Buffer}}). process_append(Writes, Upload0) -> {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), @@ -454,34 +454,26 @@ process_terminate(Upload) -> %% `emqx_template` APIs --spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> +-spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> {ok, integer() | string()} | {error, undefined}. lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) -> {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_AggregId, Buffer = #{}}) -> +lookup([<<"node">>], {_AggregId, _Buffer}) -> + {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; +lookup(Accessor, {_AggregId, Buffer}) -> lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. -lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) -> - {ok, format_timestamp(Since, Format)}; -lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) -> - {ok, format_timestamp(Until, Format)}; -lookup_buffer_var([<<"sequence">>], #{seq := Seq}) -> - {ok, Seq}; -lookup_buffer_var([<<"node">>], #{}) -> - {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; -lookup_buffer_var(_Binding, _Context) -> - {error, undefined}. - -format_timestamp(Timestamp, <<"rfc3339utc">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"rfc3339">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"unix">>) -> - Timestamp. +lookup_buffer_var(Accessor, Buffer) -> + case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of + {ok, String} when is_list(String) -> + {ok, mk_fs_safe_string(String)}; + {ok, Value} -> + {ok, Value}; + {error, Reason} -> + {error, Reason} + end. mk_fs_safe_string(String) -> unicode:characters_to_binary(string:replace(String, ":", "_", all)). diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer_ctx.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer_ctx.erl new file mode 100644 index 000000000..ba5323444 --- /dev/null +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer_ctx.erl @@ -0,0 +1,42 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_connector_aggreg_buffer_ctx). + +-behaviour(emqx_template). + +-include("emqx_connector_aggregator.hrl"). + +%% `emqx_template' API +-export([lookup/2]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `emqx_template' API +%%------------------------------------------------------------------------------ + +-spec lookup(emqx_template:accessor(), buffer()) -> + {ok, integer() | string()} | {error, undefined}. +lookup([<<"datetime">>, Format], #buffer{since = Since}) -> + {ok, format_timestamp(Since, Format)}; +lookup([<<"datetime_until">>, Format], #buffer{until = Until}) -> + {ok, format_timestamp(Until, Format)}; +lookup([<<"sequence">>], #buffer{seq = Seq}) -> + {ok, Seq}; +lookup(_Binding, _Context) -> + {error, undefined}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +format_timestamp(Timestamp, <<"rfc3339utc">>) -> + calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]); +format_timestamp(Timestamp, <<"rfc3339">>) -> + calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]); +format_timestamp(Timestamp, <<"unix">>) -> + Timestamp. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index e83d75b34..9e2a26bbd 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -48,7 +48,7 @@ %% @doc Initialize the transfer state, such as blob storage path, transfer options, client %% credentials, etc. . --callback init_transfer_state(buffer_map(), map()) -> transfer_state(). +-callback init_transfer_state(buffer(), map()) -> transfer_state(). %% @doc Append data to the transfer before sending. Usually should not fail. -callback process_append(iodata(), transfer_state()) -> transfer_state(). @@ -86,13 +86,12 @@ init_delivery( callback_module := Mod } ) -> - BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer), #delivery{ id = Id, callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, - transfer = Mod:init_transfer_state(BufferMap, Opts), + transfer = Mod:init_transfer_state(Buffer, Opts), empty = true }.