refactor: extract blob aggregator logic to separate application

This will allow other bridges to share the same aggregation logic as S3 aggregated action.
This commit is contained in:
Thales Macedo Garitezi 2024-05-03 14:35:58 -03:00
parent 004dc80fb2
commit d61f4078e2
25 changed files with 530 additions and 306 deletions

View File

@ -2,5 +2,6 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}} {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}}
]}. ]}.

View File

@ -7,6 +7,7 @@
stdlib, stdlib,
erlcloud, erlcloud,
emqx_resource, emqx_resource,
emqx_connector_aggregator,
emqx_s3 emqx_s3
]}, ]},
{env, [ {env, [
@ -18,7 +19,6 @@
emqx_bridge_s3_connector_info emqx_bridge_s3_connector_info
]} ]}
]}, ]},
{mod, {emqx_bridge_s3_app, []}},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -1,212 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

%% This module takes aggregated records from a buffer and delivers them to S3,
%% wrapped in a configurable container (though currently there's only CSV).
%% Each delivery runs as a short-lived special process (proc_lib + sys).
-module(emqx_bridge_s3_aggreg_delivery).

-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_bridge_s3_aggregator.hrl").

-export([start_link/3]).

%% Internal exports
-export([
    init/4,
    loop/3
]).

-behaviour(emqx_template).
-export([lookup/2]).

%% Sys
-export([
    system_continue/3,
    system_terminate/4,
    format_status/2
]).

-record(delivery, {
    %% Action name; used in trace events and object key rendering.
    name :: _Name,
    %% Container accumulating record output before upload (currently CSV only).
    container :: emqx_bridge_s3_aggreg_csv:container(),
    %% Reader over the on-disk buffer file.
    reader :: emqx_bridge_s3_aggreg_buffer:reader(),
    %% In-progress S3 upload state.
    upload :: emqx_s3_upload:t(),
    %% `true' until the first record is appended; empty deliveries are skipped.
    empty :: boolean()
}).

-type state() :: #delivery{}.

%%

start_link(Name, Buffer, Opts) ->
    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).

%%

%% Entry point of the delivery process: stream the whole buffer into an S3
%% upload, then exit.
-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return().
init(Parent, Name, Buffer, Opts) ->
    ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
    Reader = open_buffer(Buffer),
    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
    %% Trap exits so parent termination arrives as an 'EXIT' message in the
    %% main loop and the in-progress upload can be aborted.
    _ = erlang:process_flag(trap_exit, true),
    ok = proc_lib:init_ack({ok, self()}),
    loop(Delivery, Parent, []).

init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
    #delivery{
        name = Name,
        container = mk_container(ContainerOpts),
        reader = Reader,
        upload = mk_upload(Buffer, Opts),
        empty = true
    }.

%% Open the buffer file for reading; crash the delivery process if it fails.
open_buffer(#buffer{filename = Filename}) ->
    case file:open(Filename, [read, binary, raw]) of
        {ok, FD} ->
            {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
            Reader;
        {error, Reason} ->
            error({buffer_open_failed, Reason})
    end.

mk_container(#{type := csv, column_order := OrderOpt}) ->
    %% TODO: Deduplicate?
    ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
    emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).

mk_upload(
    Buffer,
    Opts = #{
        bucket := Bucket,
        upload_options := UploadOpts,
        client_config := Config,
        uploader_config := UploaderConfig
    }
) ->
    Client = emqx_s3_client:create(Bucket, Config),
    Key = mk_object_key(Buffer, Opts),
    emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).

%% Render the S3 object key from the configured template, with `?MODULE' as
%% the template variable lookup handler (see `lookup/2').
mk_object_key(Buffer, #{action := Name, key := Template}) ->
    emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).

%%

-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
loop(Delivery, Parent, Debug) ->
    %% NOTE: This function is mocked in tests.
    %% Prioritize incoming messages (sys requests, exit signals); only when
    %% the mailbox is empty, keep pumping records out of the buffer.
    receive
        Msg -> handle_msg(Msg, Delivery, Parent, Debug)
    after 0 ->
        process_delivery(Delivery, Parent, Debug)
    end.

process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) ->
    case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
        {Records = [#{} | _], Reader} ->
            Delivery1 = Delivery0#delivery{reader = Reader},
            Delivery2 = process_append_records(Records, Delivery1),
            Delivery = process_write(Delivery2),
            loop(Delivery, Parent, Debug);
        {[], Reader} ->
            %% Empty read: just advance the reader and retry.
            Delivery = Delivery0#delivery{reader = Reader},
            loop(Delivery, Parent, Debug);
        eof ->
            %% Buffer exhausted: finalize the upload.
            process_complete(Delivery0);
        {Unexpected, _Reader} ->
            exit({buffer_unexpected_record, Unexpected})
    end.

%% Fill the container with records and append produced output to the upload.
process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
    {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0),
    {ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
    Delivery#delivery{
        container = Container,
        upload = Upload,
        empty = false
    }.

process_write(Delivery = #delivery{upload = Upload0}) ->
    case emqx_s3_upload:write(Upload0) of
        {ok, Upload} ->
            Delivery#delivery{upload = Upload};
        {cont, Upload} ->
            %% More data pending: keep writing until drained.
            process_write(Delivery#delivery{upload = Upload});
        {error, Reason} ->
            _ = emqx_s3_upload:abort(Upload0),
            exit({upload_failed, Reason})
    end.

process_complete(#delivery{name = Name, empty = true}) ->
    %% Nothing was ever appended: skip the upload entirely.
    ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}),
    exit({shutdown, {skipped, empty}});
process_complete(#delivery{name = Name, container = Container, upload = Upload0}) ->
    %% Append the container trailer (closing bytes, if any) before completion.
    Trailer = emqx_bridge_s3_aggreg_csv:close(Container),
    {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0),
    case emqx_s3_upload:complete(Upload) of
        {ok, Completed} ->
            ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}),
            ok;
        {error, Reason} ->
            _ = emqx_s3_upload:abort(Upload),
            exit({upload_failed, Reason})
    end.

%%

handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
    sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
    system_terminate(Reason, Parent, Debug, Delivery);
handle_msg(_Msg, Delivery, Parent, Debug) ->
    %% FIX: previously called `loop(Parent, Debug, Delivery)' — arguments in
    %% the wrong order (`loop/3' expects the state first, per its spec), so
    %% any stray message crashed the delivery process.
    loop(Delivery, Parent, Debug).

-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
system_continue(Parent, Debug, Delivery) ->
    loop(Delivery, Parent, Debug).

-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) ->
    %% Abort the multipart upload so S3 does not keep orphaned parts around.
    emqx_s3_upload:abort(Upload).

-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
    %% Compact the (potentially large) upload state for sys reports.
    Delivery#delivery{
        upload = emqx_s3_upload:format(Delivery#delivery.upload)
    }.

%%

-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
    {ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {Name, _Buffer}) ->
    {ok, mk_fs_safe_string(Name)};
lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
    lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
    {error, undefined}.

lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
    {ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
    {ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
    {ok, Seq};
lookup_buffer_var([<<"node">>], #buffer{}) ->
    {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
    {error, undefined}.

format_timestamp(Timestamp, <<"rfc3339utc">>) ->
    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
    mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
    mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
    Timestamp.

%% Replace `:' (present in RFC3339 timestamps) so the rendered value is safe
%% to embed in filesystem paths and object keys.
mk_fs_safe_string(String) ->
    unicode:characters_to_binary(string:replace(String, ":", "_", all)).

View File

@ -1,16 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

%% OTP application callback module for the S3 bridge: its only job is to
%% start the bridge's top-level supervisor.
-module(emqx_bridge_s3_app).

-behaviour(application).

-export([start/2, stop/1]).

%%------------------------------------------------------------------------------
%% `application` API
%%------------------------------------------------------------------------------

%% Delegate to the supervisor; the application is up once the supervision
%% tree is running.
start(_StartType, _StartArgs) ->
    emqx_bridge_s3_sup:start_link().

%% Nothing to release: the supervision tree is torn down by OTP.
stop(_State) ->
    ok.

View File

@ -8,6 +8,7 @@
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx/include/emqx_trace.hrl").
-include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl").
-include("emqx_bridge_s3.hrl"). -include("emqx_bridge_s3.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -24,6 +25,19 @@
on_get_channel_status/3 on_get_channel_status/3
]). ]).
-behaviour(emqx_connector_aggreg_delivery).
-export([
init_upload_state/2,
process_append/2,
process_write/1,
process_complete/1,
process_format_status/1,
process_terminate/1
]).
-behaviour(emqx_template).
-export([lookup/2]).
-type config() :: #{ -type config() :: #{
access_key_id => string(), access_key_id => string(),
secret_access_key => emqx_secret:t(string()), secret_access_key => emqx_secret:t(string()),
@ -205,13 +219,14 @@ start_channel(State, #{
key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters), key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
container => Container, container => Container,
upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters), upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
callback_module => ?MODULE,
client_config => maps:get(client_config, State), client_config => maps:get(client_config, State),
uploader_config => maps:with([min_part_size, max_part_size], Parameters) uploader_config => maps:with([min_part_size, max_part_size], Parameters)
}, },
_ = emqx_bridge_s3_sup:delete_child({Type, Name}), _ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
{ok, SupPid} = emqx_bridge_s3_sup:start_child(#{ {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
id => {Type, Name}, id => {Type, Name},
start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
type => supervisor, type => supervisor,
restart => permanent restart => permanent
}), }),
@ -220,7 +235,7 @@ start_channel(State, #{
name => Name, name => Name,
bucket => Bucket, bucket => Bucket,
supervisor => SupPid, supervisor => SupPid,
on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
}. }.
upload_options(Parameters) -> upload_options(Parameters) ->
@ -242,7 +257,7 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second), Timestamp = erlang:system_time(second),
ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp), ok = emqx_connector_aggregator:tick(Name, Timestamp),
ok = check_bucket_accessible(Bucket, State), ok = check_bucket_accessible(Bucket, State),
ok = check_aggreg_upload_errors(Name), ok = check_aggreg_upload_errors(Name),
?status_connected. ?status_connected.
@ -264,7 +279,7 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
end. end.
check_aggreg_upload_errors(Name) -> check_aggreg_upload_errors(Name) ->
case emqx_bridge_s3_aggregator:take_error(Name) of case emqx_connector_aggregator:take_error(Name) of
[Error] -> [Error] ->
%% TODO %% TODO
%% This approach means that, for example, 3 upload failures will cause %% This approach means that, for example, 3 upload failures will cause
@ -340,7 +355,7 @@ run_simple_upload(
run_aggregated_upload(InstId, Records, #{name := Name}) -> run_aggregated_upload(InstId, Records, #{name := Name}) ->
Timestamp = erlang:system_time(second), Timestamp = erlang:system_time(second),
case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
ok -> ok ->
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
ok; ok;
@ -376,3 +391,83 @@ render_content(Template, Data) ->
iolist_to_string(IOList) -> iolist_to_string(IOList) ->
unicode:characters_to_list(IOList). unicode:characters_to_list(IOList).
%% `emqx_connector_aggreg_delivery` APIs
-spec init_upload_state(buffer(), map()) -> emqx_s3_upload:t().
init_upload_state(Buffer, Opts) ->
#{
bucket := Bucket,
upload_options := UploadOpts,
client_config := Config,
uploader_config := UploaderConfig
} = Opts,
Client = emqx_s3_client:create(Bucket, Config),
Key = mk_object_key(Buffer, Opts),
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
mk_object_key(Buffer, #{action := Name, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
process_append(Writes, Upload) ->
emqx_s3_upload:append(Writes, Upload).
process_write(Upload0) ->
case emqx_s3_upload:write(Upload0) of
{ok, Upload} ->
Upload;
{cont, Upload} ->
process_write(Upload);
{error, Reason} ->
_ = emqx_s3_upload:abort(Upload0),
exit({upload_failed, Reason})
end.
process_complete(Upload) ->
case emqx_s3_upload:complete(Upload) of
{ok, Completed} ->
{ok, Completed};
{error, Reason} ->
_ = emqx_s3_upload:abort(Upload),
exit({upload_failed, Reason})
end.
process_format_status(Upload) ->
emqx_s3_upload:format(Upload).
process_terminate(Upload) ->
emqx_s3_upload:abort(Upload).
%% `emqx_template` APIs
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {Name, _Buffer}) ->
{ok, mk_fs_safe_string(Name)};
lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #buffer{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).

View File

@ -170,7 +170,7 @@ t_aggreg_upload(Config) ->
]), ]),
ok = send_messages(BridgeName, MessageEvents), ok = send_messages(BridgeName, MessageEvents),
%% Wait until the delivery is completed. %% Wait until the delivery is completed.
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
%% Check the uploaded objects. %% Check the uploaded objects.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
?assertMatch( ?assertMatch(
@ -217,7 +217,7 @@ t_aggreg_upload_rule(Config) ->
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
]), ]),
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
%% Check the uploaded objects. %% Check the uploaded objects.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
@ -258,15 +258,15 @@ t_aggreg_upload_restart(Config) ->
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
]), ]),
ok = send_messages(BridgeName, MessageEvents), ok = send_messages(BridgeName, MessageEvents),
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
%% Restart the bridge. %% Restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Send some more messages. %% Send some more messages.
ok = send_messages(BridgeName, MessageEvents), ok = send_messages(BridgeName, MessageEvents),
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
%% Wait until the delivery is completed. %% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
%% Check there's still only one upload. %% Check there's still only one upload.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
@ -300,18 +300,18 @@ t_aggreg_upload_restart_corrupted(Config) ->
%% Ensure that they span multiple batch queries. %% Ensure that they span multiple batch queries.
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
{ok, _} = ?block_until( {ok, _} = ?block_until(
#{?snk_kind := s3_aggreg_records_written, action := BridgeName}, #{?snk_kind := connector_aggreg_records_written, action := BridgeName},
infinity, infinity,
0 0
), ),
%% Find out the buffer file. %% Find out the buffer file.
{ok, #{filename := Filename}} = ?block_until( {ok, #{filename := Filename}} = ?block_until(
#{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName} #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
), ),
%% Stop the bridge, corrupt the buffer file, and restart the bridge. %% Stop the bridge, corrupt the buffer file, and restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
BufferFileSize = filelib:file_size(Filename), BufferFileSize = filelib:file_size(Filename),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2), ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2),
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Send some more messages. %% Send some more messages.
Messages2 = [ Messages2 = [
@ -320,7 +320,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
], ],
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
%% Wait until the delivery is completed. %% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
%% Check that upload contains part of the first batch and all of the second batch. %% Check that upload contains part of the first batch and all of the second batch.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@ -362,7 +362,7 @@ t_aggreg_pending_upload_restart(Config) ->
%% Restart the bridge. %% Restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Wait until the delivery is completed. %% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
%% Check that delivery contains all the messages. %% Check that delivery contains all the messages.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
[_Header | Rows] = fetch_parse_csv(Bucket, Key), [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@ -392,7 +392,9 @@ t_aggreg_next_rotate(Config) ->
NSent = receive_sender_reports(Senders), NSent = receive_sender_reports(Senders),
%% Wait for the last delivery to complete. %% Wait for the last delivery to complete.
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0), ?block_until(
#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
),
%% There should be at least 2 time windows of aggregated records. %% There should be at least 2 time windows of aggregated records.
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]], DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]],

View File

@ -48,11 +48,3 @@ list_pending_uploads(Bucket, Key) ->
{ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig), {ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig),
Uploads = proplists:get_value(uploads, Props), Uploads = proplists:get_value(uploads, Props),
lists:map(fun maps:from_list/1, Uploads). lists:map(fun maps:from_list/1, Uploads).
%% File utilities

%% Truncate `Filename' at byte offset `Pos'.
%% Used by tests to simulate a partially written (corrupted) buffer file.
%% The descriptor is closed even if positioning or truncation fails, so a
%% failing assertion does not leak an open file handle.
truncate_at(Filename, Pos) ->
    {ok, FD} = file:open(Filename, [read, write, binary]),
    try
        {ok, Pos} = file:position(FD, Pos),
        ok = file:truncate(FD)
    after
        _ = file:close(FD)
    end.

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-01-26
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,11 @@
# EMQX Connector Aggregator
This application provides common logic for connector and action implementations of EMQX to aggregate multiple incoming messages into a container file before sending it to a blob storage backend.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -3,8 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-record(buffer, { -record(buffer, {
since :: emqx_bridge_s3_aggregator:timestamp(), since :: emqx_connector_aggregator:timestamp(),
until :: emqx_bridge_s3_aggregator:timestamp(), until :: emqx_connector_aggregator:timestamp(),
seq :: non_neg_integer(), seq :: non_neg_integer(),
filename :: file:filename(), filename :: file:filename(),
fd :: file:io_device() | undefined, fd :: file:io_device() | undefined,

View File

@ -0,0 +1,7 @@
%% -*- mode: erlang; -*-
{deps, [
{emqx, {path, "../../apps/emqx"}}
]}.
{project_plugins, [erlfmt]}.

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

%% OTP application callback module for the connector aggregator: starting
%% the application boots the aggregator supervision tree; stopping it needs
%% no extra cleanup, as OTP takes the tree down.
-module(emqx_connector_aggreg_app).

-behaviour(application).

-export([start/2, stop/1]).

%%------------------------------------------------------------------------------
%% `application` API
%%------------------------------------------------------------------------------

%% Delegate to the top-level supervisor; its result is the start result.
start(_StartType, _StartArgs) ->
    emqx_connector_aggreg_sup:start_link().

%% Nothing to release here.
stop(_State) ->
    ok.

View File

@ -17,7 +17,7 @@
%% ... %% ...
%% ``` %% ```
%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`). %% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
-module(emqx_bridge_s3_aggreg_buffer). -module(emqx_connector_aggreg_buffer).
-export([ -export([
new_writer/2, new_writer/2,

View File

@ -2,8 +2,8 @@
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% CSV container implementation for `emqx_bridge_s3_aggregator`. %% CSV container implementation for `emqx_connector_aggregator`.
-module(emqx_bridge_s3_aggreg_csv). -module(emqx_connector_aggreg_csv).
%% Container API %% Container API
-export([ -export([
@ -33,7 +33,7 @@
column_order => [column()] column_order => [column()]
}. }.
-type record() :: emqx_bridge_s3_aggregator:record(). -type record() :: emqx_connector_aggregator:record().
-type column() :: binary(). -type column() :: binary().
%% %%

View File

@ -0,0 +1,183 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% This module takes aggregated records from a buffer and delivers them to a blob storage
%% backend, wrapped in a configurable container (though currently there's only CSV).
-module(emqx_connector_aggreg_delivery).
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_connector_aggregator.hrl").
-export([start_link/3]).
%% Internal exports
-export([
init/4,
loop/3
]).
%% Sys
-export([
system_continue/3,
system_terminate/4,
format_status/2
]).
-record(delivery, {
name :: _Name,
callback_module :: module(),
container :: emqx_connector_aggreg_csv:container(),
reader :: emqx_connector_aggreg_buffer:reader(),
upload :: impl_specific_upload_state(),
empty :: boolean()
}).
-type state() :: #delivery{}.
-type init_opts() :: #{
callback_module := module(),
any() => term()
}.
-type impl_specific_upload_state() :: term().
-callback init_upload_state(buffer(), map()) -> impl_specific_upload_state().
-callback process_append(iodata(), impl_specific_upload_state()) ->
{ok, impl_specific_upload_state()}.
-callback process_write(impl_specific_upload_state()) -> impl_specific_upload_state().
-callback process_complete(impl_specific_upload_state()) -> {ok, term()}.
%%
start_link(Name, Buffer, Opts) ->
proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
%%
-spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
init(Parent, Name, Buffer, Opts) ->
?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
Reader = open_buffer(Buffer),
Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
_ = erlang:process_flag(trap_exit, true),
ok = proc_lib:init_ack({ok, self()}),
loop(Delivery, Parent, []).
%% Construct the initial `#delivery{}' state from the buffer reader and options.
init_delivery(Name, Reader, Buffer, #{container := ContainerOpts, callback_module := CallbackMod} = Opts) ->
    #delivery{
        name = Name,
        callback_module = CallbackMod,
        container = mk_container(ContainerOpts),
        reader = Reader,
        upload = CallbackMod:init_upload_state(Buffer, Opts),
        empty = true
    }.
%% Open the buffer file for raw binary reading, skipping past its metadata
%% header. Crashes with `{buffer_open_failed, Reason}' if the file cannot be
%% opened.
open_buffer(#buffer{filename = Filename}) ->
    case file:open(Filename, [read, binary, raw]) of
        {ok, Fd} ->
            {_Meta, BufferReader} = emqx_connector_aggreg_buffer:new_reader(Fd),
            BufferReader;
        {error, Cause} ->
            error({buffer_open_failed, Cause})
    end.
%% Build the container encoder from its options. Currently only CSV exists.
mk_container(#{type := csv, column_order := OrderOpt}) ->
    %% TODO: Deduplicate?
    BinColumns = [emqx_utils_conv:bin(Column) || Column <- OrderOpt],
    emqx_connector_aggreg_csv:new(#{column_order => BinColumns}).
%%
-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
%% Main delivery loop: handle any pending message first (sys system messages,
%% parent 'EXIT'); only once the mailbox is drained (`after 0') continue
%% processing records from the buffer.
loop(Delivery, Parent, Debug) ->
    %% NOTE: This function is mocked in tests.
    receive
        Msg -> handle_msg(Msg, Delivery, Parent, Debug)
    after 0 ->
        process_delivery(Delivery, Parent, Debug)
    end.
%% Read the next batch from the buffer and act on it: append + write for a
%% non-empty batch of records, skip an empty batch, finalize on eof, and crash
%% on anything else (corrupted buffer contents).
process_delivery(#delivery{reader = Reader0} = Delivery0, Parent, Debug) ->
    case emqx_connector_aggreg_buffer:read(Reader0) of
        {[#{} | _] = Records, Reader} ->
            WithReader = Delivery0#delivery{reader = Reader},
            Appended = process_append_records(Records, WithReader),
            Written = process_write(Appended),
            loop(Written, Parent, Debug);
        {[], Reader} ->
            loop(Delivery0#delivery{reader = Reader}, Parent, Debug);
        eof ->
            process_complete(Delivery0);
        {Unexpected, _Reader} ->
            exit({buffer_unexpected_record, Unexpected})
    end.
%% Feed a batch of records through the container encoder and hand the encoded
%% output to the callback module's upload state. Marks the delivery non-empty.
process_append_records(Records, Delivery) ->
    #delivery{callback_module = Mod, container = Container0, upload = Upload0} = Delivery,
    {Encoded, Container1} = emqx_connector_aggreg_csv:fill(Records, Container0),
    {ok, Upload1} = Mod:process_append(Encoded, Upload0),
    Delivery#delivery{container = Container1, upload = Upload1, empty = false}.
%% Ask the callback module to flush whatever it has accumulated so far.
process_write(#delivery{callback_module = Mod, upload = UploadSt} = Delivery) ->
    Delivery#delivery{upload = Mod:process_write(UploadSt)}.
%% Finalize the delivery. An upload that never received any records is skipped
%% entirely (the process exits with a `{shutdown, {skipped, empty}}' reason);
%% otherwise the container trailer is appended and the upload completed.
process_complete(#delivery{name = Name, empty = true}) ->
    ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => empty}),
    exit({shutdown, {skipped, empty}});
process_complete(Delivery) ->
    #delivery{name = Name, callback_module = Mod, container = Container, upload = Upload0} = Delivery,
    Trailer = emqx_connector_aggreg_csv:close(Container),
    {ok, Upload1} = Mod:process_append(Trailer, Upload0),
    {ok, Completed} = Mod:process_complete(Upload1),
    ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => Completed}),
    ok.
%%
%% Dispatch a message received by the delivery process.
handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
    sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
    %% Parent terminated: shut down through the sys termination path so the
    %% callback module can clean up its in-flight upload.
    system_terminate(Reason, Parent, Debug, Delivery);
handle_msg(_Msg, Delivery, Parent, Debug) ->
    %% BUG FIX: this clause previously tail-called `loop(Parent, Debug,
    %% Delivery)', but `loop/3' expects `(Delivery, Parent, Debug)'; any
    %% unexpected message scrambled the state and crashed the process.
    loop(Delivery, Parent, Debug).
-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
%% `sys' callback: resume the main delivery loop after a system message.
system_continue(Parent, Debug, Delivery) ->
    loop(Delivery, Parent, Debug).
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
%% `sys' callback, also invoked directly on parent 'EXIT': gives the callback
%% module a chance to abort/clean up its in-flight upload before the process
%% dies.
%% NOTE(review): `process_terminate/1' is not declared among this module's
%% `-callback' attributes — confirm every implementation exports it.
system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, upload = Upload}) ->
    Mod:process_terminate(Upload).
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
%% `sys' callback: let the callback module redact/abridge its upload state
%% before it is shown in `sys:get_status/1' output.
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, #delivery{} = Delivery]) ->
    Mod = Delivery#delivery.callback_module,
    Formatted = Mod:process_format_status(Delivery#delivery.upload),
    Delivery#delivery{upload = Formatted}.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_s3_sup). -module(emqx_connector_aggreg_sup).
-export([ -export([
start_link/0, start_link/0,

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload_sup). -module(emqx_connector_aggreg_upload_sup).
-export([ -export([
start_link/3, start_link/3,
@ -33,7 +33,7 @@ start_delivery(Name, Buffer) ->
supervisor:start_child(?SUPREF(Name), [Buffer]). supervisor:start_child(?SUPREF(Name), [Buffer]).
start_delivery_proc(Name, DeliveryOpts, Buffer) -> start_delivery_proc(Name, DeliveryOpts, Buffer) ->
emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts). emqx_connector_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
%% %%
@ -45,7 +45,7 @@ init({root, Name, AggregOpts, DeliveryOpts}) ->
}, },
AggregatorChildSpec = #{ AggregatorChildSpec = #{
id => aggregator, id => aggregator,
start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]}, start => {emqx_connector_aggregator, start_link, [Name, AggregOpts]},
type => worker, type => worker,
restart => permanent restart => permanent
}, },

View File

@ -0,0 +1,14 @@
%% Application resource spec for the shared connector data aggregator,
%% extracted from the S3 bridge so other bridges can reuse the buffering
%% and delivery machinery.
{application, emqx_connector_aggregator, [
{description, "EMQX Enterprise Connector Data Aggregator"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource
]},
{env, []},
%% Application callback module; starts the top-level supervisor.
{mod, {emqx_connector_aggreg_app, []}},
{modules, []},
{links, []}
]}.

View File

@ -5,12 +5,12 @@
%% This module manages buffers for aggregating records and offloads them %% This module manages buffers for aggregating records and offloads them
%% to separate "delivery" processes when they are full or time interval %% to separate "delivery" processes when they are full or time interval
%% is over. %% is over.
-module(emqx_bridge_s3_aggregator). -module(emqx_connector_aggregator).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_bridge_s3_aggregator.hrl"). -include("emqx_connector_aggregator.hrl").
-export([ -export([
start_link/2, start_link/2,
@ -90,9 +90,9 @@ write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records)
end. end.
write_records(Name, Buffer = #buffer{fd = Writer}, Records) -> write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of case emqx_connector_aggreg_buffer:write(Records, Writer) of
ok -> ok ->
?tp(s3_aggreg_records_written, #{action => Name, records => Records}), ?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
ok; ok;
{error, terminated} -> {error, terminated} ->
BufferNext = rotate_buffer(Name, Buffer), BufferNext = rotate_buffer(Name, Buffer),
@ -250,9 +250,9 @@ compute_since(Timestamp, PrevSince, Interval) ->
allocate_buffer(Since, Seq, St = #st{name = Name}) -> allocate_buffer(Since, Seq, St = #st{name = Name}) ->
Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St), Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St),
{ok, FD} = file:open(Filename, [write, binary]), {ok, FD} = file:open(Filename, [write, binary]),
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []), Writer = emqx_connector_aggreg_buffer:new_writer(FD, _Meta = []),
_ = add_counter(Counter), _ = add_counter(Counter),
?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}), ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
Buffer#buffer{fd = Writer}. Buffer#buffer{fd = Writer}.
recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) -> recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
@ -274,7 +274,7 @@ recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
end. end.
recover_buffer_writer(FD, Filename) -> recover_buffer_writer(FD, Filename) ->
try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of try emqx_connector_aggreg_buffer:new_reader(FD) of
{_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0) {_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0)
catch catch
error:Reason -> error:Reason ->
@ -282,7 +282,7 @@ recover_buffer_writer(FD, Filename) ->
end. end.
recover_buffer_writer(FD, Filename, Reader0, NWritten) -> recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
try emqx_bridge_s3_aggreg_buffer:read(Reader0) of try emqx_connector_aggreg_buffer:read(Reader0) of
{Records, Reader} when is_list(Records) -> {Records, Reader} when is_list(Records) ->
recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records)); recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records));
{Unexpected, _Reader} -> {Unexpected, _Reader} ->
@ -303,7 +303,7 @@ recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
"Buffer is truncated or corrupted somewhere in the middle. " "Buffer is truncated or corrupted somewhere in the middle. "
"Corrupted records will be discarded." "Corrupted records will be discarded."
}), }),
Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0), Writer = emqx_connector_aggreg_buffer:takeover(Reader0),
{ok, Writer, NWritten} {ok, Writer, NWritten}
end. end.
@ -362,7 +362,7 @@ lookup_current_buffer(Name) ->
%% %%
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) -> enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
{ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer), {ok, Pid} = emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer),
MRef = erlang:monitor(process, Pid), MRef = erlang:monitor(process, Pid),
St#st{deliveries = Ds#{MRef => Buffer}}. St#st{deliveries = Ds#{MRef => Buffer}}.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_buffer_SUITE). -module(emqx_connector_aggreg_buffer_SUITE).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
@ -29,7 +29,7 @@ t_write_read_cycle(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config), Filename = mk_filename(?FUNCTION_NAME, Config),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
{ok, WFD} = file:open(Filename, [write, binary]), {ok, WFD} = file:open(Filename, [write, binary]),
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
Terms = [ Terms = [
[], [],
[[[[[[[[]]]]]]]], [[[[[[[[]]]]]]]],
@ -43,12 +43,12 @@ t_write_read_cycle(Config) ->
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})} {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}
], ],
ok = lists:foreach( ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end,
Terms Terms
), ),
ok = file:close(WFD), ok = file:close(WFD),
{ok, RFD} = file:open(Filename, [read, binary, raw]), {ok, RFD} = file:open(Filename, [read, binary, raw]),
{MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), {MetadataRead, Reader} = emqx_connector_aggreg_buffer:new_reader(RFD),
?assertEqual(Metadata, MetadataRead), ?assertEqual(Metadata, MetadataRead),
TermsRead = read_until_eof(Reader), TermsRead = read_until_eof(Reader),
?assertEqual(Terms, TermsRead). ?assertEqual(Terms, TermsRead).
@ -60,7 +60,7 @@ t_read_empty(Config) ->
{ok, RFD} = file:open(Filename, [read, binary]), {ok, RFD} = file:open(Filename, [read, binary]),
?assertError( ?assertError(
{buffer_incomplete, header}, {buffer_incomplete, header},
emqx_bridge_s3_aggreg_buffer:new_reader(RFD) emqx_connector_aggreg_buffer:new_reader(RFD)
). ).
t_read_garbage(Config) -> t_read_garbage(Config) ->
@ -71,14 +71,14 @@ t_read_garbage(Config) ->
{ok, RFD} = file:open(Filename, [read, binary]), {ok, RFD} = file:open(Filename, [read, binary]),
?assertError( ?assertError(
badarg, badarg,
emqx_bridge_s3_aggreg_buffer:new_reader(RFD) emqx_connector_aggreg_buffer:new_reader(RFD)
). ).
t_read_truncated(Config) -> t_read_truncated(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config), Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]), {ok, WFD} = file:open(Filename, [write, binary]),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
Terms = [ Terms = [
[[[[[[[[[[[]]]]]]]]]]], [[[[[[[[[[[]]]]]]]]]]],
lists:seq(1, 100000), lists:seq(1, 100000),
@ -88,36 +88,36 @@ t_read_truncated(Config) ->
LastTerm = LastTerm =
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
ok = lists:foreach( ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end,
Terms Terms
), ),
{ok, WPos} = file:position(WFD, cur), {ok, WPos} = file:position(WFD, cur),
?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)), ?assertEqual(ok, emqx_connector_aggreg_buffer:write(LastTerm, Writer)),
ok = file:close(WFD), ok = file:close(WFD),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1), ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos + 1),
{ok, RFD1} = file:open(Filename, [read, binary]), {ok, RFD1} = file:open(Filename, [read, binary]),
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1), {Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RFD1),
{ReadTerms1, Reader1} = read_terms(length(Terms), Reader0), {ReadTerms1, Reader1} = read_terms(length(Terms), Reader0),
?assertEqual(Terms, ReadTerms1), ?assertEqual(Terms, ReadTerms1),
?assertError( ?assertError(
badarg, badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader1) emqx_connector_aggreg_buffer:read(Reader1)
), ),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2),
{ok, RFD2} = file:open(Filename, [read, binary]), {ok, RFD2} = file:open(Filename, [read, binary]),
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2), {Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD2),
{ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2), {ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2),
?assertEqual(lists:sublist(Terms, 3), ReadTerms2), ?assertEqual(lists:sublist(Terms, 3), ReadTerms2),
?assertError( ?assertError(
badarg, badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader3) emqx_connector_aggreg_buffer:read(Reader3)
). ).
t_read_truncated_takeover_write(Config) -> t_read_truncated_takeover_write(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config), Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]), {ok, WFD} = file:open(Filename, [write, binary]),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), Writer1 = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
Terms1 = [ Terms1 = [
[[[[[[[[[[[]]]]]]]]]]], [[[[[[[[[[[]]]]]]]]]]],
lists:seq(1, 10000), lists:seq(1, 10000),
@ -129,14 +129,14 @@ t_read_truncated_takeover_write(Config) ->
{<<"application/x-octet-stream">>, rand:bytes(102400)} {<<"application/x-octet-stream">>, rand:bytes(102400)}
], ],
ok = lists:foreach( ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end, fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer1)) end,
Terms1 Terms1
), ),
{ok, WPos} = file:position(WFD, cur), {ok, WPos} = file:position(WFD, cur),
ok = file:close(WFD), ok = file:close(WFD),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2),
{ok, RWFD} = file:open(Filename, [read, write, binary]), {ok, RWFD} = file:open(Filename, [read, write, binary]),
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD), {Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RWFD),
{ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0), {ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0),
?assertEqual( ?assertEqual(
lists:sublist(Terms1, 3), lists:sublist(Terms1, 3),
@ -144,16 +144,16 @@ t_read_truncated_takeover_write(Config) ->
), ),
?assertError( ?assertError(
badarg, badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader1) emqx_connector_aggreg_buffer:read(Reader1)
), ),
Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1), Writer2 = emqx_connector_aggreg_buffer:takeover(Reader1),
ok = lists:foreach( ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end, fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer2)) end,
Terms2 Terms2
), ),
ok = file:close(RWFD), ok = file:close(RWFD),
{ok, RFD} = file:open(Filename, [read, binary]), {ok, RFD} = file:open(Filename, [read, binary]),
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), {Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD),
ReadTerms2 = read_until_eof(Reader2), ReadTerms2 = read_until_eof(Reader2),
?assertEqual( ?assertEqual(
lists:sublist(Terms1, 3) ++ Terms2, lists:sublist(Terms1, 3) ++ Terms2,
@ -168,12 +168,12 @@ mk_filename(Name, Config) ->
read_terms(0, Reader) -> read_terms(0, Reader) ->
{[], Reader}; {[], Reader};
read_terms(N, Reader0) -> read_terms(N, Reader0) ->
{Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0), {Term, Reader1} = emqx_connector_aggreg_buffer:read(Reader0),
{Terms, Reader} = read_terms(N - 1, Reader1), {Terms, Reader} = read_terms(N - 1, Reader1),
{[Term | Terms], Reader}. {[Term | Terms], Reader}.
read_until_eof(Reader0) -> read_until_eof(Reader0) ->
case emqx_bridge_s3_aggreg_buffer:read(Reader0) of case emqx_connector_aggreg_buffer:read(Reader0) of
{Term, Reader} -> {Term, Reader} ->
[Term | read_until_eof(Reader)]; [Term | read_until_eof(Reader)];
eof -> eof ->

View File

@ -2,12 +2,12 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_csv_tests). -module(emqx_connector_aggreg_csv_tests).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
encoding_test() -> encoding_test() ->
CSV = emqx_bridge_s3_aggreg_csv:new(#{}), CSV = emqx_connector_aggreg_csv:new(#{}),
?assertEqual( ?assertEqual(
"A,B,Ç\n" "A,B,Ç\n"
"1.2345,string,0.0\n" "1.2345,string,0.0\n"
@ -28,7 +28,7 @@ encoding_test() ->
column_order_test() -> column_order_test() ->
Order = [<<"ID">>, <<"TS">>], Order = [<<"ID">>, <<"TS">>],
CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}), CSV = emqx_connector_aggreg_csv:new(#{column_order => Order}),
?assertEqual( ?assertEqual(
"ID,TS,A,B,D\n" "ID,TS,A,B,D\n"
"1,2024-01-01,12.34,str,\"[]\"\n" "1,2024-01-01,12.34,str,\"[]\"\n"
@ -63,10 +63,10 @@ fill_close(CSV, LRecords) ->
string(fill_close_(CSV, LRecords)). string(fill_close_(CSV, LRecords)).
fill_close_(CSV0, [Records | LRest]) -> fill_close_(CSV0, [Records | LRest]) ->
{Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0), {Writes, CSV} = emqx_connector_aggreg_csv:fill(Records, CSV0),
[Writes | fill_close_(CSV, LRest)]; [Writes | fill_close_(CSV, LRest)];
fill_close_(CSV, []) -> fill_close_(CSV, []) ->
[emqx_bridge_s3_aggreg_csv:close(CSV)]. [emqx_connector_aggreg_csv:close(CSV)].
string(Writes) -> string(Writes) ->
unicode:characters_to_list(Writes). unicode:characters_to_list(Writes).

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% Shared helpers for connector-aggregator test suites.
-module(emqx_connector_aggregator_test_helpers).
-compile(nowarn_export_all).
%% Test helper module: everything is exported for convenience.
-compile(export_all).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% File utilities
%%------------------------------------------------------------------------------
%% Truncate the file `Filename' so that it ends at byte offset `Pos'.
%% FIX: the file descriptor is now closed even if positioning or truncation
%% fails; previously a failure between open and close leaked the descriptor.
truncate_at(Filename, Pos) ->
    {ok, Fd} = file:open(Filename, [read, write, binary]),
    try
        {ok, Pos} = file:position(Fd, Pos),
        ok = file:truncate(Fd)
    after
        ok = file:close(Fd)
    end.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -89,6 +89,7 @@
emqx_license, emqx_license,
emqx_enterprise, emqx_enterprise,
emqx_message_validation, emqx_message_validation,
emqx_connector_aggregator,
emqx_bridge_kafka, emqx_bridge_kafka,
emqx_bridge_pulsar, emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub, emqx_bridge_gcp_pubsub,

View File

@ -158,6 +158,7 @@ defmodule EMQXUmbrella.MixProject do
# need to remove those when listing `/apps/`... # need to remove those when listing `/apps/`...
defp enterprise_umbrella_apps(_release_type) do defp enterprise_umbrella_apps(_release_type) do
MapSet.new([ MapSet.new([
:emqx_connector_aggregator,
:emqx_bridge_kafka, :emqx_bridge_kafka,
:emqx_bridge_confluent, :emqx_bridge_confluent,
:emqx_bridge_gcp_pubsub, :emqx_bridge_gcp_pubsub,

View File

@ -76,6 +76,7 @@ is_cover_enabled() ->
is_enterprise(ce) -> false; is_enterprise(ce) -> false;
is_enterprise(ee) -> true. is_enterprise(ee) -> true.
is_community_umbrella_app("apps/emqx_connector_aggregator") -> false;
is_community_umbrella_app("apps/emqx_bridge_kafka") -> false; is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
is_community_umbrella_app("apps/emqx_bridge_confluent") -> false; is_community_umbrella_app("apps/emqx_bridge_confluent") -> false;
is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false; is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;