From b91ff971702a9dbc99e0672f0b3ad49e9b96552d Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Thu, 25 Apr 2024 18:21:29 +0200
Subject: [PATCH] feat(s3): separate streaming upload logic into dedicated module

And use it in `emqx_s3_uploader`, while also turning it into a simple
gen_server.
---
 apps/emqx_s3/src/emqx_s3_profile_conf.erl |   2 +-
 apps/emqx_s3/src/emqx_s3_upload.erl       | 217 ++++++++++++++++
 apps/emqx_s3/src/emqx_s3_uploader.erl     | 300 +++++-----------------
 3 files changed, 276 insertions(+), 243 deletions(-)
 create mode 100644 apps/emqx_s3/src/emqx_s3_upload.erl

diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl
index 67f9d33b3..7bb5d87f1 100644
--- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl
+++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl
@@ -53,7 +53,7 @@
     emqx_s3_client:bucket(),
     emqx_s3_client:config(),
     emqx_s3_client:upload_options(),
-    emqx_s3_uploader:config()
+    emqx_s3_upload:config()
 }.
 
 -define(DEFAULT_CALL_TIMEOUT, 5000).
diff --git a/apps/emqx_s3/src/emqx_s3_upload.erl b/apps/emqx_s3/src/emqx_s3_upload.erl
new file mode 100644
index 000000000..565c6b8bc
--- /dev/null
+++ b/apps/emqx_s3/src/emqx_s3_upload.erl
@@ -0,0 +1,217 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_upload).
+
+-include_lib("emqx/include/types.hrl").
+
+-export([
+    new/4,  % build the initial upload state
+    append/2,  % add data to the buffer
+    write/1,  % start the multipart upload / upload a part once enough is buffered
+    complete/1,  % flush the buffer and finalize the upload
+    abort/1  % abort a started multipart upload
+]).
+
+-export([format/1]).  % upload state with the client formatted and the buffer elided
+
+-export_type([t/0, config/0]).
+
+-type config() :: #{
+    min_part_size => pos_integer(),  % optional; new/4 defaults to ?DEFAULT_MIN_PART_SIZE
+    max_part_size => pos_integer()  % optional; new/4 defaults to ?DEFAULT_MAX_PART_SIZE
+}.
+
+-type t() :: #{
+    started := boolean(),  % set to true by write/1 once the multipart upload is started
+    client := emqx_s3_client:client(),
+    key := emqx_s3_client:key(),
+    upload_opts := emqx_s3_client:upload_options(),
+    buffer := iodata(),  % data appended since the last uploaded part
+    buffer_size := non_neg_integer(),  % byte size of buffer, maintained by append/2
+    min_part_size := pos_integer(),
+    max_part_size := pos_integer(),
+    upload_id := undefined | emqx_s3_client:upload_id(),  % undefined until start_multipart succeeds
+    etags := [emqx_s3_client:etag()],  % newest first; complete/1 reverses the list
+    part_number := emqx_s3_client:part_number()  % number assigned to the next uploaded part
+}.
+
+%% 5 MiB (5 * 1024 * 1024 bytes): default min_part_size
+-define(DEFAULT_MIN_PART_SIZE, 5242880).
+%% 5 GiB (5 * 1024 * 1024 * 1024 bytes): default max_part_size
+-define(DEFAULT_MAX_PART_SIZE, 5368709120).
+
+%% API
+
+-spec new(
+    emqx_s3_client:client(),
+    emqx_s3_client:key(),
+    emqx_s3_client:upload_options(),
+    config()
+) ->
+    t().
+new(Client, Key, UploadOpts, Config) ->  % only builds the state map; no client calls here
+    #{
+        started => false,
+        client => Client,
+        key => Key,
+        upload_opts => UploadOpts,
+        buffer => [],
+        buffer_size => 0,
+        min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
+        max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
+        upload_id => undefined,
+        etags => [],
+        part_number => 1
+    }.
+
+-spec append(iodata(), t()) -> {ok, t()} | {error, term()}.
+append(WriteData, #{buffer := Buffer, buffer_size := BufferSize} = Upload) ->
+    case is_valid_part(WriteData, Upload) of  % would the buffer stay within max_part_size?
+        true ->
+            {ok, Upload#{
+                buffer => [Buffer, WriteData],  % nest the iodata instead of concatenating
+                buffer_size => BufferSize + iolist_size(WriteData)
+            }};
+        false ->
+            {error, {too_large, iolist_size(WriteData)}}  % reports the size of the rejected data
+    end.
+
+-spec write(t()) -> {ok, t()} | {cont, t()} | {error, term()}.
+write(U0 = #{started := false}) ->
+    case maybe_start_upload(U0) of
+        not_started ->  % fewer than min_part_size bytes buffered so far
+            {ok, U0};
+        {started, U1} ->
+            {cont, U1#{started := true}};  % nothing uploaded yet: caller should call write/1 again
+        {error, _} = Error ->
+            Error
+    end;
+write(U0 = #{started := true}) ->
+    maybe_upload_part(U0).  % uploads the buffer once it reaches min_part_size
+
+-spec complete(t()) -> {ok, t()} | {error, term()}.
+complete(
+    #{
+        started := true,
+        client := Client,
+        key := Key,
+        upload_id := UploadId
+    } = U0
+) ->
+    case upload_part(U0) of  % flush what is still buffered; a no-op when the buffer is empty
+        {ok, #{etags := ETagsRev} = U1} ->
+            ETags = lists:reverse(ETagsRev),  % etags are accumulated newest-first
+            case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
+                ok ->
+                    {ok, U1};
+                {error, _} = Error ->
+                    Error
+            end;
+        {error, _} = Error ->
+            Error
+    end;
+complete(#{started := false} = Upload) ->
+    put_object(Upload).  % multipart upload never started: send the whole buffer as one object
+
+-spec abort(t()) -> ok_or_error(term()).
+abort(#{
+    started := true,
+    client := Client,
+    key := Key,
+    upload_id := UploadId
+}) ->
+    case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
+        ok ->
+            ok;
+        {error, _} = Error ->
+            Error
+    end;
+abort(#{started := false}) ->
+    ok.  % no multipart upload was started, so there is nothing to abort
+
+%%--------------------------------------------------------------------
+
+-spec format(t()) -> map().
+format(Upload = #{client := Client}) ->
+    Upload#{
+        client => emqx_s3_client:format(Client),
+        buffer => [<<"...">>]  % placeholder instead of the buffered payload
+    }.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+-spec maybe_start_upload(t()) -> not_started | {started, t()} | {error, term()}.
+maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
+    case BufferSize >= MinPartSize of  % start only once min_part_size bytes are buffered
+        true ->
+            start_upload(Data);
+        false ->
+            not_started
+    end.
+
+-spec start_upload(t()) -> {started, t()} | {error, term()}.
+start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
+    case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
+        {ok, UploadId} ->
+            NewData = Data#{upload_id => UploadId},  % the 'started' flag itself is set by write/1
+            {started, NewData};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec maybe_upload_part(t()) -> ok_or_error(t(), term()).
+maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
+    case BufferSize >= MinPartSize of
+        true ->
+            upload_part(Data);
+        false ->
+            {ok, Data}  % keep buffering until min_part_size is reached
+    end.
+
+-spec upload_part(t()) -> ok_or_error(t(), term()).
+upload_part(#{buffer_size := 0} = Upload) ->
+    {ok, Upload};  % nothing buffered, nothing to upload
+upload_part(
+    #{
+        client := Client,
+        key := Key,
+        upload_id := UploadId,
+        buffer := Buffer,
+        part_number := PartNumber,
+        etags := ETags
+    } = Upload
+) ->
+    case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
+        {ok, ETag} ->
+            {ok, Upload#{
+                buffer => [],
+                buffer_size => 0,
+                part_number => PartNumber + 1,
+                etags => [{PartNumber, ETag} | ETags]  % prepend; complete/1 restores upload order
+            }};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec put_object(t()) -> ok_or_error(t(), term()).
+put_object(
+    #{
+        client := Client,
+        key := Key,
+        upload_opts := UploadOpts,
+        buffer := Buffer
+    } = Upload
+) ->
+    case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
+        ok ->
+            {ok, Upload};  % state is returned unchanged; the buffer is not cleared
+        {error, _} = Error ->
+            Error
+    end.
+
+is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
+    BufferSize + iolist_size(WriteData) =< MaxPartSize.  % buffer plus new data must fit in one part
diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl
index 160ecbfef..99a89eb92 100644
--- a/apps/emqx_s3/src/emqx_s3_uploader.erl
+++ b/apps/emqx_s3/src/emqx_s3_uploader.erl
@@ -6,7 +6,7 @@
 
 -include_lib("emqx/include/types.hrl").
 
--behaviour(gen_statem).
+-behaviour(gen_server).
 
 -export([
     start_link/3,
@@ -25,46 +25,23 @@
 
 -export([
     init/1,
-    callback_mode/0,
-    handle_event/4,
-    terminate/3,
-    code_change/4,
-    format_status/1,
-    format_status/2
+    handle_call/3,
+    handle_cast/2,
+    terminate/2,
+    format_status/1
 ]).
 
--export_type([config/0]).
-
--type config() :: #{
-    min_part_size => pos_integer(),
-    max_part_size => pos_integer()
-}.
-
 -type data() :: #{
     profile_id => emqx_s3:profile_id(),
-    client := emqx_s3_client:client(),
-    key := emqx_s3_client:key(),
-    upload_opts := emqx_s3_client:upload_options(),
-    buffer := iodata(),
-    buffer_size := non_neg_integer(),
-    min_part_size := pos_integer(),
-    max_part_size := pos_integer(),
-    upload_id := undefined | emqx_s3_client:upload_id(),
-    etags := [emqx_s3_client:etag()],
-    part_number := emqx_s3_client:part_number()
+    upload := emqx_s3_upload:t() | aborted  % NOTE(review): 'aborted' is never stored by the code in this patch — confirm
 }.
 
-%% 5MB
--define(DEFAULT_MIN_PART_SIZE, 5242880).
-%% 5GB
--define(DEFAULT_MAX_PART_SIZE, 5368709120).
-
 -define(DEFAULT_TIMEOUT, 30000).
 
 -spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
-    gen_statem:start_ret().
+    gen_server:start_ret().
 start_link(ProfileId, Key, UploadOpts) when is_list(Key) ->
-    gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
+    gen_server:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
 
 -spec write(pid(), iodata()) -> ok_or_error(term()).
 write(Pid, WriteData) ->
@@ -72,7 +49,7 @@ write(Pid, WriteData) ->
 
 -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
 write(Pid, WriteData, Timeout) ->
-    gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout).
+    gen_server:call(Pid, {write, wrap(WriteData)}, Timeout).
 
 -spec complete(pid()) -> ok_or_error(term()).
 complete(Pid) ->
@@ -80,7 +57,7 @@ complete(Pid) ->
 
 -spec complete(pid(), timeout()) -> ok_or_error(term()).
 complete(Pid, Timeout) ->
-    gen_statem:call(Pid, complete, Timeout).
+    gen_server:call(Pid, complete, Timeout).
 
 -spec abort(pid()) -> ok_or_error(term()).
 abort(Pid) ->
@@ -88,7 +65,7 @@ abort(Pid) ->
 
 -spec abort(pid(), timeout()) -> ok_or_error(term()).
 abort(Pid, Timeout) ->
-    gen_statem:call(Pid, abort, Timeout).
+    gen_server:call(Pid, abort, Timeout).
 
 -spec shutdown(pid()) -> ok.
 shutdown(Pid) ->
@@ -99,231 +76,73 @@ shutdown(Pid) ->
 %% gen_statem callbacks
 %%--------------------------------------------------------------------
 
-callback_mode() -> handle_event_function.
-
 init({profile, ProfileId, Key, UploadOpts}) ->
+    _ = process_flag(trap_exit, true),  % trap exits; per gen_server docs terminate/2 then also runs on parent exit signals
     {Bucket, ClientConfig, BaseOpts, UploaderConfig} =
         emqx_s3_profile_conf:checkout_config(ProfileId),
-    Upload = #{
-        profile_id => ProfileId,
-        client => client(Bucket, ClientConfig),
-        key => Key,
-        upload_opts => maps:merge(BaseOpts, UploadOpts)
-    },
-    init({upload, UploaderConfig, Upload});
-init({upload, Config, Upload}) ->
-    process_flag(trap_exit, true),
-    {ok, upload_not_started, Upload#{
-        buffer => [],
-        buffer_size => 0,
-        min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
-        max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
-        upload_id => undefined,
-        etags => [],
-        part_number => 1
-    }}.
+    Upload = emqx_s3_upload:new(
+        client(Bucket, ClientConfig),
+        Key,
+        maps:merge(BaseOpts, UploadOpts),  % per-upload options are merged over the profile's base options
+        UploaderConfig
+    ),
+    {ok, #{profile_id => ProfileId, upload => Upload}}.  % buffering/multipart state now lives in the upload term
 
-handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) ->
+-spec handle_call(_Call, gen_server:from(), data()) ->
+    {reply, _Result, data()} | {stop, _Reason, _Result, data()}.
+handle_call({write, WriteDataWrapped}, _From, St0 = #{upload := U0}) ->
     WriteData = unwrap(WriteDataWrapped),
-    case is_valid_part(WriteData, Data0) of
-        true ->
-            handle_write(State, From, WriteData, Data0);
-        false ->
-            {keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}}
+    case emqx_s3_upload:append(WriteData, U0) of
+        {ok, U1} ->
+            handle_write(St0#{upload := U1});
+        {error, _} = Error ->
+            {reply, Error, St0}  % rejected data is not buffered and the server keeps running
     end;
-handle_event({call, From}, complete, upload_not_started, Data0) ->
-    case put_object(Data0) of
+handle_call(complete, _From, St0 = #{upload := U0}) ->
+    case emqx_s3_upload:complete(U0) of
+        {ok, U1} ->
+            {stop, normal, ok, St0#{upload := U1}};
+        {error, _} = Error ->
+            {stop, Error, Error, St0}  % non-normal reason, so terminate/2 goes on to abort the upload
+    end;
+handle_call(abort, _From, St = #{upload := Upload}) ->
+    case emqx_s3_upload:abort(Upload) of
         ok ->
-            {stop_and_reply, normal, {reply, From, ok}};
+            {stop, normal, ok, St};
         {error, _} = Error ->
-            {stop_and_reply, Error, {reply, From, Error}, Data0}
-    end;
-handle_event({call, From}, complete, upload_started, Data0) ->
-    case complete_upload(Data0) of
-        {ok, Data1} ->
-            {stop_and_reply, normal, {reply, From, ok}, Data1};
-        {error, _} = Error ->
-            {stop_and_reply, Error, {reply, From, Error}, Data0}
-    end;
-handle_event({call, From}, abort, upload_not_started, _Data) ->
-    {stop_and_reply, normal, {reply, From, ok}};
-handle_event({call, From}, abort, upload_started, Data0) ->
-    case abort_upload(Data0) of
-        ok ->
-            {stop_and_reply, normal, {reply, From, ok}};
-        {error, _} = Error ->
-            {stop_and_reply, Error, {reply, From, Error}, Data0}
+            {stop, Error, Error, St}  % non-normal reason: terminate/2 will call abort/1 once more
     end.
 
-handle_write(upload_not_started, From, WriteData, Data0) ->
-    Data1 = append_buffer(Data0, WriteData),
-    case maybe_start_upload(Data1) of
-        not_started ->
-            {keep_state, Data1, {reply, From, ok}};
-        {started, Data2} ->
-            case upload_part(Data2) of
-                {ok, Data3} ->
-                    {next_state, upload_started, Data3, {reply, From, ok}};
-                {error, _} = Error ->
-                    {stop_and_reply, Error, {reply, From, Error}, Data2}
-            end;
+handle_write(St = #{upload := U0}) ->
+    case emqx_s3_upload:write(U0) of
+        {ok, U1} ->
+            {reply, ok, St#{upload := U1}};
+        {cont, U1} ->  % multipart upload was just started; write again to upload the buffer
+            handle_write(St#{upload := U1});
         {error, _} = Error ->
-            {stop_and_reply, Error, {reply, From, Error}, Data1}
-    end;
-handle_write(upload_started, From, WriteData, Data0) ->
-    Data1 = append_buffer(Data0, WriteData),
-    case maybe_upload_part(Data1) of
-        {ok, Data2} ->
-            {keep_state, Data2, {reply, From, ok}};
-        {error, _} = Error ->
-            {stop_and_reply, Error, {reply, From, Error}, Data1}
+            {stop, Error, Error, St}  % St keeps the upload from before the failed step; terminate/2 aborts it if started
     end.
 
-terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when
-    (UploadId =/= undefined) andalso (Reason =/= normal)
-->
-    emqx_s3_client:abort_multipart(Client, Key, UploadId);
-terminate(_Reason, _State, _Data) ->
-    ok.
+-spec handle_cast(_Cast, data()) -> {noreply, data()}.
+handle_cast(_Cast, St) ->
+    {noreply, St}.  % the API functions in this patch only use calls; any cast is ignored
 
-code_change(_OldVsn, StateName, State, _Extra) ->
-    {ok, StateName, State}.
+-spec terminate(_Reason, data()) -> ok.  % NOTE(review): the last clause returns abort/1's result, which may be {error, _}
+terminate(normal, _St) ->
+    ok;  % the normal stops visible in this patch follow a successful complete/1 or abort/1
+terminate({shutdown, _}, _St) ->
+    ok;  % NOTE(review): old terminate/3 aborted on every non-normal reason; this one skips abort — confirm
+terminate(_Reason, #{upload := Upload}) ->
+    emqx_s3_upload:abort(Upload).  % any other reason: abort (a no-op unless the upload was started)
 
-format_status(#{data := #{client := Client} = Data} = Status) ->
-    Status#{
-        data => Data#{
-            client => emqx_s3_client:format(Client),
-            buffer => [<<"...">>]
-        }
-    }.
-
-format_status(_Opt, [PDict, State, #{client := Client} = Data]) ->
-    #{
-        data => Data#{
-            client => emqx_s3_client:format(Client),
-            buffer => [<<"...">>]
-        },
-        state => State,
-        pdict => PDict
-    }.
+format_status(#{state := State = #{upload := Upload}} = Status) ->  % only the 'state' entry of the status map is rewritten
+    StateRedacted = State#{upload := emqx_s3_upload:format(Upload)},  % client formatted, buffer elided
+    Status#{state := StateRedacted}.
 
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 
--spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}.
-maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
-    case BufferSize >= MinPartSize of
-        true ->
-            start_upload(Data);
-        false ->
-            not_started
-    end.
-
--spec start_upload(data()) -> {started, data()} | {error, term()}.
-start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
-    case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
-        {ok, UploadId} ->
-            NewData = Data#{upload_id => UploadId},
-            {started, NewData};
-        {error, _} = Error ->
-            Error
-    end.
-
--spec maybe_upload_part(data()) -> ok_or_error(data(), term()).
-maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
-    case BufferSize >= MinPartSize of
-        true ->
-            upload_part(Data);
-        false ->
-            {ok, Data}
-    end.
-
--spec upload_part(data()) -> ok_or_error(data(), term()).
-upload_part(#{buffer_size := 0} = Data) ->
-    {ok, Data};
-upload_part(
-    #{
-        client := Client,
-        key := Key,
-        upload_id := UploadId,
-        buffer := Buffer,
-        part_number := PartNumber,
-        etags := ETags
-    } = Data
-) ->
-    case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
-        {ok, ETag} ->
-            NewData = Data#{
-                buffer => [],
-                buffer_size => 0,
-                part_number => PartNumber + 1,
-                etags => [{PartNumber, ETag} | ETags]
-            },
-            {ok, NewData};
-        {error, _} = Error ->
-            Error
-    end.
-
--spec complete_upload(data()) -> ok_or_error(data(), term()).
-complete_upload(
-    #{
-        client := Client,
-        key := Key,
-        upload_id := UploadId
-    } = Data0
-) ->
-    case upload_part(Data0) of
-        {ok, #{etags := ETagsRev} = Data1} ->
-            ETags = lists:reverse(ETagsRev),
-            case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
-                ok ->
-                    {ok, Data1};
-                {error, _} = Error ->
-                    Error
-            end;
-        {error, _} = Error ->
-            Error
-    end.
-
--spec abort_upload(data()) -> ok_or_error(term()).
-abort_upload(
-    #{
-        client := Client,
-        key := Key,
-        upload_id := UploadId
-    }
-) ->
-    case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
-        ok ->
-            ok;
-        {error, _} = Error ->
-            Error
-    end.
-
--spec put_object(data()) -> ok_or_error(term()).
-put_object(
-    #{
-        client := Client,
-        key := Key,
-        upload_opts := UploadOpts,
-        buffer := Buffer
-    }
-) ->
-    case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
-        ok ->
-            ok;
-        {error, _} = Error ->
-            Error
-    end.
-
--spec append_buffer(data(), iodata()) -> data().
-append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
-    Data#{
-        buffer => [Buffer, WriteData],
-        buffer_size => BufferSize + iolist_size(WriteData)
-    }.
-
 -compile({inline, [wrap/1, unwrap/1]}).
 wrap(Data) ->
     fun() -> Data end.
@@ -331,8 +150,5 @@ wrap(Data) ->
 unwrap(WrappedData) ->
     WrappedData().
 
-is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
-    BufferSize + iolist_size(WriteData) =< MaxPartSize.
-
 client(Bucket, Config) ->
     emqx_s3_client:create(Bucket, Config).