diff --git a/apps/emqx_bridge_s3/rebar.config b/apps/emqx_bridge_s3/rebar.config index 51bf0e0b6..34f7bb51e 100644 --- a/apps/emqx_bridge_s3/rebar.config +++ b/apps/emqx_bridge_s3/rebar.config @@ -2,5 +2,6 @@ {erl_opts, [debug_info]}. {deps, [ - {emqx_resource, {path, "../../apps/emqx_resource"}} + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}} ]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 5cdf3fb82..46f8db64b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -7,6 +7,7 @@ stdlib, erlcloud, emqx_resource, + emqx_connector_aggregator, emqx_s3 ]}, {env, [ @@ -18,7 +19,6 @@ emqx_bridge_s3_connector_info ]} ]}, - {mod, {emqx_bridge_s3_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl deleted file mode 100644 index 02099dbec..000000000 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl +++ /dev/null @@ -1,212 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - -%% This module takes aggregated records from a buffer and delivers them to S3, -%% wrapped in a configurable container (though currently there's only CSV). --module(emqx_bridge_s3_aggreg_delivery). - --include_lib("snabbkaffe/include/trace.hrl"). --include("emqx_bridge_s3_aggregator.hrl"). - --export([start_link/3]). - -%% Internal exports --export([ - init/4, - loop/3 -]). - --behaviour(emqx_template). --export([lookup/2]). - -%% Sys --export([ - system_continue/3, - system_terminate/4, - format_status/2 -]). - --record(delivery, { - name :: _Name, - container :: emqx_bridge_s3_aggreg_csv:container(), - reader :: emqx_bridge_s3_aggreg_buffer:reader(), - upload :: emqx_s3_upload:t(), - empty :: boolean() -}). - --type state() :: #delivery{}. - -%% - -start_link(Name, Buffer, Opts) -> - proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). - -%% - --spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return(). -init(Parent, Name, Buffer, Opts) -> - ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}), - Reader = open_buffer(Buffer), - Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), - _ = erlang:process_flag(trap_exit, true), - ok = proc_lib:init_ack({ok, self()}), - loop(Delivery, Parent, []). - -init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) -> - #delivery{ - name = Name, - container = mk_container(ContainerOpts), - reader = Reader, - upload = mk_upload(Buffer, Opts), - empty = true - }. - -open_buffer(#buffer{filename = Filename}) -> - case file:open(Filename, [read, binary, raw]) of - {ok, FD} -> - {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD), - Reader; - {error, Reason} -> - error({buffer_open_failed, Reason}) - end. - -mk_container(#{type := csv, column_order := OrderOpt}) -> - %% TODO: Deduplicate? - ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt), - emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}). 
- -mk_upload( - Buffer, - Opts = #{ - bucket := Bucket, - upload_options := UploadOpts, - client_config := Config, - uploader_config := UploaderConfig - } -) -> - Client = emqx_s3_client:create(Bucket, Config), - Key = mk_object_key(Buffer, Opts), - emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). - -mk_object_key(Buffer, #{action := Name, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). - -%% - --spec loop(state(), pid(), [sys:debug_option()]) -> no_return(). -loop(Delivery, Parent, Debug) -> - %% NOTE: This function is mocked in tests. - receive - Msg -> handle_msg(Msg, Delivery, Parent, Debug) - after 0 -> - process_delivery(Delivery, Parent, Debug) - end. - -process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) -> - case emqx_bridge_s3_aggreg_buffer:read(Reader0) of - {Records = [#{} | _], Reader} -> - Delivery1 = Delivery0#delivery{reader = Reader}, - Delivery2 = process_append_records(Records, Delivery1), - Delivery = process_write(Delivery2), - loop(Delivery, Parent, Debug); - {[], Reader} -> - Delivery = Delivery0#delivery{reader = Reader}, - loop(Delivery, Parent, Debug); - eof -> - process_complete(Delivery0); - {Unexpected, _Reader} -> - exit({buffer_unexpected_record, Unexpected}) - end. - -process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) -> - {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0), - {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), - Delivery#delivery{ - container = Container, - upload = Upload, - empty = false - }. - -process_write(Delivery = #delivery{upload = Upload0}) -> - case emqx_s3_upload:write(Upload0) of - {ok, Upload} -> - Delivery#delivery{upload = Upload}; - {cont, Upload} -> - process_write(Delivery#delivery{upload = Upload}); - {error, Reason} -> - _ = emqx_s3_upload:abort(Upload0), - exit({upload_failed, Reason}) - end. - -process_complete(#delivery{name = Name, empty = true}) -> - ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}), - exit({shutdown, {skipped, empty}}); -process_complete(#delivery{name = Name, container = Container, upload = Upload0}) -> - Trailer = emqx_bridge_s3_aggreg_csv:close(Container), - {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0), - case emqx_s3_upload:complete(Upload) of - {ok, Completed} -> - ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}), - ok; - {error, Reason} -> - _ = emqx_s3_upload:abort(Upload), - exit({upload_failed, Reason}) - end. - -%% - -handle_msg({system, From, Msg}, Delivery, Parent, Debug) -> - sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery); -handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) -> - system_terminate(Reason, Parent, Debug, Delivery); -handle_msg(_Msg, Delivery, Parent, Debug) -> - loop(Parent, Debug, Delivery). - --spec system_continue(pid(), [sys:debug_option()], state()) -> no_return(). -system_continue(Parent, Debug, Delivery) -> - loop(Delivery, Parent, Debug). - --spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _. -system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) -> - emqx_s3_upload:abort(Upload). - --spec format_status(normal, Args :: [term()]) -> _StateFormatted. -format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) -> - Delivery#delivery{ - upload = emqx_s3_upload:format(Delivery#delivery.upload) - }. 
- -%% - --spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> - {ok, integer() | string()} | {error, undefined}. -lookup([<<"action">>], {Name, _Buffer}) -> - {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_Name, Buffer = #buffer{}}) -> - lookup_buffer_var(Accessor, Buffer); -lookup(_Accessor, _Context) -> - {error, undefined}. - -lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) -> - {ok, format_timestamp(Since, Format)}; -lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) -> - {ok, format_timestamp(Until, Format)}; -lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) -> - {ok, Seq}; -lookup_buffer_var([<<"node">>], #buffer{}) -> - {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; -lookup_buffer_var(_Binding, _Context) -> - {error, undefined}. - -format_timestamp(Timestamp, <<"rfc3339utc">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"rfc3339">>) -> - String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]), - mk_fs_safe_string(String); -format_timestamp(Timestamp, <<"unix">>) -> - Timestamp. - -mk_fs_safe_string(String) -> - unicode:characters_to_binary(string:replace(String, ":", "_", all)). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl deleted file mode 100644 index e5b77f9d6..000000000 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl +++ /dev/null @@ -1,16 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_s3_app). - --behaviour(application). --export([start/2, stop/1]). - -%% - -start(_StartType, _StartArgs) -> - emqx_bridge_s3_sup:start_link(). - -stop(_State) -> - ok. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 83d5c47a9..3bac17cfb 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -8,6 +8,7 @@ -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx/include/emqx_trace.hrl"). +-include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl"). -include("emqx_bridge_s3.hrl"). -behaviour(emqx_resource). @@ -24,6 +25,19 @@ on_get_channel_status/3 ]). +-behaviour(emqx_connector_aggreg_delivery). +-export([ + init_upload_state/2, + process_append/2, + process_write/1, + process_complete/1, + process_format_status/1, + process_terminate/1 +]). + +-behaviour(emqx_template). +-export([lookup/2]). 
+ -type config() :: #{ access_key_id => string(), secret_access_key => emqx_secret:t(string()), @@ -205,13 +219,14 @@ start_channel(State, #{ key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters), container => Container, upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters), + callback_module => ?MODULE, client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, - _ = emqx_bridge_s3_sup:delete_child({Type, Name}), - {ok, SupPid} = emqx_bridge_s3_sup:start_child(#{ + _ = emqx_connector_aggreg_sup:delete_child({Type, Name}), + {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{ id => {Type, Name}, - start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, type => supervisor, restart => permanent }), @@ -220,7 +235,7 @@ start_channel(State, #{ name => Name, bucket => Bucket, supervisor => SupPid, - on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end + on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end }. upload_options(Parameters) -> @@ -242,7 +257,7 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) -> channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), - ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp), + ok = emqx_connector_aggregator:tick(Name, Timestamp), ok = check_bucket_accessible(Bucket, State), ok = check_aggreg_upload_errors(Name), ?status_connected. @@ -264,7 +279,7 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> end. check_aggreg_upload_errors(Name) -> - case emqx_bridge_s3_aggregator:take_error(Name) of + case emqx_connector_aggregator:take_error(Name) of [Error] -> %% TODO %% This approach means that, for example, 3 upload failures will cause @@ -340,7 +355,7 @@ run_simple_upload( run_aggregated_upload(InstId, Records, #{name := Name}) -> Timestamp = erlang:system_time(second), - case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of + case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of ok -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), ok; @@ -376,3 +391,83 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). + +%% `emqx_connector_aggreg_delivery` APIs + +-spec init_upload_state(buffer(), map()) -> emqx_s3_upload:t(). +init_upload_state(Buffer, Opts) -> + #{ + bucket := Bucket, + upload_options := UploadOpts, + client_config := Config, + uploader_config := UploaderConfig + } = Opts, + Client = emqx_s3_client:create(Bucket, Config), + Key = mk_object_key(Buffer, Opts), + emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). + +mk_object_key(Buffer, #{action := Name, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). + +process_append(Writes, Upload) -> + emqx_s3_upload:append(Writes, Upload). + +process_write(Upload0) -> + case emqx_s3_upload:write(Upload0) of + {ok, Upload} -> + Upload; + {cont, Upload} -> + process_write(Upload); + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload0), + exit({upload_failed, Reason}) + end. 
+ +process_complete(Upload) -> + case emqx_s3_upload:complete(Upload) of + {ok, Completed} -> + {ok, Completed}; + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload), + exit({upload_failed, Reason}) + end. + +process_format_status(Upload) -> + emqx_s3_upload:format(Upload). + +process_terminate(Upload) -> + emqx_s3_upload:abort(Upload). + +%% `emqx_template` APIs + +-spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> + {ok, integer() | string()} | {error, undefined}. +lookup([<<"action">>], {Name, _Buffer}) -> + {ok, mk_fs_safe_string(Name)}; +lookup(Accessor, {_Name, Buffer = #buffer{}}) -> + lookup_buffer_var(Accessor, Buffer); +lookup(_Accessor, _Context) -> + {error, undefined}. + +lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) -> + {ok, format_timestamp(Since, Format)}; +lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) -> + {ok, format_timestamp(Until, Format)}; +lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) -> + {ok, Seq}; +lookup_buffer_var([<<"node">>], #buffer{}) -> + {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; +lookup_buffer_var(_Binding, _Context) -> + {error, undefined}. + +format_timestamp(Timestamp, <<"rfc3339utc">>) -> + String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]), + mk_fs_safe_string(String); +format_timestamp(Timestamp, <<"rfc3339">>) -> + String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]), + mk_fs_safe_string(String); +format_timestamp(Timestamp, <<"unix">>) -> + Timestamp. + +mk_fs_safe_string(String) -> + unicode:characters_to_binary(string:replace(String, ":", "_", all)). diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 6577b45ed..0ae34486f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -170,7 +170,7 @@ t_aggreg_upload(Config) -> ]), ok = send_messages(BridgeName, MessageEvents), %% Wait until the delivery is completed. - ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), ?assertMatch( @@ -217,7 +217,7 @@ t_aggreg_upload_rule(Config) -> emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) ]), - ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -258,15 +258,15 @@ t_aggreg_upload_restart(Config) -> {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} ]), ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Send some more messages. 
ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), %% Check there's still only one upload. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), @@ -300,18 +300,18 @@ t_aggreg_upload_restart_corrupted(Config) -> %% Ensure that they span multiple batch queries. ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), {ok, _} = ?block_until( - #{?snk_kind := s3_aggreg_records_written, action := BridgeName}, + #{?snk_kind := connector_aggreg_records_written, action := BridgeName}, infinity, 0 ), %% Find out the buffer file. {ok, #{filename := Filename}} = ?block_until( - #{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName} + #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName} ), %% Stop the bridge, corrupt the buffer file, and restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), BufferFileSize = filelib:file_size(Filename), - ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2), + ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Send some more messages. Messages2 = [ @@ -320,7 +320,7 @@ t_aggreg_upload_restart_corrupted(Config) -> ], ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), %% Check that upload contains part of the first batch and all of the second batch. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -362,7 +362,7 @@ t_aggreg_pending_upload_restart(Config) -> %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), %% Check that delivery contains all the messages. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -392,7 +392,9 @@ t_aggreg_next_rotate(Config) -> NSent = receive_sender_reports(Senders), %% Wait for the last delivery to complete. ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), - ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0), + ?block_until( + #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0 + ), %% There should be at least 2 time windows of aggregated records. 
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]], diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl index 21729369b..5ebeaf681 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl @@ -48,11 +48,3 @@ list_pending_uploads(Bucket, Key) -> {ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig), Uploads = proplists:get_value(uploads, Props), lists:map(fun maps:from_list/1, Uploads). - -%% File utilities - -truncate_at(Filename, Pos) -> - {ok, FD} = file:open(Filename, [read, write, binary]), - {ok, Pos} = file:position(FD, Pos), - ok = file:truncate(FD), - ok = file:close(FD). diff --git a/apps/emqx_connector_aggregator/BSL.txt b/apps/emqx_connector_aggregator/BSL.txt new file mode 100644 index 000000000..f0cd31c6f --- /dev/null +++ b/apps/emqx_connector_aggregator/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-01-26 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. 
+ +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_connector_aggregator/README.md b/apps/emqx_connector_aggregator/README.md new file mode 100644 index 000000000..e495cc651 --- /dev/null +++ b/apps/emqx_connector_aggregator/README.md @@ -0,0 +1,11 @@ +# EMQX Connector Aggregator + +This application provides common logic for connector and action implementations of EMQX to aggregate multiple incoming messages into a container file before sending it to a blob storage backend. + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +## License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
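The README above introduces the new shared app; the connector-side wiring it expects is already visible in the `emqx_bridge_s3_connector:start_channel/2` hunk earlier in this diff. Below is a condensed sketch of that wiring — the wrapper module and function names (`aggreg_wiring_sketch`, `start_aggregated_channel/4`) are illustrative only, and the construction of `AggregOpts`/`DeliveryOpts0` (key template, upload options, client config) is elided.

```erlang
%% Condensed sketch of the per-action channel wiring added above in
%% emqx_bridge_s3_connector:start_channel/2. Wrapper names are illustrative;
%% building AggregOpts/DeliveryOpts0 (key template, upload options, client
%% config) is elided.
-module(aggreg_wiring_sketch).

-export([start_aggregated_channel/4]).

start_aggregated_channel(Type, Name, AggregOpts, DeliveryOpts0) ->
    %% The connector registers itself as the delivery callback module, so the
    %% generic emqx_connector_aggreg_delivery process calls back into it.
    DeliveryOpts = DeliveryOpts0#{callback_module => emqx_bridge_s3_connector},
    %% Tear down any previous instance for this action first.
    _ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
    %% One per-action supervisor hosts the aggregator plus its delivery workers.
    {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
        id => {Type, Name},
        start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
        type => supervisor,
        restart => permanent
    }),
    #{
        type => Type,
        name => Name,
        supervisor => SupPid,
        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
    }.
```

The important detail is `callback_module => emqx_bridge_s3_connector`: the generic buffering and delivery machinery stays in `emqx_connector_aggregator`, while all S3-specific upload handling is reached through that module.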
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl b/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl similarity index 81% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl rename to apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl index 7ac62c6b5..eeef351f2 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl +++ b/apps/emqx_connector_aggregator/include/emqx_connector_aggregator.hrl @@ -3,8 +3,8 @@ %%-------------------------------------------------------------------- -record(buffer, { - since :: emqx_bridge_s3_aggregator:timestamp(), - until :: emqx_bridge_s3_aggregator:timestamp(), + since :: emqx_connector_aggregator:timestamp(), + until :: emqx_connector_aggregator:timestamp(), seq :: non_neg_integer(), filename :: file:filename(), fd :: file:io_device() | undefined, diff --git a/apps/emqx_connector_aggregator/rebar.config b/apps/emqx_connector_aggregator/rebar.config new file mode 100644 index 000000000..592bd4e41 --- /dev/null +++ b/apps/emqx_connector_aggregator/rebar.config @@ -0,0 +1,7 @@ +%% -*- mode: erlang; -*- + +{deps, [ + {emqx, {path, "../../apps/emqx"}} +]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl new file mode 100644 index 000000000..ce6e11ab2 --- /dev/null +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl @@ -0,0 +1,25 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_connector_aggreg_app). + +-behaviour(application). +-export([start/2, stop/1]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `application` API +%%------------------------------------------------------------------------------ + +start(_StartType, _StartArgs) -> + emqx_connector_aggreg_sup:start_link(). + +stop(_State) -> + ok. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer.erl similarity index 99% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl rename to apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer.erl index 503b864a7..3d0aa3157 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_buffer.erl @@ -17,7 +17,7 @@ %% ... %% ``` %% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`). --module(emqx_bridge_s3_aggreg_buffer). +-module(emqx_connector_aggreg_buffer). 
-export([ new_writer/2, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_csv.erl similarity index 95% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl rename to apps/emqx_connector_aggregator/src/emqx_connector_aggreg_csv.erl index 33895d8c1..bfca6f336 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_csv.erl @@ -2,8 +2,8 @@ %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -%% CSV container implementation for `emqx_bridge_s3_aggregator`. --module(emqx_bridge_s3_aggreg_csv). +%% CSV container implementation for `emqx_connector_aggregator`. +-module(emqx_connector_aggreg_csv). %% Container API -export([ @@ -33,7 +33,7 @@ column_order => [column()] }. --type record() :: emqx_bridge_s3_aggregator:record(). +-type record() :: emqx_connector_aggregator:record(). -type column() :: binary(). %% diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl new file mode 100644 index 000000000..ed74311b5 --- /dev/null +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -0,0 +1,183 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% This module takes aggregated records from a buffer and delivers them to a blob storage +%% backend, wrapped in a configurable container (though currently there's only CSV). +-module(emqx_connector_aggreg_delivery). + +-include_lib("snabbkaffe/include/trace.hrl"). +-include("emqx_connector_aggregator.hrl"). + +-export([start_link/3]). + +%% Internal exports +-export([ + init/4, + loop/3 +]). + +%% Sys +-export([ + system_continue/3, + system_terminate/4, + format_status/2 +]). + +-record(delivery, { + name :: _Name, + callback_module :: module(), + container :: emqx_connector_aggreg_csv:container(), + reader :: emqx_connector_aggreg_buffer:reader(), + upload :: impl_specific_upload_state(), + empty :: boolean() +}). + +-type state() :: #delivery{}. + +-type init_opts() :: #{ + callback_module := module(), + any() => term() +}. + +-type impl_specific_upload_state() :: term(). + +-callback init_upload_state(buffer(), map()) -> impl_specific_upload_state(). + +-callback process_append(iodata(), impl_specific_upload_state()) -> + {ok, impl_specific_upload_state()}. + +-callback process_write(impl_specific_upload_state()) -> impl_specific_upload_state(). + +-callback process_complete(impl_specific_upload_state()) -> {ok, term()}. + +%% + +start_link(Name, Buffer, Opts) -> + proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). + +%% + +-spec init(pid(), _Name, buffer(), init_opts()) -> no_return(). +init(Parent, Name, Buffer, Opts) -> + ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}), + Reader = open_buffer(Buffer), + Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), + _ = erlang:process_flag(trap_exit, true), + ok = proc_lib:init_ack({ok, self()}), + loop(Delivery, Parent, []). 
+ +init_delivery( + Name, + Reader, + Buffer, + Opts = #{ + container := ContainerOpts, + callback_module := Mod + } +) -> + #delivery{ + name = Name, + callback_module = Mod, + container = mk_container(ContainerOpts), + reader = Reader, + upload = Mod:init_upload_state(Buffer, Opts), + empty = true + }. + +open_buffer(#buffer{filename = Filename}) -> + case file:open(Filename, [read, binary, raw]) of + {ok, FD} -> + {_Meta, Reader} = emqx_connector_aggreg_buffer:new_reader(FD), + Reader; + {error, Reason} -> + error({buffer_open_failed, Reason}) + end. + +mk_container(#{type := csv, column_order := OrderOpt}) -> + %% TODO: Deduplicate? + ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt), + emqx_connector_aggreg_csv:new(#{column_order => ColumnOrder}). + +%% + +-spec loop(state(), pid(), [sys:debug_option()]) -> no_return(). +loop(Delivery, Parent, Debug) -> + %% NOTE: This function is mocked in tests. + receive + Msg -> handle_msg(Msg, Delivery, Parent, Debug) + after 0 -> + process_delivery(Delivery, Parent, Debug) + end. + +process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) -> + case emqx_connector_aggreg_buffer:read(Reader0) of + {Records = [#{} | _], Reader} -> + Delivery1 = Delivery0#delivery{reader = Reader}, + Delivery2 = process_append_records(Records, Delivery1), + Delivery = process_write(Delivery2), + loop(Delivery, Parent, Debug); + {[], Reader} -> + Delivery = Delivery0#delivery{reader = Reader}, + loop(Delivery, Parent, Debug); + eof -> + process_complete(Delivery0); + {Unexpected, _Reader} -> + exit({buffer_unexpected_record, Unexpected}) + end. + +process_append_records( + Records, + Delivery = #delivery{ + callback_module = Mod, + container = Container0, + upload = Upload0 + } +) -> + {Writes, Container} = emqx_connector_aggreg_csv:fill(Records, Container0), + {ok, Upload} = Mod:process_append(Writes, Upload0), + Delivery#delivery{ + container = Container, + upload = Upload, + empty = false + }. + +process_write(Delivery = #delivery{callback_module = Mod, upload = Upload0}) -> + Upload = Mod:process_write(Upload0), + Delivery#delivery{upload = Upload}. + +process_complete(#delivery{name = Name, empty = true}) -> + ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => empty}), + exit({shutdown, {skipped, empty}}); +process_complete(#delivery{ + name = Name, callback_module = Mod, container = Container, upload = Upload0 +}) -> + Trailer = emqx_connector_aggreg_csv:close(Container), + {ok, Upload} = Mod:process_append(Trailer, Upload0), + {ok, Completed} = Mod:process_complete(Upload), + ?tp(connector_aggreg_delivery_completed, #{action => Name, upload => Completed}), + ok. + +%% + +handle_msg({system, From, Msg}, Delivery, Parent, Debug) -> + sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery); +handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) -> + system_terminate(Reason, Parent, Debug, Delivery); +handle_msg(_Msg, Delivery, Parent, Debug) -> + loop(Parent, Debug, Delivery). + +-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return(). +system_continue(Parent, Debug, Delivery) -> + loop(Delivery, Parent, Debug). + +-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _. +system_terminate(_Reason, _Parent, _Debug, #delivery{callback_module = Mod, upload = Upload}) -> + Mod:process_terminate(Upload). + +-spec format_status(normal, Args :: [term()]) -> _StateFormatted. 
+format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) -> + #delivery{callback_module = Mod} = Delivery, + Delivery#delivery{ + upload = Mod:process_format_status(Delivery#delivery.upload) + }. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl similarity index 95% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl rename to apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl index 230711a76..e80652542 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_s3_sup). +-module(emqx_connector_aggreg_sup). -export([ start_link/0, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_upload_sup.erl similarity index 91% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl rename to apps/emqx_connector_aggregator/src/emqx_connector_aggreg_upload_sup.erl index 973187b7e..d08b3b8ed 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_upload_sup.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_s3_aggreg_upload_sup). +-module(emqx_connector_aggreg_upload_sup). -export([ start_link/3, @@ -33,7 +33,7 @@ start_delivery(Name, Buffer) -> supervisor:start_child(?SUPREF(Name), [Buffer]). start_delivery_proc(Name, DeliveryOpts, Buffer) -> - emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts). + emqx_connector_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts). %% @@ -45,7 +45,7 @@ init({root, Name, AggregOpts, DeliveryOpts}) -> }, AggregatorChildSpec = #{ id => aggregator, - start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]}, + start => {emqx_connector_aggregator, start_link, [Name, AggregOpts]}, type => worker, restart => permanent }, diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src new file mode 100644 index 000000000..550bf96a5 --- /dev/null +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src @@ -0,0 +1,14 @@ +{application, emqx_connector_aggregator, [ + {description, "EMQX Enterprise Connector Data Aggregator"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource + ]}, + {env, []}, + {mod, {emqx_connector_aggreg_app, []}}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl similarity index 95% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl rename to apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl index 47ecdeb4a..01b330646 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl @@ -5,12 +5,12 @@ %% This module manages buffers for aggregating records and offloads them %% to separate "delivery" processes when they are full or time interval %% is over. --module(emqx_bridge_s3_aggregator). 
+-module(emqx_connector_aggregator). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). --include("emqx_bridge_s3_aggregator.hrl"). +-include("emqx_connector_aggregator.hrl"). -export([ start_link/2, @@ -90,9 +90,9 @@ write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) end. write_records(Name, Buffer = #buffer{fd = Writer}, Records) -> - case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of + case emqx_connector_aggreg_buffer:write(Records, Writer) of ok -> - ?tp(s3_aggreg_records_written, #{action => Name, records => Records}), + ?tp(connector_aggreg_records_written, #{action => Name, records => Records}), ok; {error, terminated} -> BufferNext = rotate_buffer(Name, Buffer), @@ -250,9 +250,9 @@ compute_since(Timestamp, PrevSince, Interval) -> allocate_buffer(Since, Seq, St = #st{name = Name}) -> Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St), {ok, FD} = file:open(Filename, [write, binary]), - Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []), + Writer = emqx_connector_aggreg_buffer:new_writer(FD, _Meta = []), _ = add_counter(Counter), - ?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}), + ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename}), Buffer#buffer{fd = Writer}. recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) -> @@ -274,7 +274,7 @@ recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) -> end. recover_buffer_writer(FD, Filename) -> - try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of + try emqx_connector_aggreg_buffer:new_reader(FD) of {_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0) catch error:Reason -> @@ -282,7 +282,7 @@ recover_buffer_writer(FD, Filename) -> end. recover_buffer_writer(FD, Filename, Reader0, NWritten) -> - try emqx_bridge_s3_aggreg_buffer:read(Reader0) of + try emqx_connector_aggreg_buffer:read(Reader0) of {Records, Reader} when is_list(Records) -> recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records)); {Unexpected, _Reader} -> @@ -303,7 +303,7 @@ recover_buffer_writer(FD, Filename, Reader0, NWritten) -> "Buffer is truncated or corrupted somewhere in the middle. " "Corrupted records will be discarded." }), - Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0), + Writer = emqx_connector_aggreg_buffer:takeover(Reader0), {ok, Writer, NWritten} end. @@ -362,7 +362,7 @@ lookup_current_buffer(Name) -> %% enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) -> - {ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer), + {ok, Pid} = emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer), MRef = erlang:monitor(process, Pid), St#st{deliveries = Ds#{MRef => Buffer}}. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl b/apps/emqx_connector_aggregator/test/emqx_connector_aggreg_buffer_SUITE.erl similarity index 75% rename from apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl rename to apps/emqx_connector_aggregator/test/emqx_connector_aggreg_buffer_SUITE.erl index 199e070d3..0c172cc83 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl +++ b/apps/emqx_connector_aggregator/test/emqx_connector_aggreg_buffer_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
%%-------------------------------------------------------------------- --module(emqx_bridge_s3_aggreg_buffer_SUITE). +-module(emqx_connector_aggreg_buffer_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -29,7 +29,7 @@ t_write_read_cycle(Config) -> Filename = mk_filename(?FUNCTION_NAME, Config), Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, {ok, WFD} = file:open(Filename, [write, binary]), - Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata), Terms = [ [], [[[[[[[[]]]]]]]], @@ -43,12 +43,12 @@ t_write_read_cycle(Config) -> {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})} ], ok = lists:foreach( - fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end, Terms ), ok = file:close(WFD), {ok, RFD} = file:open(Filename, [read, binary, raw]), - {MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + {MetadataRead, Reader} = emqx_connector_aggreg_buffer:new_reader(RFD), ?assertEqual(Metadata, MetadataRead), TermsRead = read_until_eof(Reader), ?assertEqual(Terms, TermsRead). @@ -60,7 +60,7 @@ t_read_empty(Config) -> {ok, RFD} = file:open(Filename, [read, binary]), ?assertError( {buffer_incomplete, header}, - emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + emqx_connector_aggreg_buffer:new_reader(RFD) ). t_read_garbage(Config) -> @@ -71,14 +71,14 @@ t_read_garbage(Config) -> {ok, RFD} = file:open(Filename, [read, binary]), ?assertError( badarg, - emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + emqx_connector_aggreg_buffer:new_reader(RFD) ). t_read_truncated(Config) -> Filename = mk_filename(?FUNCTION_NAME, Config), {ok, WFD} = file:open(Filename, [write, binary]), Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, - Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Writer = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata), Terms = [ [[[[[[[[[[[]]]]]]]]]]], lists:seq(1, 100000), @@ -88,36 +88,36 @@ t_read_truncated(Config) -> LastTerm = {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, ok = lists:foreach( - fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer)) end, Terms ), {ok, WPos} = file:position(WFD, cur), - ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)), + ?assertEqual(ok, emqx_connector_aggreg_buffer:write(LastTerm, Writer)), ok = file:close(WFD), - ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1), + ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos + 1), {ok, RFD1} = file:open(Filename, [read, binary]), - {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1), + {Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RFD1), {ReadTerms1, Reader1} = read_terms(length(Terms), Reader0), ?assertEqual(Terms, ReadTerms1), ?assertError( badarg, - emqx_bridge_s3_aggreg_buffer:read(Reader1) + emqx_connector_aggreg_buffer:read(Reader1) ), - ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2), {ok, RFD2} = file:open(Filename, [read, binary]), - {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2), + {Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD2), {ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2), 
?assertEqual(lists:sublist(Terms, 3), ReadTerms2), ?assertError( badarg, - emqx_bridge_s3_aggreg_buffer:read(Reader3) + emqx_connector_aggreg_buffer:read(Reader3) ). t_read_truncated_takeover_write(Config) -> Filename = mk_filename(?FUNCTION_NAME, Config), {ok, WFD} = file:open(Filename, [write, binary]), Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, - Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Writer1 = emqx_connector_aggreg_buffer:new_writer(WFD, Metadata), Terms1 = [ [[[[[[[[[[[]]]]]]]]]]], lists:seq(1, 10000), @@ -129,14 +129,14 @@ t_read_truncated_takeover_write(Config) -> {<<"application/x-octet-stream">>, rand:bytes(102400)} ], ok = lists:foreach( - fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end, + fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer1)) end, Terms1 ), {ok, WPos} = file:position(WFD, cur), ok = file:close(WFD), - ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, WPos div 2), {ok, RWFD} = file:open(Filename, [read, write, binary]), - {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD), + {Metadata, Reader0} = emqx_connector_aggreg_buffer:new_reader(RWFD), {ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0), ?assertEqual( lists:sublist(Terms1, 3), @@ -144,16 +144,16 @@ t_read_truncated_takeover_write(Config) -> ), ?assertError( badarg, - emqx_bridge_s3_aggreg_buffer:read(Reader1) + emqx_connector_aggreg_buffer:read(Reader1) ), - Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1), + Writer2 = emqx_connector_aggreg_buffer:takeover(Reader1), ok = lists:foreach( - fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end, + fun(T) -> ?assertEqual(ok, emqx_connector_aggreg_buffer:write(T, Writer2)) end, Terms2 ), ok = file:close(RWFD), {ok, RFD} = file:open(Filename, [read, binary]), - {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + {Metadata, Reader2} = emqx_connector_aggreg_buffer:new_reader(RFD), ReadTerms2 = read_until_eof(Reader2), ?assertEqual( lists:sublist(Terms1, 3) ++ Terms2, @@ -168,12 +168,12 @@ mk_filename(Name, Config) -> read_terms(0, Reader) -> {[], Reader}; read_terms(N, Reader0) -> - {Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0), + {Term, Reader1} = emqx_connector_aggreg_buffer:read(Reader0), {Terms, Reader} = read_terms(N - 1, Reader1), {[Term | Terms], Reader}. read_until_eof(Reader0) -> - case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + case emqx_connector_aggreg_buffer:read(Reader0) of {Term, Reader} -> [Term | read_until_eof(Reader)]; eof -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl b/apps/emqx_connector_aggregator/test/emqx_connector_aggreg_csv_tests.erl similarity index 89% rename from apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl rename to apps/emqx_connector_aggregator/test/emqx_connector_aggreg_csv_tests.erl index 6da70c6fe..cd7ac8bcb 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl +++ b/apps/emqx_connector_aggregator/test/emqx_connector_aggreg_csv_tests.erl @@ -2,12 +2,12 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_s3_aggreg_csv_tests). +-module(emqx_connector_aggreg_csv_tests). -include_lib("eunit/include/eunit.hrl"). 
encoding_test() -> - CSV = emqx_bridge_s3_aggreg_csv:new(#{}), + CSV = emqx_connector_aggreg_csv:new(#{}), ?assertEqual( "A,B,Ç\n" "1.2345,string,0.0\n" @@ -28,7 +28,7 @@ encoding_test() -> column_order_test() -> Order = [<<"ID">>, <<"TS">>], - CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}), + CSV = emqx_connector_aggreg_csv:new(#{column_order => Order}), ?assertEqual( "ID,TS,A,B,D\n" "1,2024-01-01,12.34,str,\"[]\"\n" @@ -63,10 +63,10 @@ fill_close(CSV, LRecords) -> string(fill_close_(CSV, LRecords)). fill_close_(CSV0, [Records | LRest]) -> - {Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0), + {Writes, CSV} = emqx_connector_aggreg_csv:fill(Records, CSV0), [Writes | fill_close_(CSV, LRest)]; fill_close_(CSV, []) -> - [emqx_bridge_s3_aggreg_csv:close(CSV)]. + [emqx_connector_aggreg_csv:close(CSV)]. string(Writes) -> unicode:characters_to_list(Writes). diff --git a/apps/emqx_connector_aggregator/test/emqx_connector_aggregator_test_helpers.erl b/apps/emqx_connector_aggregator/test/emqx_connector_aggregator_test_helpers.erl new file mode 100644 index 000000000..c4e236c49 --- /dev/null +++ b/apps/emqx_connector_aggregator/test/emqx_connector_aggregator_test_helpers.erl @@ -0,0 +1,25 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_connector_aggregator_test_helpers). + +-compile(nowarn_export_all). +-compile(export_all). + +%% API +-export([]). + +%%------------------------------------------------------------------------------ +%% File utilities +%%------------------------------------------------------------------------------ + +truncate_at(Filename, Pos) -> + {ok, FD} = file:open(Filename, [read, write, binary]), + {ok, Pos} = file:position(FD, Pos), + ok = file:truncate(FD), + ok = file:close(FD). + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index ef42c0fc9..6b04e71f6 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -89,6 +89,7 @@ emqx_license, emqx_enterprise, emqx_message_validation, + emqx_connector_aggregator, emqx_bridge_kafka, emqx_bridge_pulsar, emqx_bridge_gcp_pubsub, diff --git a/mix.exs b/mix.exs index 8fb920c87..edcef686d 100644 --- a/mix.exs +++ b/mix.exs @@ -158,6 +158,7 @@ defmodule EMQXUmbrella.MixProject do # need to remove those when listing `/apps/`... defp enterprise_umbrella_apps(_release_type) do MapSet.new([ + :emqx_connector_aggregator, :emqx_bridge_kafka, :emqx_bridge_confluent, :emqx_bridge_gcp_pubsub, diff --git a/rebar.config.erl b/rebar.config.erl index e2e86bc29..399f34b18 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -76,6 +76,7 @@ is_cover_enabled() -> is_enterprise(ce) -> false; is_enterprise(ee) -> true. +is_community_umbrella_app("apps/emqx_connector_aggregator") -> false; is_community_umbrella_app("apps/emqx_bridge_kafka") -> false; is_community_umbrella_app("apps/emqx_bridge_confluent") -> false; is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;
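To summarize the contract extracted by this PR: a backend implements the `emqx_connector_aggreg_delivery` behaviour with `init_upload_state/2`, `process_append/2`, `process_write/1` and `process_complete/1` (the `-callback`s declared above), and additionally exports `process_format_status/1` and `process_terminate/1`, which the delivery process invokes through the callback module for status formatting and abort-on-terminate. A minimal sketch of such a module follows, assuming a trivial in-memory upload state; `my_blob_aggreg_backend` and its helpers are hypothetical, not code from this PR — the real reference implementation is `emqx_bridge_s3_connector` above.

```erlang
%% Hypothetical minimal backend for the emqx_connector_aggreg_delivery
%% behaviour, for illustration only.
-module(my_blob_aggreg_backend).

-behaviour(emqx_connector_aggreg_delivery).

-export([
    init_upload_state/2,
    process_append/2,
    process_write/1,
    process_complete/1,
    process_format_status/1,
    process_terminate/1
]).

%% Build backend-specific upload state from the buffer and the delivery options
%% passed down from the connector (for S3: bucket, key template, client config).
init_upload_state(_Buffer, Opts) ->
    #{opts => Opts, pending => []}.

%% Accumulate container output (iodata) produced by the CSV container.
process_append(Writes, State = #{pending := Pending}) ->
    {ok, State#{pending := [Pending, Writes]}}.

%% Flush accumulated data towards the backend; must return the updated state
%% (the S3 implementation exits with {upload_failed, Reason} on error).
process_write(State) ->
    State.

%% Finalize the upload; the delivery process then emits the
%% connector_aggreg_delivery_completed trace event with this result.
process_complete(State) ->
    {ok, State}.

%% Trim the state for sys/format_status introspection.
process_format_status(State) ->
    maps:without([pending], State).

%% Abort any in-flight upload when the delivery process terminates.
process_terminate(_State) ->
    ok.
```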