From 7b80a9aa443849b0161fecb2f0621f606e7ef2b8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 9 May 2024 15:23:58 -0300 Subject: [PATCH] fix(aggregator): namespace aggregator ids with action type Otherwise, actions of different types but same names will clash when starting the aggregator supervision tree. --- .../src/emqx_bridge_s3_connector.erl | 37 +++++++++++-------- .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 29 ++++++++++----- .../src/emqx_connector_aggreg_delivery.erl | 28 +++++++------- 3 files changed, 55 insertions(+), 39 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index f9d3af478..0a155eefb 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -209,6 +209,7 @@ start_channel(State, #{ bucket := Bucket } }) -> + AggregId = {Type, Name}, AggregOpts = #{ time_interval => TimeInterval, max_records => MaxRecords, @@ -223,19 +224,21 @@ start_channel(State, #{ client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, - _ = emqx_connector_aggreg_sup:delete_child({Type, Name}), + _ = emqx_connector_aggreg_sup:delete_child(AggregId), {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{ - id => {Type, Name}, - start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + id => AggregId, + start => + {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]}, type => supervisor, restart => permanent }), #{ type => ?ACTION_AGGREGATED_UPLOAD, name => Name, + aggreg_id => AggregId, bucket => Bucket, supervisor => SupPid, - on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end + on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end }. upload_options(Parameters) -> @@ -254,12 +257,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) -> %% Since bucket name may be templated, we can't really provide any additional %% information regarding the channel health. ?status_connected; -channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> +channel_status( + #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State +) -> %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), - ok = emqx_connector_aggregator:tick(Name, Timestamp), + ok = emqx_connector_aggregator:tick(AggregId, Timestamp), ok = check_bucket_accessible(Bucket, State), - ok = check_aggreg_upload_errors(Name), + ok = check_aggreg_upload_errors(AggregId), ?status_connected. check_bucket_accessible(Bucket, #{client_config := Config}) -> @@ -278,8 +283,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> end end. -check_aggreg_upload_errors(Name) -> - case emqx_connector_aggregator:take_error(Name) of +check_aggreg_upload_errors(AggregId) -> + case emqx_connector_aggregator:take_error(AggregId) of [Error] -> %% TODO %% This approach means that, for example, 3 upload failures will cause @@ -353,11 +358,11 @@ run_simple_upload( {error, map_error(Reason)} end. -run_aggregated_upload(InstId, Records, #{name := Name}) -> +run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), - case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of + case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> - ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), + ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> {error, {unrecoverable_error, Reason}} @@ -406,8 +411,8 @@ init_transfer_state(BufferMap, Opts) -> Key = mk_object_key(BufferMap, Opts), emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). -mk_object_key(BufferMap, #{action := Name, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}). +mk_object_key(BufferMap, #{action := AggregId, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}). process_append(Writes, Upload0) -> {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), @@ -443,9 +448,9 @@ process_terminate(Upload) -> -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> {ok, integer() | string()} | {error, undefined}. -lookup([<<"action">>], {Name, _Buffer}) -> +lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) -> {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_Name, Buffer = #{}}) -> +lookup(Accessor, {_AggregId, Buffer = #{}}) -> lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 0ae34486f..af121ed8d 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -158,6 +158,7 @@ t_on_get_status(Config) -> t_aggreg_upload(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BridgeNameString = unicode:characters_to_list(BridgeName), NodeString = atom_to_list(node()), %% Create a bridge with the sample configuration. @@ -170,7 +171,7 @@ t_aggreg_upload(Config) -> ]), ok = send_messages(BridgeName, MessageEvents), %% Wait until the delivery is completed. - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), ?assertMatch( @@ -196,6 +197,7 @@ t_aggreg_upload(Config) -> t_aggreg_upload_rule(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), ClientID = emqx_utils_conv:bin(?FUNCTION_NAME), %% Create a bridge with the sample configuration and a simple SQL rule. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) -> emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) ]), - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) -> %% after a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send some sample messages that look like Rule SQL productions. @@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) -> {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} ]), ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Send some more messages. ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check there's still only one upload. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), @@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) -> %% and does so while preserving uncompromised data. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BatchSize = ?CONF_MAX_RECORDS div 2, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) -> %% Ensure that they span multiple batch queries. ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), {ok, _} = ?block_until( - #{?snk_kind := connector_aggreg_records_written, action := BridgeName}, + #{?snk_kind := connector_aggreg_records_written, action := AggregId}, infinity, 0 ), %% Find out the buffer file. {ok, #{filename := Filename}} = ?block_until( - #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName} + #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId} ), %% Stop the bridge, corrupt the buffer file, and restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), @@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) -> ], ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that upload contains part of the first batch and all of the second batch. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) -> %% a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send few large messages that will require multipart upload. @@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) -> %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that delivery contains all the messages. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) -> %% and windowing work correctly under high rate, high concurrency conditions. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), NSenders = 4, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) -> %% Wait for the last delivery to complete. ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), ?block_until( - #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0 + #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0 ), %% There should be at least 2 time windows of aggregated records. Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], @@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) -> #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), {ok, CSV} = erl_csv:decode(Content), CSV. + +aggreg_id(BridgeName) -> + {?BRIDGE_TYPE, BridgeName}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index 071c28ee5..c2b4549c1 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -25,7 +25,7 @@ ]). -record(delivery, { - name :: _Name, + id :: id(), callback_module :: module(), container :: emqx_connector_aggreg_csv:container(), reader :: emqx_connector_aggreg_buffer:reader(), @@ -33,6 +33,8 @@ empty :: boolean() }). +-type id() :: term(). + -type state() :: #delivery{}. -type init_opts() :: #{ @@ -59,22 +61,22 @@ %% -start_link(Name, Buffer, Opts) -> - proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). +start_link(Id, Buffer, Opts) -> + proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]). %% --spec init(pid(), _Name, buffer(), init_opts()) -> no_return(). -init(Parent, Name, Buffer, Opts) -> - ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}), +-spec init(pid(), id(), buffer(), init_opts()) -> no_return(). +init(Parent, Id, Buffer, Opts) -> + ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}), Reader = open_buffer(Buffer), - Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), + Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}), _ = erlang:process_flag(trap_exit, true), ok = proc_lib:init_ack({ok, self()}), loop(Delivery, Parent, []). init_delivery( - Name, + Id, Reader, Buffer, Opts = #{ @@ -84,7 +86,7 @@ init_delivery( ) -> BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer), #delivery{ - name = Name, + id = Id, callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, @@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) error({transfer_failed, Reason}) end. -process_complete(#delivery{name = Name, empty = true}) -> - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}), +process_complete(#delivery{id = Id, empty = true}) -> + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}), exit({shutdown, {skipped, empty}}); process_complete(#delivery{ - name = Name, callback_module = Mod, container = Container, transfer = Transfer0 + id = Id, callback_module = Mod, container = Container, transfer = Transfer0 }) -> Trailer = emqx_connector_aggreg_csv:close(Container), Transfer = Mod:process_append(Trailer, Transfer0), {ok, Completed} = Mod:process_complete(Transfer), - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}), + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}), ok. %%