From f6e5eea4f7b25e74fcd6264bb820da4c2a893443 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 26 Apr 2024 20:00:28 +0200 Subject: [PATCH] feat(s3-aggreg): handle delivery shutdowns gracefully --- .../src/emqx_bridge_s3_aggreg_delivery.erl | 164 ++++++++++++------ .../src/emqx_bridge_s3_aggregator.erl | 1 + .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 37 +++- .../test/emqx_bridge_s3_test_helpers.erl | 6 + apps/emqx_s3/src/emqx_s3_client.erl | 19 +- 5 files changed, 168 insertions(+), 59 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl index c548226ab..02099dbec 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl @@ -12,11 +12,21 @@ -export([start_link/3]). %% Internal exports --export([run_delivery/3]). +-export([ + init/4, + loop/3 +]). -behaviour(emqx_template). -export([lookup/2]). +%% Sys +-export([ + system_continue/3, + system_terminate/4, + format_status/2 +]). + -record(delivery, { name :: _Name, container :: emqx_bridge_s3_aggreg_csv:container(), @@ -25,19 +35,23 @@ empty :: boolean() }). +-type state() :: #delivery{}. + %% start_link(Name, Buffer, Opts) -> - proc_lib:start_link(?MODULE, run_delivery, [Name, Buffer, Opts]). + proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). %% -run_delivery(Name, Buffer, Opts) -> +-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return(). +init(Parent, Name, Buffer, Opts) -> ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}), Reader = open_buffer(Buffer), Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), + _ = erlang:process_flag(trap_exit, true), ok = proc_lib:init_ack({ok, self()}), - loop_deliver(Delivery). + loop(Delivery, Parent, []). init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) -> #delivery{ @@ -48,53 +62,13 @@ init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) -> empty = true }. -loop_deliver(Delivery = #delivery{reader = Reader0}) -> - case emqx_bridge_s3_aggreg_buffer:read(Reader0) of - {Records = [#{} | _], Reader} -> - loop_deliver_records(Records, Delivery#delivery{reader = Reader}); - {[], Reader} -> - loop_deliver(Delivery#delivery{reader = Reader}); - eof -> - complete_delivery(Delivery); - {Unexpected, _Reader} -> - exit({buffer_unexpected_record, Unexpected}) - end. - -loop_deliver_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) -> - {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0), - {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), - loop_deliver_upload(Delivery#delivery{ - container = Container, - upload = Upload, - empty = false - }). - -loop_deliver_upload(Delivery = #delivery{upload = Upload0}) -> - case emqx_s3_upload:write(Upload0) of - {ok, Upload} -> - loop_deliver(Delivery#delivery{upload = Upload}); - {cont, Upload} -> - loop_deliver_upload(Delivery#delivery{upload = Upload}); +open_buffer(#buffer{filename = Filename}) -> + case file:open(Filename, [read, binary, raw]) of + {ok, FD} -> + {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD), + Reader; {error, Reason} -> - %% TODO: retries - _ = emqx_s3_upload:abort(Upload0), - exit({upload_failed, Reason}) - end. - -complete_delivery(#delivery{name = Name, empty = true}) -> - ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}), - exit({shutdown, {skipped, empty}}); -complete_delivery(#delivery{name = Name, container = Container, upload = Upload0}) -> - Trailer = emqx_bridge_s3_aggreg_csv:close(Container), - {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0), - case emqx_s3_upload:complete(Upload) of - {ok, Completed} -> - ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}), - ok; - {error, Reason} -> - %% TODO: retries - _ = emqx_s3_upload:abort(Upload), - exit({upload_failed, Reason}) + error({buffer_open_failed, Reason}) end. mk_container(#{type := csv, column_order := OrderOpt}) -> @@ -118,15 +92,91 @@ mk_upload( mk_object_key(Buffer, #{action := Name, key := Template}) -> emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). -open_buffer(#buffer{filename = Filename}) -> - case file:open(Filename, [read, binary, raw]) of - {ok, FD} -> - {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD), - Reader; - {error, Reason} -> - error({buffer_open_failed, Reason}) +%% + +-spec loop(state(), pid(), [sys:debug_option()]) -> no_return(). +loop(Delivery, Parent, Debug) -> + %% NOTE: This function is mocked in tests. + receive + Msg -> handle_msg(Msg, Delivery, Parent, Debug) + after 0 -> + process_delivery(Delivery, Parent, Debug) end. +process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) -> + case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Records = [#{} | _], Reader} -> + Delivery1 = Delivery0#delivery{reader = Reader}, + Delivery2 = process_append_records(Records, Delivery1), + Delivery = process_write(Delivery2), + loop(Delivery, Parent, Debug); + {[], Reader} -> + Delivery = Delivery0#delivery{reader = Reader}, + loop(Delivery, Parent, Debug); + eof -> + process_complete(Delivery0); + {Unexpected, _Reader} -> + exit({buffer_unexpected_record, Unexpected}) + end. + +process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) -> + {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0), + {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), + Delivery#delivery{ + container = Container, + upload = Upload, + empty = false + }. + +process_write(Delivery = #delivery{upload = Upload0}) -> + case emqx_s3_upload:write(Upload0) of + {ok, Upload} -> + Delivery#delivery{upload = Upload}; + {cont, Upload} -> + process_write(Delivery#delivery{upload = Upload}); + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload0), + exit({upload_failed, Reason}) + end. + +process_complete(#delivery{name = Name, empty = true}) -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}), + exit({shutdown, {skipped, empty}}); +process_complete(#delivery{name = Name, container = Container, upload = Upload0}) -> + Trailer = emqx_bridge_s3_aggreg_csv:close(Container), + {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0), + case emqx_s3_upload:complete(Upload) of + {ok, Completed} -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}), + ok; + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload), + exit({upload_failed, Reason}) + end. + +%% + +handle_msg({system, From, Msg}, Delivery, Parent, Debug) -> + sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery); +handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) -> + system_terminate(Reason, Parent, Debug, Delivery); +handle_msg(_Msg, Delivery, Parent, Debug) -> + loop(Parent, Debug, Delivery). + +-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return(). +system_continue(Parent, Debug, Delivery) -> + loop(Delivery, Parent, Debug). + +-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _. +system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) -> + emqx_s3_upload:abort(Upload). + +-spec format_status(normal, Args :: [term()]) -> _StateFormatted. +format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) -> + Delivery#delivery{ + upload = emqx_s3_upload:format(Delivery#delivery.upload) + }. + %% -spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl index c49a29cb8..9d1ac5575 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl @@ -393,6 +393,7 @@ handle_delivery_exit(Buffer, Error, St = #st{name = Name}) -> filename => Buffer#buffer.filename, reason => Error }), + %% TODO: Retries? enqueue_status_error(Error, St). enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index c7eeee2bb..07a4c2056 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -219,7 +219,6 @@ t_aggreg_upload_restart(Config) -> %% Check there's still only one upload. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), - %% Verify that column order is respected. ?assertMatch( {ok, [ _Header = [_ | _], @@ -285,6 +284,42 @@ t_aggreg_upload_restart_corrupted(Config) -> CSV ). +t_aggreg_pending_upload_restart(Config) -> + %% NOTE + %% This test verifies that the bridge will finish uploading a buffer file after + %% a restart. + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Send few large messages that will require multipart upload. + %% Ensure that they span multiple batch queries. + Payload = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")), + Messages = [{integer_to_binary(N), <<"a/b/c">>, Payload} || N <- lists:seq(1, 10)], + ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages), 10), + %% Wait until the multipart upload is started. + {ok, #{key := ObjectKey}} = + ?block_until(#{?snk_kind := s3_client_multipart_started, bucket := Bucket}), + %% Stop the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + %% Verify that pending uploads have been gracefully aborted. + %% NOTE: Minio does not support multipart upload listing w/o prefix. + ?assertEqual( + [], + emqx_bridge_s3_test_helpers:list_pending_uploads(Bucket, ObjectKey) + ), + %% Restart the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + %% Wait until the delivery is completed. + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check that delivery contains all the messages. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + [_Header | Rows] = fetch_parse_csv(Bucket, Key), + ?assertEqual( + Messages, + [{CID, Topic, PL} || [_TS, CID, Topic, PL | _] <- Rows] + ). + t_aggreg_next_rotate(Config) -> %% NOTE %% This is essentially a stress test that tries to verify that buffer rotation diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl index de19c8028..21729369b 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl @@ -43,6 +43,12 @@ get_object(Bucket, Key) -> AwsConfig = emqx_s3_test_helpers:aws_config(tcp), maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)). +list_pending_uploads(Bucket, Key) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + {ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig), + Uploads = proplists:get_value(uploads, Props), + lists:map(fun maps:from_list/1, Uploads). + %% File utilities truncate_at(Filename, Pos) -> diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index a415cf8d4..58029ae85 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -6,6 +6,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("erlcloud/include/erlcloud_aws.hrl"). -export([ @@ -133,7 +134,13 @@ start_multipart( Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of {ok, Props} -> - {ok, response_property('uploadId', Props)}; + UploadId = response_property('uploadId', Props), + ?tp(s3_client_multipart_started, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), + {ok, UploadId}; {error, Reason} -> ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), {error, Reason} @@ -177,6 +184,11 @@ complete_multipart( ) of ok -> + ?tp(s3_client_multipart_completed, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), ok; {error, Reason} -> ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), @@ -193,6 +205,11 @@ abort_multipart( ) -> case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of ok -> + ?tp(s3_client_multipart_aborted, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), ok; {error, Reason} -> ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),