Merge pull request #12973 from thalesmg/shared-bridge-aggregator-r57-20240503
refactor: extract blob aggregator logic to separate application
This commit is contained in:
commit
83a0206aaa
|
@ -2,5 +2,6 @@
|
|||
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}}
|
||||
]}.
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
stdlib,
|
||||
erlcloud,
|
||||
emqx_resource,
|
||||
emqx_connector_aggregator,
|
||||
emqx_s3
|
||||
]},
|
||||
{env, [
|
||||
|
@ -18,7 +19,6 @@
|
|||
emqx_bridge_s3_connector_info
|
||||
]}
|
||||
]},
|
||||
{mod, {emqx_bridge_s3_app, []}},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
||||
|
|
|
@ -1,212 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% This module takes aggregated records from a buffer and delivers them to S3,
|
||||
%% wrapped in a configurable container (though currently there's only CSV).
|
||||
-module(emqx_bridge_s3_aggreg_delivery).
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include("emqx_bridge_s3_aggregator.hrl").
|
||||
|
||||
-export([start_link/3]).
|
||||
|
||||
%% Internal exports
|
||||
-export([
|
||||
init/4,
|
||||
loop/3
|
||||
]).
|
||||
|
||||
-behaviour(emqx_template).
|
||||
-export([lookup/2]).
|
||||
|
||||
%% Sys
|
||||
-export([
|
||||
system_continue/3,
|
||||
system_terminate/4,
|
||||
format_status/2
|
||||
]).
|
||||
|
||||
-record(delivery, {
|
||||
name :: _Name,
|
||||
container :: emqx_bridge_s3_aggreg_csv:container(),
|
||||
reader :: emqx_bridge_s3_aggreg_buffer:reader(),
|
||||
upload :: emqx_s3_upload:t(),
|
||||
empty :: boolean()
|
||||
}).
|
||||
|
||||
-type state() :: #delivery{}.
|
||||
|
||||
%%
|
||||
|
||||
start_link(Name, Buffer, Opts) ->
|
||||
proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
|
||||
|
||||
%%
|
||||
|
||||
-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return().
|
||||
init(Parent, Name, Buffer, Opts) ->
|
||||
?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
|
||||
Reader = open_buffer(Buffer),
|
||||
Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
|
||||
_ = erlang:process_flag(trap_exit, true),
|
||||
ok = proc_lib:init_ack({ok, self()}),
|
||||
loop(Delivery, Parent, []).
|
||||
|
||||
init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
|
||||
#delivery{
|
||||
name = Name,
|
||||
container = mk_container(ContainerOpts),
|
||||
reader = Reader,
|
||||
upload = mk_upload(Buffer, Opts),
|
||||
empty = true
|
||||
}.
|
||||
|
||||
open_buffer(#buffer{filename = Filename}) ->
|
||||
case file:open(Filename, [read, binary, raw]) of
|
||||
{ok, FD} ->
|
||||
{_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
|
||||
Reader;
|
||||
{error, Reason} ->
|
||||
error({buffer_open_failed, Reason})
|
||||
end.
|
||||
|
||||
mk_container(#{type := csv, column_order := OrderOpt}) ->
|
||||
%% TODO: Deduplicate?
|
||||
ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
|
||||
emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).
|
||||
|
||||
mk_upload(
|
||||
Buffer,
|
||||
Opts = #{
|
||||
bucket := Bucket,
|
||||
upload_options := UploadOpts,
|
||||
client_config := Config,
|
||||
uploader_config := UploaderConfig
|
||||
}
|
||||
) ->
|
||||
Client = emqx_s3_client:create(Bucket, Config),
|
||||
Key = mk_object_key(Buffer, Opts),
|
||||
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
||||
|
||||
mk_object_key(Buffer, #{action := Name, key := Template}) ->
|
||||
emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
|
||||
|
||||
%%
|
||||
|
||||
-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
|
||||
loop(Delivery, Parent, Debug) ->
|
||||
%% NOTE: This function is mocked in tests.
|
||||
receive
|
||||
Msg -> handle_msg(Msg, Delivery, Parent, Debug)
|
||||
after 0 ->
|
||||
process_delivery(Delivery, Parent, Debug)
|
||||
end.
|
||||
|
||||
process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) ->
|
||||
case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
|
||||
{Records = [#{} | _], Reader} ->
|
||||
Delivery1 = Delivery0#delivery{reader = Reader},
|
||||
Delivery2 = process_append_records(Records, Delivery1),
|
||||
Delivery = process_write(Delivery2),
|
||||
loop(Delivery, Parent, Debug);
|
||||
{[], Reader} ->
|
||||
Delivery = Delivery0#delivery{reader = Reader},
|
||||
loop(Delivery, Parent, Debug);
|
||||
eof ->
|
||||
process_complete(Delivery0);
|
||||
{Unexpected, _Reader} ->
|
||||
exit({buffer_unexpected_record, Unexpected})
|
||||
end.
|
||||
|
||||
process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
|
||||
{Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0),
|
||||
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
||||
Delivery#delivery{
|
||||
container = Container,
|
||||
upload = Upload,
|
||||
empty = false
|
||||
}.
|
||||
|
||||
process_write(Delivery = #delivery{upload = Upload0}) ->
|
||||
case emqx_s3_upload:write(Upload0) of
|
||||
{ok, Upload} ->
|
||||
Delivery#delivery{upload = Upload};
|
||||
{cont, Upload} ->
|
||||
process_write(Delivery#delivery{upload = Upload});
|
||||
{error, Reason} ->
|
||||
_ = emqx_s3_upload:abort(Upload0),
|
||||
exit({upload_failed, Reason})
|
||||
end.
|
||||
|
||||
process_complete(#delivery{name = Name, empty = true}) ->
|
||||
?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}),
|
||||
exit({shutdown, {skipped, empty}});
|
||||
process_complete(#delivery{name = Name, container = Container, upload = Upload0}) ->
|
||||
Trailer = emqx_bridge_s3_aggreg_csv:close(Container),
|
||||
{ok, Upload} = emqx_s3_upload:append(Trailer, Upload0),
|
||||
case emqx_s3_upload:complete(Upload) of
|
||||
{ok, Completed} ->
|
||||
?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
_ = emqx_s3_upload:abort(Upload),
|
||||
exit({upload_failed, Reason})
|
||||
end.
|
||||
|
||||
%%
|
||||
|
||||
handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
|
||||
sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
|
||||
handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
|
||||
system_terminate(Reason, Parent, Debug, Delivery);
|
||||
handle_msg(_Msg, Delivery, Parent, Debug) ->
|
||||
loop(Parent, Debug, Delivery).
|
||||
|
||||
-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
|
||||
system_continue(Parent, Debug, Delivery) ->
|
||||
loop(Delivery, Parent, Debug).
|
||||
|
||||
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
|
||||
system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) ->
|
||||
emqx_s3_upload:abort(Upload).
|
||||
|
||||
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
|
||||
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
|
||||
Delivery#delivery{
|
||||
upload = emqx_s3_upload:format(Delivery#delivery.upload)
|
||||
}.
|
||||
|
||||
%%
|
||||
|
||||
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
|
||||
{ok, integer() | string()} | {error, undefined}.
|
||||
lookup([<<"action">>], {Name, _Buffer}) ->
|
||||
{ok, mk_fs_safe_string(Name)};
|
||||
lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
|
||||
lookup_buffer_var(Accessor, Buffer);
|
||||
lookup(_Accessor, _Context) ->
|
||||
{error, undefined}.
|
||||
|
||||
lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
|
||||
{ok, format_timestamp(Since, Format)};
|
||||
lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
|
||||
{ok, format_timestamp(Until, Format)};
|
||||
lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
|
||||
{ok, Seq};
|
||||
lookup_buffer_var([<<"node">>], #buffer{}) ->
|
||||
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
|
||||
lookup_buffer_var(_Binding, _Context) ->
|
||||
{error, undefined}.
|
||||
|
||||
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
|
||||
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
|
||||
mk_fs_safe_string(String);
|
||||
format_timestamp(Timestamp, <<"rfc3339">>) ->
|
||||
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
|
||||
mk_fs_safe_string(String);
|
||||
format_timestamp(Timestamp, <<"unix">>) ->
|
||||
Timestamp.
|
||||
|
||||
mk_fs_safe_string(String) ->
|
||||
unicode:characters_to_binary(string:replace(String, ":", "_", all)).
|
|
@ -1,16 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_app).
|
||||
|
||||
-behaviour(application).
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
%%
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
emqx_bridge_s3_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
|
@ -8,6 +8,7 @@
|
|||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("emqx/include/emqx_trace.hrl").
|
||||
-include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl").
|
||||
-include("emqx_bridge_s3.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
@ -24,6 +25,19 @@
|
|||
on_get_channel_status/3
|
||||
]).
|
||||
|
||||
-behaviour(emqx_connector_aggreg_delivery).
|
||||
-export([
|
||||
init_transfer_state/2,
|
||||
process_append/2,
|
||||
process_write/1,
|
||||
process_complete/1,
|
||||
process_format_status/1,
|
||||
process_terminate/1
|
||||
]).
|
||||
|
||||
-behaviour(emqx_template).
|
||||
-export([lookup/2]).
|
||||
|
||||
-type config() :: #{
|
||||
access_key_id => string(),
|
||||
secret_access_key => emqx_secret:t(string()),
|
||||
|
@ -205,13 +219,14 @@ start_channel(State, #{
|
|||
key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
|
||||
container => Container,
|
||||
upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
|
||||
callback_module => ?MODULE,
|
||||
client_config => maps:get(client_config, State),
|
||||
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
|
||||
},
|
||||
_ = emqx_bridge_s3_sup:delete_child({Type, Name}),
|
||||
{ok, SupPid} = emqx_bridge_s3_sup:start_child(#{
|
||||
_ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
|
||||
{ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
|
||||
id => {Type, Name},
|
||||
start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
|
||||
start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
|
||||
type => supervisor,
|
||||
restart => permanent
|
||||
}),
|
||||
|
@ -220,7 +235,7 @@ start_channel(State, #{
|
|||
name => Name,
|
||||
bucket => Bucket,
|
||||
supervisor => SupPid,
|
||||
on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end
|
||||
on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
|
||||
}.
|
||||
|
||||
upload_options(Parameters) ->
|
||||
|
@ -242,7 +257,7 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
|
|||
channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
|
||||
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
|
||||
Timestamp = erlang:system_time(second),
|
||||
ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp),
|
||||
ok = emqx_connector_aggregator:tick(Name, Timestamp),
|
||||
ok = check_bucket_accessible(Bucket, State),
|
||||
ok = check_aggreg_upload_errors(Name),
|
||||
?status_connected.
|
||||
|
@ -264,7 +279,7 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
|||
end.
|
||||
|
||||
check_aggreg_upload_errors(Name) ->
|
||||
case emqx_bridge_s3_aggregator:take_error(Name) of
|
||||
case emqx_connector_aggregator:take_error(Name) of
|
||||
[Error] ->
|
||||
%% TODO
|
||||
%% This approach means that, for example, 3 upload failures will cause
|
||||
|
@ -340,7 +355,7 @@ run_simple_upload(
|
|||
|
||||
run_aggregated_upload(InstId, Records, #{name := Name}) ->
|
||||
Timestamp = erlang:system_time(second),
|
||||
case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of
|
||||
case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
|
||||
ok ->
|
||||
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
|
||||
ok;
|
||||
|
@ -376,3 +391,84 @@ render_content(Template, Data) ->
|
|||
|
||||
iolist_to_string(IOList) ->
|
||||
unicode:characters_to_list(IOList).
|
||||
|
||||
%% `emqx_connector_aggreg_delivery` APIs
|
||||
|
||||
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
|
||||
init_transfer_state(BufferMap, Opts) ->
|
||||
#{
|
||||
bucket := Bucket,
|
||||
upload_options := UploadOpts,
|
||||
client_config := Config,
|
||||
uploader_config := UploaderConfig
|
||||
} = Opts,
|
||||
Client = emqx_s3_client:create(Bucket, Config),
|
||||
Key = mk_object_key(BufferMap, Opts),
|
||||
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
||||
|
||||
mk_object_key(BufferMap, #{action := Name, key := Template}) ->
|
||||
emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
|
||||
|
||||
process_append(Writes, Upload0) ->
|
||||
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
||||
Upload.
|
||||
|
||||
process_write(Upload0) ->
|
||||
case emqx_s3_upload:write(Upload0) of
|
||||
{ok, Upload} ->
|
||||
{ok, Upload};
|
||||
{cont, Upload} ->
|
||||
process_write(Upload);
|
||||
{error, Reason} ->
|
||||
_ = emqx_s3_upload:abort(Upload0),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
process_complete(Upload) ->
|
||||
case emqx_s3_upload:complete(Upload) of
|
||||
{ok, Completed} ->
|
||||
{ok, Completed};
|
||||
{error, Reason} ->
|
||||
_ = emqx_s3_upload:abort(Upload),
|
||||
exit({upload_failed, Reason})
|
||||
end.
|
||||
|
||||
process_format_status(Upload) ->
|
||||
emqx_s3_upload:format(Upload).
|
||||
|
||||
process_terminate(Upload) ->
|
||||
emqx_s3_upload:abort(Upload).
|
||||
|
||||
%% `emqx_template` APIs
|
||||
|
||||
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
|
||||
{ok, integer() | string()} | {error, undefined}.
|
||||
lookup([<<"action">>], {Name, _Buffer}) ->
|
||||
{ok, mk_fs_safe_string(Name)};
|
||||
lookup(Accessor, {_Name, Buffer = #{}}) ->
|
||||
lookup_buffer_var(Accessor, Buffer);
|
||||
lookup(_Accessor, _Context) ->
|
||||
{error, undefined}.
|
||||
|
||||
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
|
||||
{ok, format_timestamp(Since, Format)};
|
||||
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
|
||||
{ok, format_timestamp(Until, Format)};
|
||||
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
|
||||
{ok, Seq};
|
||||
lookup_buffer_var([<<"node">>], #{}) ->
|
||||
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
|
||||
lookup_buffer_var(_Binding, _Context) ->
|
||||
{error, undefined}.
|
||||
|
||||
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
|
||||
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
|
||||
mk_fs_safe_string(String);
|
||||
format_timestamp(Timestamp, <<"rfc3339">>) ->
|
||||
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
|
||||
mk_fs_safe_string(String);
|
||||
format_timestamp(Timestamp, <<"unix">>) ->
|
||||
Timestamp.
|
||||
|
||||
mk_fs_safe_string(String) ->
|
||||
unicode:characters_to_binary(string:replace(String, ":", "_", all)).
|
||||
|
|
|
@ -170,7 +170,7 @@ t_aggreg_upload(Config) ->
|
|||
]),
|
||||
ok = send_messages(BridgeName, MessageEvents),
|
||||
%% Wait until the delivery is completed.
|
||||
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
|
||||
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
||||
%% Check the uploaded objects.
|
||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||
?assertMatch(
|
||||
|
@ -217,7 +217,7 @@ t_aggreg_upload_rule(Config) ->
|
|||
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
|
||||
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
|
||||
]),
|
||||
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
|
||||
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
||||
%% Check the uploaded objects.
|
||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||
|
@ -258,15 +258,15 @@ t_aggreg_upload_restart(Config) ->
|
|||
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
|
||||
]),
|
||||
ok = send_messages(BridgeName, MessageEvents),
|
||||
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
|
||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
|
||||
%% Restart the bridge.
|
||||
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
||||
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
||||
%% Send some more messages.
|
||||
ok = send_messages(BridgeName, MessageEvents),
|
||||
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
|
||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
|
||||
%% Wait until the delivery is completed.
|
||||
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
|
||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
||||
%% Check there's still only one upload.
|
||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
|
||||
|
@ -300,18 +300,18 @@ t_aggreg_upload_restart_corrupted(Config) ->
|
|||
%% Ensure that they span multiple batch queries.
|
||||
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
|
||||
{ok, _} = ?block_until(
|
||||
#{?snk_kind := s3_aggreg_records_written, action := BridgeName},
|
||||
#{?snk_kind := connector_aggreg_records_written, action := BridgeName},
|
||||
infinity,
|
||||
0
|
||||
),
|
||||
%% Find out the buffer file.
|
||||
{ok, #{filename := Filename}} = ?block_until(
|
||||
#{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName}
|
||||
#{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
|
||||
),
|
||||
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
|
||||
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
||||
BufferFileSize = filelib:file_size(Filename),
|
||||
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2),
|
||||
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2),
|
||||
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
||||
%% Send some more messages.
|
||||
Messages2 = [
|
||||
|
@ -320,7 +320,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
|
|||
],
|
||||
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
|
||||
%% Wait until the delivery is completed.
|
||||
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
|
||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
||||
%% Check that upload contains part of the first batch and all of the second batch.
|
||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||
CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||
|
@ -362,7 +362,7 @@ t_aggreg_pending_upload_restart(Config) ->
|
|||
%% Restart the bridge.
|
||||
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
||||
%% Wait until the delivery is completed.
|
||||
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
|
||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
||||
%% Check that delivery contains all the messages.
|
||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||
[_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||
|
@ -392,7 +392,9 @@ t_aggreg_next_rotate(Config) ->
|
|||
NSent = receive_sender_reports(Senders),
|
||||
%% Wait for the last delivery to complete.
|
||||
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
|
||||
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0),
|
||||
?block_until(
|
||||
#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
|
||||
),
|
||||
%% There should be at least 2 time windows of aggregated records.
|
||||
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
|
||||
DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]],
|
||||
|
|
|
@ -48,11 +48,3 @@ list_pending_uploads(Bucket, Key) ->
|
|||
{ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig),
|
||||
Uploads = proplists:get_value(uploads, Props),
|
||||
lists:map(fun maps:from_list/1, Uploads).
|
||||
|
||||
%% File utilities
|
||||
|
||||
truncate_at(Filename, Pos) ->
|
||||
{ok, FD} = file:open(Filename, [read, write, binary]),
|
||||
{ok, Pos} = file:position(FD, Pos),
|
||||
ok = file:truncate(FD),
|
||||
ok = file:close(FD).
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
Business Source License 1.1
|
||||
|
||||
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||
Licensed Work: EMQX Enterprise Edition
|
||||
The Licensed Work is (c) 2024
|
||||
Hangzhou EMQ Technologies Co., Ltd.
|
||||
Additional Use Grant: Students and educators are granted right to copy,
|
||||
modify, and create derivative work for research
|
||||
or education.
|
||||
Change Date: 2028-05-06
|
||||
Change License: Apache License, Version 2.0
|
||||
|
||||
For information about alternative licensing arrangements for the Software,
|
||||
please contact Licensor: https://www.emqx.com/en/contact
|
||||
|
||||
Notice
|
||||
|
||||
The Business Source License (this document, or the “License”) is not an Open
|
||||
Source license. However, the Licensed Work will eventually be made available
|
||||
under an Open Source License, as stated in this License.
|
||||
|
||||
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||
|
||||
-----------------------------------------------------------------------------
|
||||
|
||||
Business Source License 1.1
|
||||
|
||||
Terms
|
||||
|
||||
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||
works, redistribute, and make non-production use of the Licensed Work. The
|
||||
Licensor may make an Additional Use Grant, above, permitting limited
|
||||
production use.
|
||||
|
||||
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||
available distribution of a specific version of the Licensed Work under this
|
||||
License, whichever comes first, the Licensor hereby grants you rights under
|
||||
the terms of the Change License, and the rights granted in the paragraph
|
||||
above terminate.
|
||||
|
||||
If your use of the Licensed Work does not comply with the requirements
|
||||
currently in effect as described in this License, you must purchase a
|
||||
commercial license from the Licensor, its affiliated entities, or authorized
|
||||
resellers, or you must refrain from using the Licensed Work.
|
||||
|
||||
All copies of the original and modified Licensed Work, and derivative works
|
||||
of the Licensed Work, are subject to this License. This License applies
|
||||
separately for each version of the Licensed Work and the Change Date may vary
|
||||
for each version of the Licensed Work released by Licensor.
|
||||
|
||||
You must conspicuously display this License on each original or modified copy
|
||||
of the Licensed Work. If you receive the Licensed Work in original or
|
||||
modified form from a third party, the terms and conditions set forth in this
|
||||
License apply to your use of that work.
|
||||
|
||||
Any use of the Licensed Work in violation of this License will automatically
|
||||
terminate your rights under this License for the current and all other
|
||||
versions of the Licensed Work.
|
||||
|
||||
This License does not grant you any right in any trademark or logo of
|
||||
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||
Licensor as expressly required by this License).
|
||||
|
||||
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||
TITLE.
|
||||
|
||||
MariaDB hereby grants you permission to use this License’s text to license
|
||||
your works, and to refer to it using the trademark “Business Source License”,
|
||||
as long as you comply with the Covenants of Licensor below.
|
||||
|
||||
Covenants of Licensor
|
||||
|
||||
In consideration of the right to use this License’s text and the “Business
|
||||
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||
other recipients of the licensed work to be provided by Licensor:
|
||||
|
||||
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||
where “compatible” means that software provided under the Change License can
|
||||
be included in a program with software provided under GPL Version 2.0 or a
|
||||
later version. Licensor may specify additional Change Licenses without
|
||||
limitation.
|
||||
|
||||
2. To either: (a) specify an additional grant of rights to use that does not
|
||||
impose any additional restriction on the right granted in this License, as
|
||||
the Additional Use Grant; or (b) insert the text “None”.
|
||||
|
||||
3. To specify a Change Date.
|
||||
|
||||
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,11 @@
|
|||
# EMQX Connector Aggregator
|
||||
|
||||
This application provides common logic for connector and action implementations of EMQX to aggregate multiple incoming messsages into a container file before sending it to a blob storage backend.
|
||||
|
||||
## Contributing
|
||||
|
||||
Please see our [contributing.md](../../CONTRIBUTING.md).
|
||||
|
||||
## License
|
||||
|
||||
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
|
|
@ -3,8 +3,8 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
-record(buffer, {
|
||||
since :: emqx_bridge_s3_aggregator:timestamp(),
|
||||
until :: emqx_bridge_s3_aggregator:timestamp(),
|
||||
since :: emqx_connector_aggregator:timestamp(),
|
||||
until :: emqx_connector_aggregator:timestamp(),
|
||||
seq :: non_neg_integer(),
|
||||
filename :: file:filename(),
|
||||
fd :: file:io_device() | undefined,
|
||||
|
@ -13,3 +13,11 @@
|
|||
}).
|
||||
|
||||
-type buffer() :: #buffer{}.
|
||||
|
||||
-type buffer_map() :: #{
|
||||
since := emqx_connector_aggregator:timestamp(),
|
||||
until := emqx_connector_aggregator:timestamp(),
|
||||
seq := non_neg_integer(),
|
||||
filename := file:filename(),
|
||||
max_records := pos_integer() | undefined
|
||||
}.
|
|
@ -0,0 +1,7 @@
|
|||
%% -*- mode: erlang; -*-
|
||||
|
||||
{deps, [
|
||||
{emqx, {path, "../../apps/emqx"}}
|
||||
]}.
|
||||
|
||||
{project_plugins, [erlfmt]}.
|
|
@ -0,0 +1,25 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector_aggreg_app).
|
||||
|
||||
-behaviour(application).
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `application` API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
emqx_connector_aggreg_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
|
@ -17,7 +17,7 @@
|
|||
%% ...
|
||||
%% ```
|
||||
%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
|
||||
-module(emqx_bridge_s3_aggreg_buffer).
|
||||
-module(emqx_connector_aggreg_buffer).
|
||||
|
||||
-export([
|
||||
new_writer/2,
|
|
@ -2,8 +2,8 @@
|
|||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% CSV container implementation for `emqx_bridge_s3_aggregator`.
|
||||
-module(emqx_bridge_s3_aggreg_csv).
|
||||
%% CSV container implementation for `emqx_connector_aggregator`.
|
||||
-module(emqx_connector_aggreg_csv).
|
||||
|
||||
%% Container API
|
||||
-export([
|
||||
|
@ -33,7 +33,7 @@
|
|||
column_order => [column()]
|
||||
}.
|
||||
|
||||
-type record() :: emqx_bridge_s3_aggregator:record().
|
||||
-type record() :: emqx_connector_aggregator:record().
|
||||
-type column() :: binary().
|
||||
|
||||
%%
|
|
@ -0,0 +1,195 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% This module takes aggregated records from a buffer and delivers them to a blob storage
|
||||
%% backend, wrapped in a configurable container (though currently there's only CSV).
|
||||
-module(emqx_connector_aggreg_delivery).
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include("emqx_connector_aggregator.hrl").
|
||||
|
||||
-export([start_link/3]).
|
||||
|
||||
%% Internal exports
|
||||
-export([
|
||||
init/4,
|
||||
loop/3
|
||||
]).
|
||||
|
||||
%% Sys
|
||||
-export([
|
||||
system_continue/3,
|
||||
system_terminate/4,
|
||||
format_status/2
|
||||
]).
|
||||
|
||||
-record(delivery, {
|
||||
name :: _Name,
|
||||
callback_module :: module(),
|
||||
container :: emqx_connector_aggreg_csv:container(),
|
||||
reader :: emqx_connector_aggreg_buffer:reader(),
|
||||
transfer :: transfer_state(),
|
||||
empty :: boolean()
|
||||
}).
|
||||
|
||||
-type state() :: #delivery{}.
|
||||
|
||||
-type init_opts() :: #{
|
||||
callback_module := module(),
|
||||
any() => term()
|
||||
}.
|
||||
|
||||
-type transfer_state() :: term().
|
||||
|
||||
%% @doc Initialize the transfer state, such as blob storage path, transfer options, client
|
||||
%% credentials, etc. .
|
||||
-callback init_transfer_state(buffer_map(), map()) -> transfer_state().
|
||||
|
||||
%% @doc Append data to the transfer before sending. Usually should not fail.
|
||||
-callback process_append(iodata(), transfer_state()) -> transfer_state().
|
||||
|
||||
%% @doc Push appended transfer data to its destination (e.g.: upload a part of a
|
||||
%% multi-part upload). May fail.
|
||||
-callback process_write(transfer_state()) -> {ok, transfer_state()} | {error, term()}.
|
||||
|
||||
%% @doc Finalize the transfer and clean up any resources. May return a term summarizing
|
||||
%% the transfer.
|
||||
-callback process_complete(transfer_state()) -> {ok, term()}.
|
||||
|
||||
%%
|
||||
|
||||
start_link(Name, Buffer, Opts) ->
|
||||
proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
|
||||
|
||||
%%
|
||||
|
||||
-spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
|
||||
init(Parent, Name, Buffer, Opts) ->
|
||||
?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
|
||||
Reader = open_buffer(Buffer),
|
||||
Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
|
||||
_ = erlang:process_flag(trap_exit, true),
|
||||
ok = proc_lib:init_ack({ok, self()}),
|
||||
loop(Delivery, Parent, []).
|
||||
|
||||
init_delivery(
|
||||
Name,
|
||||
Reader,
|
||||
Buffer,
|
||||
Opts = #{
|
||||
container := ContainerOpts,
|
||||
callback_module := Mod
|
||||
}
|
||||
) ->
|
||||
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
|
||||
#delivery{
|
||||
name = Name,
|
||||
callback_module = Mod,
|
||||
container = mk_container(ContainerOpts),
|
||||
reader = Reader,
|
||||
transfer = Mod:init_transfer_state(BufferMap, Opts),
|
||||
empty = true
|
||||
}.
|
||||
|
||||
open_buffer(#buffer{filename = Filename}) ->
|
||||
case file:open(Filename, [read, binary, raw]) of
|
||||
{ok, FD} ->
|
||||
{_Meta, Reader} = emqx_connector_aggreg_buffer:new_reader(FD),
|
||||
Reader;
|
||||
{error, Reason} ->
|
||||
error(#{reason => buffer_open_failed, file => Filename, posix => Reason})
|
||||
end.
|
||||
|
||||
mk_container(#{type := csv, column_order := OrderOpt}) ->
|
||||
%% TODO: Deduplicate?
|
||||
ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
|
||||
emqx_connector_aggreg_csv:new(#{column_order => ColumnOrder}).
|
||||
|
||||
%%
|
||||
|
||||
-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
|
||||
loop(Delivery, Parent, Debug) ->
|
||||
%% NOTE: This function is mocked in tests.
|
||||
receive
|
||||
Msg -> handle_msg(Msg, Delivery, Parent, Debug)
|
||||
after 0 ->
|
||||
process_delivery(Delivery, Parent, Debug)
|
||||
end.
|
||||
|
||||
process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) ->
|
||||
case emqx_connector_aggreg_buffer:read(Reader0) of
|
||||
{Records = [#{} | _], Reader} ->
|
||||
Delivery1 = Delivery0#delivery{reader = Reader},
|
||||
Delivery2 = process_append_records(Records, Delivery1),
|
||||
Delivery = process_write(Delivery2),
|
||||
?MODULE:loop(Delivery, Parent, Debug);
|
||||
{[], Reader} ->
|
||||
Delivery = Delivery0#delivery{reader = Reader},
|
||||
?MODULE:loop(Delivery, Parent, Debug);
|
||||
eof ->
|
||||
process_complete(Delivery0);
|
||||
{Unexpected, _Reader} ->
|
||||
exit({buffer_unexpected_record, Unexpected})
|
||||
end.
|
||||
|
||||
process_append_records(
|
||||
Records,
|
||||
Delivery = #delivery{
|
||||
callback_module = Mod,
|
||||
container = Container0,
|
||||
transfer = Transfer0
|
||||
}
|
||||
) ->
|
||||
{Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0),
|
||||
Transfer = Mod:process_append(Writes, Transfer0),
|
||||
Delivery#delivery{
|
||||
container = Container,
|
||||
transfer = Transfer,
|
||||
empty = false
|
||||
}.
|
||||
|
||||
process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) ->
|
||||
case Mod:process_write(Transfer0) of
|
||||
{ok, Transfer} ->
|
||||
Delivery#delivery{transfer = Transfer};
|
||||
{error, Reason} ->
|
||||
%% Todo: handle more gracefully? Retry?
|
||||
error({transfer_failed, Reason})
|
||||
end.
|
||||
|
||||
process_complete(#delivery{name = Name, empty = true}) ->
|
||||
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
|
||||
exit({shutdown, {skipped, empty}});
|
||||
process_complete(#delivery{
|
||||
name = Name, callback_module = Mod, container = Container, transfer = Transfer0
|
||||
}) ->
|
||||
Trailer = emqx_connector_aggreg_csv:close(Container),
|
||||
Transfer = Mod:process_append(Trailer, Transfer0),
|
||||
{ok, Completed} = Mod:process_complete(Transfer),
|
||||
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
|
||||
ok.
|
||||
|
||||
%%
|
||||
|
||||
handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
|
||||
sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
|
||||
handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
|
||||
system_terminate(Reason, Parent, Debug, Delivery);
|
||||
handle_msg(_Msg, Delivery, Parent, Debug) ->
|
||||
?MODULE:loop(Parent, Debug, Delivery).
|
||||
|
||||
-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
|
||||
system_continue(Parent, Debug, Delivery) ->
|
||||
?MODULE:loop(Delivery, Parent, Debug).
|
||||
|
||||
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
|
||||
system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, transfer = Transfer}) ->
|
||||
Mod:process_terminate(Transfer).
|
||||
|
||||
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
|
||||
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
|
||||
#delivery{callback_module = Mod} = Delivery,
|
||||
Delivery#delivery{
|
||||
transfer = Mod:process_format_status(Delivery#delivery.transfer)
|
||||
}.
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_sup).
|
||||
-module(emqx_connector_aggreg_sup).
|
||||
|
||||
-export([
|
||||
start_link/0,
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_aggreg_upload_sup).
|
||||
-module(emqx_connector_aggreg_upload_sup).
|
||||
|
||||
-export([
|
||||
start_link/3,
|
||||
|
@ -33,7 +33,7 @@ start_delivery(Name, Buffer) ->
|
|||
supervisor:start_child(?SUPREF(Name), [Buffer]).
|
||||
|
||||
start_delivery_proc(Name, DeliveryOpts, Buffer) ->
|
||||
emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
|
||||
emqx_connector_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
|
||||
|
||||
%%
|
||||
|
||||
|
@ -45,7 +45,7 @@ init({root, Name, AggregOpts, DeliveryOpts}) ->
|
|||
},
|
||||
AggregatorChildSpec = #{
|
||||
id => aggregator,
|
||||
start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]},
|
||||
start => {emqx_connector_aggregator, start_link, [Name, AggregOpts]},
|
||||
type => worker,
|
||||
restart => permanent
|
||||
},
|
|
@ -0,0 +1,13 @@
|
|||
{application, emqx_connector_aggregator, [
|
||||
{description, "EMQX Enterprise Connector Data Aggregator"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{env, []},
|
||||
{mod, {emqx_connector_aggreg_app, []}},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
|
@ -5,18 +5,19 @@
|
|||
%% This module manages buffers for aggregating records and offloads them
|
||||
%% to separate "delivery" processes when they are full or time interval
|
||||
%% is over.
|
||||
-module(emqx_bridge_s3_aggregator).
|
||||
-module(emqx_connector_aggregator).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
|
||||
-include("emqx_bridge_s3_aggregator.hrl").
|
||||
-include("emqx_connector_aggregator.hrl").
|
||||
|
||||
-export([
|
||||
start_link/2,
|
||||
push_records/3,
|
||||
tick/2,
|
||||
take_error/1
|
||||
take_error/1,
|
||||
buffer_to_map/1
|
||||
]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
@ -72,6 +73,15 @@ tick(Name, Timestamp) ->
|
|||
take_error(Name) ->
|
||||
gen_server:call(?SRVREF(Name), take_error).
|
||||
|
||||
buffer_to_map(#buffer{} = Buffer) ->
|
||||
#{
|
||||
since => Buffer#buffer.since,
|
||||
until => Buffer#buffer.until,
|
||||
seq => Buffer#buffer.seq,
|
||||
filename => Buffer#buffer.filename,
|
||||
max_records => Buffer#buffer.max_records
|
||||
}.
|
||||
|
||||
%%
|
||||
|
||||
write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
|
||||
|
@ -90,9 +100,9 @@ write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records)
|
|||
end.
|
||||
|
||||
write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
|
||||
case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of
|
||||
case emqx_connector_aggreg_buffer:write(Records, Writer) of
|
||||
ok ->
|
||||
?tp(s3_aggreg_records_written, #{action => Name, records => Records}),
|
||||
?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
|
||||
ok;
|
||||
{error, terminated} ->
|
||||
BufferNext = rotate_buffer(Name, Buffer),
|
||||
|
@ -250,9 +260,9 @@ compute_since(Timestamp, PrevSince, Interval) ->
|
|||
allocate_buffer(Since, Seq, St = #st{name = Name}) ->
|
||||
Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St),
|
||||
{ok, FD} = file:open(Filename, [write, binary]),
|
||||
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []),
|
||||
Writer = emqx_connector_aggreg_buffer:new_writer(FD, _Meta = []),
|
||||
_ = add_counter(Counter),
|
||||
?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
|
||||
?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
|
||||
Buffer#buffer{fd = Writer}.
|
||||
|
||||
recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
|
||||
|
@ -274,7 +284,7 @@ recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
|
|||
end.
|
||||
|
||||
recover_buffer_writer(FD, Filename) ->
|
||||
try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of
|
||||
try emqx_connector_aggreg_buffer:new_reader(FD) of
|
||||
{_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0)
|
||||
catch
|
||||
error:Reason ->
|
||||
|
@ -282,7 +292,7 @@ recover_buffer_writer(FD, Filename) ->
|
|||
end.
|
||||
|
||||
recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
|
||||
try emqx_bridge_s3_aggreg_buffer:read(Reader0) of
|
||||
try emqx_connector_aggreg_buffer:read(Reader0) of
|
||||
{Records, Reader} when is_list(Records) ->
|
||||
recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records));
|
||||
{Unexpected, _Reader} ->
|
||||
|
@ -303,7 +313,7 @@ recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
|
|||
"Buffer is truncated or corrupted somewhere in the middle. "
|
||||
"Corrupted records will be discarded."
|
||||
}),
|
||||
Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0),
|
||||
Writer = emqx_connector_aggreg_buffer:takeover(Reader0),
|
||||
{ok, Writer, NWritten}
|
||||
end.
|
||||
|
||||
|
@ -362,7 +372,7 @@ lookup_current_buffer(Name) ->
|
|||
%%
|
||||
|
||||
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
|
||||
{ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer),
|
||||
{ok, Pid} = emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer),
|
||||
MRef = erlang:monitor(process, Pid),
|
||||
St#st{deliveries = Ds#{MRef => Buffer}}.
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_aggreg_buffer_SUITE).
|
||||
-module(emqx_connector_aggreg_buffer_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
@ -29,7 +29,7 @@ t_write_read_cycle(Config) ->
|
|||
Filename = mk_filename(?FUNCTION_NAME, Config),
|
||||
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
|
||||
{ok, WFD} = file:open(Filename, [write, binary]),
|
||||
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Terms = [
|
||||
[],
|
||||
[[[[[[[[]]]]]]]],
|
||||
|
@ -43,12 +43,12 @@ t_write_read_cycle(Config) ->
|
|||
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}
|
||||
],
|
||||
ok = lists:foreach(
|
||||
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
|
||||
fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end,
|
||||
Terms
|
||||
),
|
||||
ok = file:close(WFD),
|
||||
{ok, RFD} = file:open(Filename, [read, binary, raw]),
|
||||
{MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
|
||||
{MetadataRead, Reader} = emqx_connector_aggreg_buffer:new_reader(RFD),
|
||||
?assertEqual(Metadata, MetadataRead),
|
||||
TermsRead = read_until_eof(Reader),
|
||||
?assertEqual(Terms, TermsRead).
|
||||
|
@ -60,7 +60,7 @@ t_read_empty(Config) ->
|
|||
{ok, RFD} = file:open(Filename, [read, binary]),
|
||||
?assertError(
|
||||
{buffer_incomplete, header},
|
||||
emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
|
||||
emqx_connector_aggreg_buffer:new_reader(RFD)
|
||||
).
|
||||
|
||||
t_read_garbage(Config) ->
|
||||
|
@ -71,14 +71,14 @@ t_read_garbage(Config) ->
|
|||
{ok, RFD} = file:open(Filename, [read, binary]),
|
||||
?assertError(
|
||||
badarg,
|
||||
emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
|
||||
emqx_connector_aggreg_buffer:new_reader(RFD)
|
||||
).
|
||||
|
||||
t_read_truncated(Config) ->
|
||||
Filename = mk_filename(?FUNCTION_NAME, Config),
|
||||
{ok, WFD} = file:open(Filename, [write, binary]),
|
||||
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
|
||||
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Terms = [
|
||||
[[[[[[[[[[[]]]]]]]]]]],
|
||||
lists:seq(1, 100000),
|
||||
|
@ -88,36 +88,36 @@ t_read_truncated(Config) ->
|
|||
LastTerm =
|
||||
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
|
||||
ok = lists:foreach(
|
||||
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
|
||||
fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end,
|
||||
Terms
|
||||
),
|
||||
{ok, WPos} = file:position(WFD, cur),
|
||||
?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)),
|
||||
?assertEqual(ok, emqx_connector_aggreg_buffer:write(LastTerm, Writer)),
|
||||
ok = file:close(WFD),
|
||||
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1),
|
||||
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos + 1),
|
||||
{ok, RFD1} = file:open(Filename, [read, binary]),
|
||||
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1),
|
||||
{Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RFD1),
|
||||
{ReadTerms1, Reader1} = read_terms(length(Terms), Reader0),
|
||||
?assertEqual(Terms, ReadTerms1),
|
||||
?assertError(
|
||||
badarg,
|
||||
emqx_bridge_s3_aggreg_buffer:read(Reader1)
|
||||
emqx_connector_aggreg_buffer:read(Reader1)
|
||||
),
|
||||
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
|
||||
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2),
|
||||
{ok, RFD2} = file:open(Filename, [read, binary]),
|
||||
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2),
|
||||
{Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD2),
|
||||
{ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2),
|
||||
?assertEqual(lists:sublist(Terms, 3), ReadTerms2),
|
||||
?assertError(
|
||||
badarg,
|
||||
emqx_bridge_s3_aggreg_buffer:read(Reader3)
|
||||
emqx_connector_aggreg_buffer:read(Reader3)
|
||||
).
|
||||
|
||||
t_read_truncated_takeover_write(Config) ->
|
||||
Filename = mk_filename(?FUNCTION_NAME, Config),
|
||||
{ok, WFD} = file:open(Filename, [write, binary]),
|
||||
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
|
||||
Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Writer1 = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata),
|
||||
Terms1 = [
|
||||
[[[[[[[[[[[]]]]]]]]]]],
|
||||
lists:seq(1, 10000),
|
||||
|
@ -129,14 +129,14 @@ t_read_truncated_takeover_write(Config) ->
|
|||
{<<"application/x-octet-stream">>, rand:bytes(102400)}
|
||||
],
|
||||
ok = lists:foreach(
|
||||
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end,
|
||||
fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer1)) end,
|
||||
Terms1
|
||||
),
|
||||
{ok, WPos} = file:position(WFD, cur),
|
||||
ok = file:close(WFD),
|
||||
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
|
||||
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2),
|
||||
{ok, RWFD} = file:open(Filename, [read, write, binary]),
|
||||
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD),
|
||||
{Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RWFD),
|
||||
{ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0),
|
||||
?assertEqual(
|
||||
lists:sublist(Terms1, 3),
|
||||
|
@ -144,16 +144,16 @@ t_read_truncated_takeover_write(Config) ->
|
|||
),
|
||||
?assertError(
|
||||
badarg,
|
||||
emqx_bridge_s3_aggreg_buffer:read(Reader1)
|
||||
emqx_connector_aggreg_buffer:read(Reader1)
|
||||
),
|
||||
Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1),
|
||||
Writer2 = emqx_connector_aggreg_buffer:takeover(Reader1),
|
||||
ok = lists:foreach(
|
||||
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end,
|
||||
fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer2)) end,
|
||||
Terms2
|
||||
),
|
||||
ok = file:close(RWFD),
|
||||
{ok, RFD} = file:open(Filename, [read, binary]),
|
||||
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
|
||||
{Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD),
|
||||
ReadTerms2 = read_until_eof(Reader2),
|
||||
?assertEqual(
|
||||
lists:sublist(Terms1, 3) ++ Terms2,
|
||||
|
@ -168,12 +168,12 @@ mk_filename(Name, Config) ->
|
|||
read_terms(0, Reader) ->
|
||||
{[], Reader};
|
||||
read_terms(N, Reader0) ->
|
||||
{Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
|
||||
{Term, Reader1} = emqx_connector_aggreg_buffer:read(Reader0),
|
||||
{Terms, Reader} = read_terms(N - 1, Reader1),
|
||||
{[Term | Terms], Reader}.
|
||||
|
||||
read_until_eof(Reader0) ->
|
||||
case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
|
||||
case emqx_connector_aggreg_buffer:read(Reader0) of
|
||||
{Term, Reader} ->
|
||||
[Term | read_until_eof(Reader)];
|
||||
eof ->
|
|
@ -2,12 +2,12 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_aggreg_csv_tests).
|
||||
-module(emqx_connector_aggreg_csv_tests).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
encoding_test() ->
|
||||
CSV = emqx_bridge_s3_aggreg_csv:new(#{}),
|
||||
CSV = emqx_connector_aggreg_csv:new(#{}),
|
||||
?assertEqual(
|
||||
"A,B,Ç\n"
|
||||
"1.2345,string,0.0\n"
|
||||
|
@ -28,7 +28,7 @@ encoding_test() ->
|
|||
|
||||
column_order_test() ->
|
||||
Order = [<<"ID">>, <<"TS">>],
|
||||
CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}),
|
||||
CSV = emqx_connector_aggreg_csv:new(#{column_order => Order}),
|
||||
?assertEqual(
|
||||
"ID,TS,A,B,D\n"
|
||||
"1,2024-01-01,12.34,str,\"[]\"\n"
|
||||
|
@ -63,10 +63,10 @@ fill_close(CSV, LRecords) ->
|
|||
string(fill_close_(CSV, LRecords)).
|
||||
|
||||
fill_close_(CSV0, [Records | LRest]) ->
|
||||
{Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
|
||||
{Writes, CSV} = emqx_connector_aggreg_csv:fill(Records, CSV0),
|
||||
[Writes | fill_close_(CSV, LRest)];
|
||||
fill_close_(CSV, []) ->
|
||||
[emqx_bridge_s3_aggreg_csv:close(CSV)].
|
||||
[emqx_connector_aggreg_csv:close(CSV)].
|
||||
|
||||
string(Writes) ->
|
||||
unicode:characters_to_list(Writes).
|
|
@ -0,0 +1,25 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_connector_aggregator_test_helpers).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
%% API
|
||||
-export([]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% File utilities
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
truncate_at(Filename, Pos) ->
|
||||
{ok, FD} = file:open(Filename, [read, write, binary]),
|
||||
{ok, Pos} = file:position(FD, Pos),
|
||||
ok = file:truncate(FD),
|
||||
ok = file:close(FD).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
|
@ -89,6 +89,7 @@
|
|||
emqx_license,
|
||||
emqx_enterprise,
|
||||
emqx_message_validation,
|
||||
emqx_connector_aggregator,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_pulsar,
|
||||
emqx_bridge_gcp_pubsub,
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -158,6 +158,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
# need to remove those when listing `/apps/`...
|
||||
defp enterprise_umbrella_apps(_release_type) do
|
||||
MapSet.new([
|
||||
:emqx_connector_aggregator,
|
||||
:emqx_bridge_kafka,
|
||||
:emqx_bridge_confluent,
|
||||
:emqx_bridge_gcp_pubsub,
|
||||
|
|
|
@ -76,6 +76,7 @@ is_cover_enabled() ->
|
|||
is_enterprise(ce) -> false;
|
||||
is_enterprise(ee) -> true.
|
||||
|
||||
is_community_umbrella_app("apps/emqx_connector_aggregator") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_confluent") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;
|
||||
|
|
Loading…
Reference in New Issue