feat(s3): separate streaming upload logic into dedicated module

And use it in `emqx_s3_uploader`, while also turning it into a simple
gen_server.
This commit is contained in:
Andrew Mayorov 2024-04-25 18:21:29 +02:00
parent bd7ff8a03f
commit b91ff97170
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 276 additions and 243 deletions

View File

@ -53,7 +53,7 @@
emqx_s3_client:bucket(),
emqx_s3_client:config(),
emqx_s3_client:upload_options(),
emqx_s3_uploader:config()
emqx_s3_upload:config()
}.
-define(DEFAULT_CALL_TIMEOUT, 5000).

View File

@ -0,0 +1,217 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_s3_upload).
-include_lib("emqx/include/types.hrl").
-export([
new/4,
append/2,
write/1,
complete/1,
abort/1
]).
-export([format/1]).
-export_type([t/0, config/0]).
%% Optional tuning knobs accepted by new/4. A missing key falls back to
%% ?DEFAULT_MIN_PART_SIZE / ?DEFAULT_MAX_PART_SIZE respectively.
-type config() :: #{
%% Buffered size at which write/1 starts the multipart upload or uploads the
%% next part.
min_part_size => pos_integer(),
%% Upper bound on the buffered size; append/2 rejects data that would push
%% the buffer past it.
max_part_size => pos_integer()
}.
%% State of a single upload, built by new/4 and threaded through append/2,
%% write/1, complete/1 and abort/1.
-type t() :: #{
%% `false` until write/1 has started a multipart upload.
started := boolean(),
client := emqx_s3_client:client(),
key := emqx_s3_client:key(),
upload_opts := emqx_s3_client:upload_options(),
%% Data appended so far that has not been uploaded yet.
buffer := iodata(),
%% Size of `buffer`, accumulated with iolist_size/1 on every append/2.
buffer_size := non_neg_integer(),
min_part_size := pos_integer(),
max_part_size := pos_integer(),
%% `undefined` until the multipart upload has been started.
upload_id := undefined | emqx_s3_client:upload_id(),
%% Parts uploaded so far, newest first; complete/1 reverses the list.
%% NOTE(review): upload_part/1 stores `{PartNumber, ETag}` pairs here --
%% confirm that `emqx_s3_client:etag()` describes that shape.
etags := [emqx_s3_client:etag()],
%% Number assigned to the next uploaded part; starts at 1.
part_number := emqx_s3_client:part_number()
}.
%% 5MB
-define(DEFAULT_MIN_PART_SIZE, 5242880).
%% 5GB
-define(DEFAULT_MAX_PART_SIZE, 5368709120).
%%
%% Build the initial state of an upload: nothing buffered, multipart upload
%% not started, first part numbered 1. Part size thresholds come from
%% `Config`, with module defaults for any key that is absent.
-spec new(
    emqx_s3_client:client(),
    emqx_s3_client:key(),
    emqx_s3_client:upload_options(),
    config()
) ->
    t().
new(Client, Key, UploadOpts, Config) ->
    MinPartSize = maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
    MaxPartSize = maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
    #{
        %% Where and how to upload.
        client => Client,
        key => Key,
        upload_opts => UploadOpts,
        min_part_size => MinPartSize,
        max_part_size => MaxPartSize,
        %% Progress tracking.
        started => false,
        upload_id => undefined,
        part_number => 1,
        etags => [],
        buffer => [],
        buffer_size => 0
    }.
%% Add `WriteData` to the end of the buffer without performing any upload.
%% Returns `{error, {too_large, Size}}` (Size being the size of `WriteData`)
%% when the buffer would grow beyond `max_part_size`; the state is then left
%% untouched.
-spec append(iodata(), t()) -> {ok, t()} | {error, term()}.
append(WriteData, #{buffer := Pending, buffer_size := PendingSize} = Upload) ->
    case is_valid_part(WriteData, Upload) of
        false ->
            {error, {too_large, iolist_size(WriteData)}};
        true ->
            GrownSize = PendingSize + iolist_size(WriteData),
            %% Nest rather than concatenate: the buffer stays an iolist.
            {ok, Upload#{buffer => [Pending, WriteData], buffer_size => GrownSize}}
    end.
%% Push buffered data towards S3 once enough of it has accumulated.
%%
%% Returns:
%%   `{ok, Upload}`    - nothing more to do for now;
%%   `{cont, Upload}`  - the multipart upload has just been started; call
%%                       write/1 again so the buffered part gets uploaded;
%%   `{error, Reason}` - starting the upload or uploading a part failed.
-spec write(t()) -> {ok, t()} | {cont, t()} | {error, term()}.
write(#{started := true} = Upload) ->
    maybe_upload_part(Upload);
write(#{started := false} = Upload) ->
    case maybe_start_upload(Upload) of
        {started, Started} ->
            {cont, Started#{started := true}};
        not_started ->
            {ok, Upload};
        {error, _} = Failure ->
            Failure
    end.
%% Finish the upload.
%%
%% If the multipart upload was never started, the whole buffer is sent as a
%% single object. Otherwise whatever is still buffered is uploaded as one
%% more part (skipped when the buffer is empty) and the multipart upload is
%% completed with the collected parts in upload order.
-spec complete(t()) -> {ok, t()} | {error, term()}.
complete(#{started := false} = Upload) ->
    put_object(Upload);
complete(
    #{
        started := true,
        client := Client,
        key := Key,
        upload_id := UploadId
    } = Upload
) ->
    case upload_part(Upload) of
        {error, _} = Failure ->
            Failure;
        {ok, #{etags := NewestFirst} = Flushed} ->
            %% Parts are accumulated newest-first; restore upload order.
            Parts = lists:reverse(NewestFirst),
            case emqx_s3_client:complete_multipart(Client, Key, UploadId, Parts) of
                {error, _} = Failure ->
                    Failure;
                ok ->
                    {ok, Flushed}
            end
    end.
%% Abort the upload. A no-op when the multipart upload was never started;
%% otherwise asks the client to abort the multipart upload and passes its
%% result through.
-spec abort(t()) -> ok_or_error(term()).
abort(#{started := false}) ->
    ok;
abort(#{started := true, client := Client, key := Key, upload_id := UploadId}) ->
    case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
        {error, _} = Failure ->
            Failure;
        ok ->
            ok
    end.
%%--------------------------------------------------------------------
%% Produce a printable version of the upload state: the client is replaced
%% by the output of emqx_s3_client:format/1 and the buffer contents by a
%% short placeholder.
-spec format(t()) -> map().
format(#{client := Client} = Upload) ->
    ClientFormatted = emqx_s3_client:format(Client),
    Upload#{client => ClientFormatted, buffer => [<<"...">>]}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% Start the multipart upload once at least `min_part_size` is buffered;
%% report `not_started` otherwise.
-spec maybe_start_upload(t()) -> not_started | {started, t()} | {error, term()}.
maybe_start_upload(#{buffer_size := Buffered, min_part_size := Threshold} = Upload) when
    Buffered >= Threshold
->
    start_upload(Upload);
maybe_start_upload(#{buffer_size := _, min_part_size := _}) ->
    not_started.
%% Ask the client to start a multipart upload and remember the upload id it
%% hands back. The `started` flag is left for the caller to flip.
-spec start_upload(t()) -> {started, t()} | {error, term()}.
start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Upload) ->
    case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
        {error, _} = Failure ->
            Failure;
        {ok, UploadId} ->
            {started, Upload#{upload_id => UploadId}}
    end.
%% Upload the buffer as the next part once it holds at least
%% `min_part_size`; otherwise return the state unchanged.
-spec maybe_upload_part(t()) -> ok_or_error(t(), term()).
maybe_upload_part(#{buffer_size := Buffered, min_part_size := Threshold} = Upload) when
    Buffered >= Threshold
->
    upload_part(Upload);
maybe_upload_part(#{buffer_size := _, min_part_size := _} = Upload) ->
    {ok, Upload}.
%% Upload the entire buffer as a single part.
%%
%% An empty buffer is a no-op. On success the buffer is reset, the part
%% number is advanced and `{PartNumber, ETag}` is pushed onto the front of
%% `etags`. On failure the error is returned and no new state is produced.
-spec upload_part(t()) -> ok_or_error(t(), term()).
upload_part(#{buffer_size := 0} = Upload) ->
    {ok, Upload};
upload_part(
    #{
        client := Client,
        key := Key,
        upload_id := UploadId,
        buffer := Payload,
        part_number := ThisPart,
        etags := Uploaded
    } = Upload
) ->
    case emqx_s3_client:upload_part(Client, Key, UploadId, ThisPart, Payload) of
        {error, _} = Failure ->
            Failure;
        {ok, ETag} ->
            Advanced = Upload#{
                etags => [{ThisPart, ETag} | Uploaded],
                part_number => ThisPart + 1,
                buffer_size => 0,
                buffer => []
            },
            {ok, Advanced}
    end.
%% Send the whole buffer as one object in a single request. Used when the
%% multipart upload was never started. The state is returned as is on
%% success.
-spec put_object(t()) -> ok_or_error(t(), term()).
put_object(
    #{client := Client, key := Key, upload_opts := UploadOpts, buffer := Payload} = Upload
) ->
    case emqx_s3_client:put_object(Client, Key, UploadOpts, Payload) of
        {error, _} = Failure ->
            Failure;
        ok ->
            {ok, Upload}
    end.
%% True when `WriteData` still fits into the buffer, i.e. the buffered size
%% plus the size of `WriteData` does not exceed `max_part_size`.
is_valid_part(WriteData, #{max_part_size := Limit, buffer_size := Buffered}) ->
    Limit >= Buffered + iolist_size(WriteData).

View File

@ -6,7 +6,7 @@
-include_lib("emqx/include/types.hrl").
-behaviour(gen_statem).
-behaviour(gen_server).
-export([
start_link/3,
@ -25,46 +25,23 @@
-export([
init/1,
callback_mode/0,
handle_event/4,
terminate/3,
code_change/4,
format_status/1,
format_status/2
handle_call/3,
handle_cast/2,
terminate/2,
format_status/1
]).
-export_type([config/0]).
-type config() :: #{
min_part_size => pos_integer(),
max_part_size => pos_integer()
}.
-type data() :: #{
profile_id => emqx_s3:profile_id(),
client := emqx_s3_client:client(),
key := emqx_s3_client:key(),
upload_opts := emqx_s3_client:upload_options(),
buffer := iodata(),
buffer_size := non_neg_integer(),
min_part_size := pos_integer(),
max_part_size := pos_integer(),
upload_id := undefined | emqx_s3_client:upload_id(),
etags := [emqx_s3_client:etag()],
part_number := emqx_s3_client:part_number()
upload := emqx_s3_upload:t() | aborted
}.
%% 5MB
-define(DEFAULT_MIN_PART_SIZE, 5242880).
%% 5GB
-define(DEFAULT_MAX_PART_SIZE, 5368709120).
-define(DEFAULT_TIMEOUT, 30000).
-spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
gen_statem:start_ret().
gen_server:start_ret().
start_link(ProfileId, Key, UploadOpts) when is_list(Key) ->
gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
gen_server:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
-spec write(pid(), iodata()) -> ok_or_error(term()).
write(Pid, WriteData) ->
@ -72,7 +49,7 @@ write(Pid, WriteData) ->
-spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
write(Pid, WriteData, Timeout) ->
gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout).
gen_server:call(Pid, {write, wrap(WriteData)}, Timeout).
-spec complete(pid()) -> ok_or_error(term()).
complete(Pid) ->
@ -80,7 +57,7 @@ complete(Pid) ->
-spec complete(pid(), timeout()) -> ok_or_error(term()).
complete(Pid, Timeout) ->
gen_statem:call(Pid, complete, Timeout).
gen_server:call(Pid, complete, Timeout).
-spec abort(pid()) -> ok_or_error(term()).
abort(Pid) ->
@ -88,7 +65,7 @@ abort(Pid) ->
-spec abort(pid(), timeout()) -> ok_or_error(term()).
abort(Pid, Timeout) ->
gen_statem:call(Pid, abort, Timeout).
gen_server:call(Pid, abort, Timeout).
-spec shutdown(pid()) -> ok.
shutdown(Pid) ->
@ -99,231 +76,73 @@ shutdown(Pid) ->
%% gen_server callbacks
%%--------------------------------------------------------------------
callback_mode() -> handle_event_function.
init({profile, ProfileId, Key, UploadOpts}) ->
_ = process_flag(trap_exit, true),
{Bucket, ClientConfig, BaseOpts, UploaderConfig} =
emqx_s3_profile_conf:checkout_config(ProfileId),
Upload = #{
profile_id => ProfileId,
client => client(Bucket, ClientConfig),
key => Key,
upload_opts => maps:merge(BaseOpts, UploadOpts)
},
init({upload, UploaderConfig, Upload});
init({upload, Config, Upload}) ->
process_flag(trap_exit, true),
{ok, upload_not_started, Upload#{
buffer => [],
buffer_size => 0,
min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
upload_id => undefined,
etags => [],
part_number => 1
}}.
Upload = emqx_s3_upload:new(
client(Bucket, ClientConfig),
Key,
maps:merge(BaseOpts, UploadOpts),
UploaderConfig
),
{ok, #{profile_id => ProfileId, upload => Upload}}.
handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) ->
-spec handle_call(_Call, gen_server:from(), data()) ->
{reply, _Result, data()} | {stop, _Reason, _Result, data()}.
handle_call({write, WriteDataWrapped}, _From, St0 = #{upload := U0}) ->
WriteData = unwrap(WriteDataWrapped),
case is_valid_part(WriteData, Data0) of
true ->
handle_write(State, From, WriteData, Data0);
false ->
{keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}}
case emqx_s3_upload:append(WriteData, U0) of
{ok, U1} ->
handle_write(St0#{upload := U1});
{error, _} = Error ->
{reply, Error, St0}
end;
handle_event({call, From}, complete, upload_not_started, Data0) ->
case put_object(Data0) of
handle_call(complete, _From, St0 = #{upload := U0}) ->
case emqx_s3_upload:complete(U0) of
{ok, U1} ->
{stop, normal, ok, St0#{upload := U1}};
{error, _} = Error ->
{stop, Error, Error, St0}
end;
handle_call(abort, _From, St = #{upload := Upload}) ->
case emqx_s3_upload:abort(Upload) of
ok ->
{stop_and_reply, normal, {reply, From, ok}};
{stop, normal, ok, St};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0}
end;
handle_event({call, From}, complete, upload_started, Data0) ->
case complete_upload(Data0) of
{ok, Data1} ->
{stop_and_reply, normal, {reply, From, ok}, Data1};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0}
end;
handle_event({call, From}, abort, upload_not_started, _Data) ->
{stop_and_reply, normal, {reply, From, ok}};
handle_event({call, From}, abort, upload_started, Data0) ->
case abort_upload(Data0) of
ok ->
{stop_and_reply, normal, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0}
{stop, Error, Error, St}
end.
handle_write(upload_not_started, From, WriteData, Data0) ->
Data1 = append_buffer(Data0, WriteData),
case maybe_start_upload(Data1) of
not_started ->
{keep_state, Data1, {reply, From, ok}};
{started, Data2} ->
case upload_part(Data2) of
{ok, Data3} ->
{next_state, upload_started, Data3, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data2}
end;
handle_write(St = #{upload := U0}) ->
case emqx_s3_upload:write(U0) of
{ok, U1} ->
{reply, ok, St#{upload := U1}};
{cont, U1} ->
handle_write(St#{upload := U1});
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data1}
end;
handle_write(upload_started, From, WriteData, Data0) ->
Data1 = append_buffer(Data0, WriteData),
case maybe_upload_part(Data1) of
{ok, Data2} ->
{keep_state, Data2, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data1}
{stop, Error, Error, St}
end.
terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when
(UploadId =/= undefined) andalso (Reason =/= normal)
->
emqx_s3_client:abort_multipart(Client, Key, UploadId);
terminate(_Reason, _State, _Data) ->
ok.
-spec handle_cast(_Cast, data()) -> {noreply, data()}.
handle_cast(_Cast, St) ->
{noreply, St}.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
-spec terminate(_Reason, data()) -> ok.
terminate(normal, _St) ->
ok;
terminate({shutdown, _}, _St) ->
ok;
terminate(_Reason, #{upload := Upload}) ->
emqx_s3_upload:abort(Upload).
format_status(#{data := #{client := Client} = Data} = Status) ->
Status#{
data => Data#{
client => emqx_s3_client:format(Client),
buffer => [<<"...">>]
}
}.
format_status(_Opt, [PDict, State, #{client := Client} = Data]) ->
#{
data => Data#{
client => emqx_s3_client:format(Client),
buffer => [<<"...">>]
},
state => State,
pdict => PDict
}.
format_status(#{state := State = #{upload := Upload}} = Status) ->
StateRedacted = State#{upload := emqx_s3_upload:format(Upload)},
Status#{state := StateRedacted}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}.
maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
start_upload(Data);
false ->
not_started
end.
-spec start_upload(data()) -> {started, data()} | {error, term()}.
start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
{ok, UploadId} ->
NewData = Data#{upload_id => UploadId},
{started, NewData};
{error, _} = Error ->
Error
end.
-spec maybe_upload_part(data()) -> ok_or_error(data(), term()).
maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
upload_part(Data);
false ->
{ok, Data}
end.
-spec upload_part(data()) -> ok_or_error(data(), term()).
upload_part(#{buffer_size := 0} = Data) ->
{ok, Data};
upload_part(
#{
client := Client,
key := Key,
upload_id := UploadId,
buffer := Buffer,
part_number := PartNumber,
etags := ETags
} = Data
) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
{ok, ETag} ->
NewData = Data#{
buffer => [],
buffer_size => 0,
part_number => PartNumber + 1,
etags => [{PartNumber, ETag} | ETags]
},
{ok, NewData};
{error, _} = Error ->
Error
end.
-spec complete_upload(data()) -> ok_or_error(data(), term()).
complete_upload(
#{
client := Client,
key := Key,
upload_id := UploadId
} = Data0
) ->
case upload_part(Data0) of
{ok, #{etags := ETagsRev} = Data1} ->
ETags = lists:reverse(ETagsRev),
case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
ok ->
{ok, Data1};
{error, _} = Error ->
Error
end;
{error, _} = Error ->
Error
end.
-spec abort_upload(data()) -> ok_or_error(term()).
abort_upload(
#{
client := Client,
key := Key,
upload_id := UploadId
}
) ->
case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
ok ->
ok;
{error, _} = Error ->
Error
end.
-spec put_object(data()) -> ok_or_error(term()).
put_object(
#{
client := Client,
key := Key,
upload_opts := UploadOpts,
buffer := Buffer
}
) ->
case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
ok ->
ok;
{error, _} = Error ->
Error
end.
-spec append_buffer(data(), iodata()) -> data().
append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
Data#{
buffer => [Buffer, WriteData],
buffer_size => BufferSize + iolist_size(WriteData)
}.
-compile({inline, [wrap/1, unwrap/1]}).
wrap(Data) ->
fun() -> Data end.
@ -331,8 +150,5 @@ wrap(Data) ->
unwrap(WrappedData) ->
WrappedData().
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
BufferSize + iolist_size(WriteData) =< MaxPartSize.
client(Bucket, Config) ->
emqx_s3_client:create(Bucket, Config).