feat(ft-s3): integrate list API

This commit is contained in:
Ilya Averyanov 2023-04-03 20:35:47 +03:00
parent 5ac3543a76
commit 43f9737420
14 changed files with 333 additions and 162 deletions

View File

@ -118,7 +118,6 @@ decode_filemeta(Map) when is_map(Map) ->
end. end.
encode_filemeta(Meta = #{}) -> encode_filemeta(Meta = #{}) ->
% TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta), Schema = emqx_ft_schema:schema(filemeta),
hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}). hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}).

View File

@ -69,6 +69,7 @@ schema("/file_transfer/files") ->
'/file_transfer/files'(get, #{}) -> '/file_transfer/files'(get, #{}) ->
case emqx_ft_storage:files() of case emqx_ft_storage:files() of
{ok, Files} -> {ok, Files} ->
?SLOG(warning, #{msg => "files", files => Files}),
{200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}}; {200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}};
{error, _} -> {error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
@ -94,7 +95,7 @@ format_file_info(
} }
) -> ) ->
Res = #{ Res = #{
name => iolist_to_binary(Name), name => format_name(Name),
size => Size, size => Size,
timestamp => format_timestamp(Timestamp), timestamp => format_timestamp(Timestamp),
clientid => ClientId, clientid => ClientId,
@ -110,3 +111,8 @@ format_file_info(
format_timestamp(Timestamp) -> format_timestamp(Timestamp) ->
iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
format_name(NameBin) when is_binary(NameBin) ->
NameBin;
format_name(Name) when is_list(Name) ->
iolist_to_binary(Name).

View File

@ -1,102 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_exporter_s3).
-include_lib("emqx/include/logger.hrl").
%% Exporter API
-export([start_export/3]).
-export([write/2]).
-export([complete/1]).
-export([discard/1]).
-export([list/1]).
-export([
start/1,
stop/1,
update/2
]).
-type options() :: emqx_s3:profile_config().
-type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta().
-type exportinfo() :: #{
transfer := transfer(),
name := file:name(),
uri := uri_string:uri_string(),
timestamp := emqx_datetime:epoch_second(),
size := _Bytes :: non_neg_integer(),
meta => filemeta()
}.
-type export_st() :: #{
pid := pid(),
meta := filemeta()
}.
%%--------------------------------------------------------------------
%% Exporter behaviour
%%--------------------------------------------------------------------
-spec start_export(options(), transfer(), filemeta()) ->
{ok, export_st()} | {error, term()}.
start_export(_Options, _Transfer, Filemeta = #{name := Filename}) ->
Pid = spawn(fun() -> Filename end),
#{meta => Filemeta, pid => Pid}.
-spec write(export_st(), iodata()) ->
{ok, export_st()} | {error, term()}.
write(ExportSt, _IoData) ->
{ok, ExportSt}.
-spec complete(export_st()) ->
ok | {error, term()}.
complete(_ExportSt) ->
ok.
-spec discard(export_st()) ->
ok.
discard(_ExportSt) ->
ok.
-spec list(options()) ->
{ok, [exportinfo()]} | {error, term()}.
list(_Options) ->
{ok, []}.
%%--------------------------------------------------------------------
%% Exporter behaviour (lifecycle)
%%--------------------------------------------------------------------
-spec start(options()) -> ok | {error, term()}.
start(Options) ->
emqx_s3:start_profile(s3_profile_id(), Options).
-spec stop(options()) -> ok.
stop(_Options) ->
ok = emqx_s3:stop_profile(s3_profile_id()).
-spec update(options(), options()) -> ok.
update(_OldOptions, NewOptions) ->
emqx_s3:update_profile(s3_profile_id(), NewOptions).
%%--------------------------------------------------------------------
%% Internal functions
%% -------------------------------------------------------------------
s3_profile_id() ->
atom_to_binary(?MODULE).

View File

@ -0,0 +1,188 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_exporter_s3).
-include_lib("emqx/include/logger.hrl").
%% Exporter API
-export([start_export/3]).
-export([write/2]).
-export([complete/2]).
-export([discard/1]).
-export([list/1]).
-export([
start/1,
stop/1,
update/2
]).
-type options() :: emqx_s3:profile_config().
-type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta().
-type exportinfo() :: #{
transfer := transfer(),
name := file:name(),
uri := uri_string:uri_string(),
timestamp := emqx_datetime:epoch_second(),
size := _Bytes :: non_neg_integer(),
filemeta => filemeta()
}.
-type export_st() :: #{
pid := pid(),
filemeta := filemeta(),
transfer := transfer()
}.
-define(S3_PROFILE_ID, <<"emqx_ft_storage_exporter_s3">>).
-define(FILEMETA_VSN, <<"1">>).
-define(S3_LIST_LIMIT, 500).
%%--------------------------------------------------------------------
%% Exporter behaviour
%%--------------------------------------------------------------------
-spec start_export(options(), transfer(), filemeta()) ->
{ok, export_st()} | {error, term()}.
start_export(_Options, Transfer, Filemeta) ->
Options = #{
key => s3_key(Transfer, Filemeta),
headers => s3_headers(Transfer, Filemeta)
},
case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of
{ok, Pid} ->
{ok, #{filemeta => Filemeta, pid => Pid}};
{error, _Reason} = Error ->
Error
end.
-spec write(export_st(), iodata()) ->
{ok, export_st()} | {error, term()}.
write(#{pid := Pid} = ExportSt, IoData) ->
case emqx_s3_uploader:write(Pid, IoData) of
ok ->
{ok, ExportSt};
{error, _Reason} = Error ->
Error
end.
-spec complete(export_st(), emqx_ft:checksum()) ->
ok | {error, term()}.
complete(#{pid := Pid} = _ExportSt, _Checksum) ->
emqx_s3_uploader:complete(Pid).
-spec discard(export_st()) ->
ok.
discard(#{pid := Pid} = _ExportSt) ->
emqx_s3_uploader:abort(Pid).
-spec list(options()) ->
{ok, [exportinfo()]} | {error, term()}.
list(Options) ->
emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end).
%%--------------------------------------------------------------------
%% Exporter behaviour (lifecycle)
%%--------------------------------------------------------------------
-spec start(options()) -> ok | {error, term()}.
start(Options) ->
emqx_s3:start_profile(?S3_PROFILE_ID, Options).
-spec stop(options()) -> ok.
stop(_Options) ->
ok = emqx_s3:stop_profile(?S3_PROFILE_ID).
-spec update(options(), options()) -> ok.
update(_OldOptions, NewOptions) ->
emqx_s3:update_profile(?S3_PROFILE_ID, NewOptions).
%%--------------------------------------------------------------------
%% Internal functions
%% -------------------------------------------------------------------
s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) ->
filename:join([
emqx_ft_fs_util:escape_filename(ClientId),
emqx_ft_fs_util:escape_filename(FileId),
Filename
]).
s3_headers({ClientId, FileId}, Filemeta) ->
#{
%% The ClientID MUST be a UTF-8 Encoded String
<<"x-amz-meta-clientid">> => ClientId,
%% It [Topic Name] MUST be a UTF-8 Encoded String
<<"x-amz-meta-fileid">> => FileId,
<<"x-amz-meta-filemeta">> => emqx_json:encode(emqx_ft:encode_filemeta(Filemeta)),
<<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN
}.
list(Client, Options) ->
case list_key_info(Client, Options) of
{ok, KeyInfos} ->
{ok,
lists:map(
fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
)};
{error, _Reason} = Error ->
Error
end.
list_key_info(Client, Options) ->
list_key_info(Client, Options, _Marker = [], _Acc = []).
list_key_info(Client, Options, Marker, Acc) ->
ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker,
case emqx_s3_client:list(Client, ListOptions) of
{ok, Result} ->
?SLOG(warning, #{msg => "list_key_info", result => Result}),
KeyInfos = proplists:get_value(contents, Result, []),
case proplists:get_value(is_truncated, Result, false) of
true ->
NewMarker = [{marker, proplists:get_value(next_marker, Result)}],
list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]);
false ->
{ok, lists:append(lists:reverse([KeyInfos | Acc]))}
end;
{error, _Reason} = Error ->
Error
end.
key_info_to_exportinfo(Client, KeyInfo, _Options) ->
Key = proplists:get_value(key, KeyInfo),
{Transfer, Name} = parse_transfer_and_name(Key),
#{
transfer => Transfer,
name => unicode:characters_to_binary(Name),
uri => emqx_s3_client:uri(Client, Key),
timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
size => proplists:get_value(size, KeyInfo)
}.
-define(EPOCH_START, 62167219200).
datetime_to_epoch_second(DateTime) ->
calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
parse_transfer_and_name(Key) ->
[ClientId, FileId, Name] = string:split(Key, "/", all),
Transfer = {
emqx_ft_fs_util:unescape_filename(ClientId), emqx_ft_fs_util:unescape_filename(FileId)
},
{Transfer, Name}.

View File

@ -39,9 +39,17 @@ emqx_s3_schema {
en: "S3 endpoint port" en: "S3 endpoint port"
} }
} }
url_expire_time {
desc {
en: "The time in seconds for which the signed URLs to the S3 objects are valid."
}
label {
en: "Signed URL expiration time"
}
}
min_part_size { min_part_size {
desc { desc {
en: """The minimum part size for multipart uploads. en: """The minimum part size for multipart uploads.<br/>
Uploaded data will be accumulated in memory until this size is reached.""" Uploaded data will be accumulated in memory until this size is reached."""
} }
label { label {
@ -50,7 +58,7 @@ Uploaded data will be accumulated in memory until this size is reached."""
} }
max_part_size { max_part_size {
desc { desc {
en: """The maximum part size for multipart uploads. en: """The maximum part size for multipart uploads.<br/>
S3 uploader won't try to upload parts larger than this size.""" S3 uploader won't try to upload parts larger than this size."""
} }
label { label {

View File

@ -41,8 +41,11 @@
-type profile_config() :: #{ -type profile_config() :: #{
bucket := string(), bucket := string(),
access_key_id => string(),
secret_access_key => string(),
host := string(), host := string(),
port := pos_integer(), port := pos_integer(),
url_expire_time := pos_integer(),
acl => acl(), acl => acl(),
min_part_size => pos_integer(), min_part_size => pos_integer(),
transport_options => transport_options() transport_options => transport_options()

View File

@ -8,9 +8,6 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("erlcloud/include/erlcloud_aws.hrl"). -include_lib("erlcloud/include/erlcloud_aws.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-export([ -export([
create/1, create/1,
@ -21,11 +18,16 @@
complete_multipart/4, complete_multipart/4,
abort_multipart/3, abort_multipart/3,
list/2, list/2,
uri/2,
format/1 format/1,
format_request/1
]). ]).
-export_type([client/0]). -export_type([
client/0,
headers/0
]).
-type s3_bucket_acl() :: -type s3_bucket_acl() ::
private private
@ -35,7 +37,7 @@
| bucket_owner_read | bucket_owner_read
| bucket_owner_full_control. | bucket_owner_full_control.
-type headers() :: #{binary() => binary()}. -type headers() :: #{binary() | string() => binary() | string()}.
-type key() :: string(). -type key() :: string().
-type part_number() :: non_neg_integer(). -type part_number() :: non_neg_integer().
@ -58,6 +60,7 @@
bucket := string(), bucket := string(),
headers := headers(), headers := headers(),
acl := s3_bucket_acl(), acl := s3_bucket_acl(),
url_expire_time := pos_integer(),
access_key_id := string() | undefined, access_key_id := string() | undefined,
secret_access_key := string() | undefined, secret_access_key := string() | undefined,
http_pool := ecpool:pool_name(), http_pool := ecpool:pool_name(),
@ -76,6 +79,7 @@ create(Config) ->
aws_config => aws_config(Config), aws_config => aws_config(Config),
upload_options => upload_options(Config), upload_options => upload_options(Config),
bucket => maps:get(bucket, Config), bucket => maps:get(bucket, Config),
url_expire_time => maps:get(url_expire_time, Config),
headers => headers(Config) headers => headers(Config)
}. }.
@ -85,7 +89,7 @@ put_object(
Key, Key,
Value Value
) -> ) ->
try erlcloud_s3:put_object(Bucket, Key, Value, Options, Headers, AwsConfig) of try erlcloud_s3:put_object(Bucket, key(Key), Value, Options, Headers, AwsConfig) of
Props when is_list(Props) -> Props when is_list(Props) ->
ok ok
catch catch
@ -99,7 +103,7 @@ start_multipart(
#{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
Key Key
) -> ) ->
case erlcloud_s3:start_multipart(Bucket, Key, Options, Headers, AwsConfig) of case erlcloud_s3:start_multipart(Bucket, key(Key), Options, Headers, AwsConfig) of
{ok, Props} -> {ok, Props} ->
{ok, proplists:get_value(uploadId, Props)}; {ok, proplists:get_value(uploadId, Props)};
{error, Reason} -> {error, Reason} ->
@ -116,7 +120,9 @@ upload_part(
PartNumber, PartNumber,
Value Value
) -> ) ->
case erlcloud_s3:upload_part(Bucket, Key, UploadId, PartNumber, Value, Headers, AwsConfig) of case
erlcloud_s3:upload_part(Bucket, key(Key), UploadId, PartNumber, Value, Headers, AwsConfig)
of
{ok, Props} -> {ok, Props} ->
{ok, proplists:get_value(etag, Props)}; {ok, proplists:get_value(etag, Props)};
{error, Reason} -> {error, Reason} ->
@ -126,9 +132,12 @@ upload_part(
-spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()). -spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()).
complete_multipart( complete_multipart(
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId, ETags #{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
Key,
UploadId,
ETags
) -> ) ->
case erlcloud_s3:complete_multipart(Bucket, Key, UploadId, ETags, Headers, AwsConfig) of case erlcloud_s3:complete_multipart(Bucket, key(Key), UploadId, ETags, Headers, AwsConfig) of
ok -> ok ->
ok; ok;
{error, Reason} -> {error, Reason} ->
@ -138,7 +147,7 @@ complete_multipart(
-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
case erlcloud_s3:abort_multipart(Bucket, Key, UploadId, [], Headers, AwsConfig) of case erlcloud_s3:abort_multipart(Bucket, key(Key), UploadId, [], Headers, AwsConfig) of
ok -> ok ->
ok; ok;
{error, Reason} -> {error, Reason} ->
@ -156,6 +165,10 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
{error, Reason} {error, Reason}
end. end.
-spec uri(client(), key()) -> iodata().
uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
erlcloud_s3:make_get_url(ExpireTime, Bucket, key(Key), AwsConfig).
-spec format(client()) -> term(). -spec format(client()) -> term().
format(#{aws_config := AwsConfig} = Client) -> format(#{aws_config := AwsConfig} = Client) ->
Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}. Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}.
@ -170,13 +183,14 @@ upload_options(Config) ->
]. ].
headers(#{headers := Headers}) -> headers(#{headers := Headers}) ->
maps:to_list(Headers). string_headers(maps:to_list(Headers));
headers(#{}) ->
[].
aws_config(#{ aws_config(#{
scheme := Scheme, scheme := Scheme,
host := Host, host := Host,
port := Port, port := Port,
headers := Headers,
access_key_id := AccessKeyId, access_key_id := AccessKeyId,
secret_access_key := SecretAccessKey, secret_access_key := SecretAccessKey,
http_pool := HttpPool, http_pool := HttpPool,
@ -187,23 +201,29 @@ aws_config(#{
s3_host = Host, s3_host = Host,
s3_port = Port, s3_port = Port,
s3_bucket_access_method = path, s3_bucket_access_method = path,
s3_bucket_after_host = true,
access_key_id = AccessKeyId, access_key_id = AccessKeyId,
secret_access_key = SecretAccessKey, secret_access_key = SecretAccessKey,
http_client = request_fun(Headers, HttpPool), http_client = request_fun(HttpPool),
timeout = Timeout timeout = Timeout
}. }.
-type http_headers() :: [{binary(), binary()}].
-type http_pool() :: term(). -type http_pool() :: term().
-spec request_fun(http_headers(), http_pool()) -> erlcloud_httpc:request_fun(). -spec request_fun(http_pool()) -> erlcloud_httpc:request_fun().
request_fun(CustomHeaders, HttpPool) -> request_fun(HttpPool) ->
fun(Url, Method, Headers, Body, Timeout, _Config) -> fun(Url, Method, Headers, Body, Timeout, _Config) ->
with_path_and_query_only(Url, fun(PathQuery) -> with_path_and_query_only(Url, fun(PathQuery) ->
JoinedHeaders = join_headers(Headers, CustomHeaders), Request = make_request(Method, PathQuery, binary_headers(Headers), Body),
Request = make_request(Method, PathQuery, JoinedHeaders, Body), ?SLOG(warning, #{
msg => "s3_ehttpc_request",
timeout => Timeout,
pool => HttpPool,
method => Method,
request => Request
}),
ehttpc_request(HttpPool, Method, Request, Timeout) ehttpc_request(HttpPool, Method, Request, Timeout)
end) end)
end. end.
@ -211,9 +231,9 @@ request_fun(CustomHeaders, HttpPool) ->
ehttpc_request(HttpPool, Method, Request, Timeout) -> ehttpc_request(HttpPool, Method, Request, Timeout) ->
try ehttpc:request(HttpPool, Method, Request, Timeout) of try ehttpc:request(HttpPool, Method, Request, Timeout) of
{ok, StatusCode, RespHeaders} -> {ok, StatusCode, RespHeaders} ->
{ok, {{StatusCode, undefined}, string_headers(RespHeaders), undefined}}; {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), undefined}};
{ok, StatusCode, RespHeaders, RespBody} -> {ok, StatusCode, RespHeaders, RespBody} ->
{ok, {{StatusCode, undefined}, string_headers(RespHeaders), RespBody}}; {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), RespBody}};
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "s3_ehttpc_request_fail", msg => "s3_ehttpc_request_fail",
@ -258,16 +278,6 @@ make_request(_Method, PathQuery, Headers, Body) ->
format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}. format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}.
join_headers(Headers, CustomHeaders) ->
MapHeaders = lists:foldl(
fun({K, V}, MHeaders) ->
maps:put(to_binary(K), V, MHeaders)
end,
#{},
Headers ++ maps:to_list(CustomHeaders)
),
maps:to_list(MapHeaders).
with_path_and_query_only(Url, Fun) -> with_path_and_query_only(Url, Fun) ->
case string:split(Url, "//", leading) of case string:split(Url, "//", leading) of
[_Scheme, UrlRem] -> [_Scheme, UrlRem] ->
@ -281,13 +291,22 @@ with_path_and_query_only(Url, Fun) ->
{error, {invalid_url, Url}} {error, {invalid_url, Url}}
end. end.
string_headers(Headers) ->
[{to_list_string(K), to_list_string(V)} || {K, V} <- Headers].
erlcloud_string_headers(Headers) ->
[{string:to_lower(K), V} || {K, V} <- string_headers(Headers)].
binary_headers(Headers) ->
[{to_binary(K), V} || {K, V} <- Headers].
to_binary(Val) when is_list(Val) -> list_to_binary(Val); to_binary(Val) when is_list(Val) -> list_to_binary(Val);
to_binary(Val) when is_binary(Val) -> Val. to_binary(Val) when is_binary(Val) -> Val.
string_headers(Hdrs) ->
[{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- Hdrs].
to_list_string(Val) when is_binary(Val) -> to_list_string(Val) when is_binary(Val) ->
binary_to_list(Val); binary_to_list(Val);
to_list_string(Val) when is_list(Val) -> to_list_string(Val) when is_list(Val) ->
Val. Val.
key(Characters) ->
binary_to_list(unicode:characters_to_binary(Characters)).

View File

@ -211,6 +211,7 @@ client_config(ProfileConfig, PoolName) ->
scheme => scheme(HTTPOpts), scheme => scheme(HTTPOpts),
host => maps:get(host, ProfileConfig), host => maps:get(host, ProfileConfig),
port => maps:get(port, ProfileConfig), port => maps:get(port, ProfileConfig),
url_expire_time => maps:get(url_expire_time, ProfileConfig),
headers => maps:get(headers, HTTPOpts, #{}), headers => maps:get(headers, HTTPOpts, #{}),
acl => maps:get(acl, ProfileConfig), acl => maps:get(acl, ProfileConfig),
bucket => maps:get(bucket, ProfileConfig), bucket => maps:get(bucket, ProfileConfig),

View File

@ -63,6 +63,15 @@ fields(s3) ->
required => true required => true
} }
)}, )},
{url_expire_time,
mk(
emqx_schema:duration_s(),
#{
default => "1h",
desc => ?DESC("url_expire_time"),
required => false
}
)},
{min_part_size, {min_part_size,
mk( mk(
emqx_schema:bytesize(), emqx_schema:bytesize(),

View File

@ -12,8 +12,13 @@
start_link/2, start_link/2,
write/2, write/2,
write/3,
complete/1, complete/1,
abort/1 complete/2,
abort/1,
abort/2
]). ]).
-export([ -export([
@ -26,14 +31,11 @@
format_status/2 format_status/2
]). ]).
-export_type([opts/0, config/0]). -export_type([opts/0]).
-type opts() :: #{ -type opts() :: #{
name := string() key := string(),
}. headers => emqx_s3_client:headers()
-type config() :: #{
min_part_size := pos_integer()
}. }.
-type data() :: #{ -type data() :: #{
@ -58,12 +60,12 @@
start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
gen_statem:start_link(?MODULE, [ProfileId, Opts], []). gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
-spec write(pid(), binary()) -> ok_or_error(term()). -spec write(pid(), iodata()) -> ok_or_error(term()).
write(Pid, WriteData) when is_binary(WriteData) -> write(Pid, WriteData) ->
write(Pid, WriteData, infinity). write(Pid, WriteData, infinity).
-spec write(pid(), binary(), timeout()) -> ok_or_error(term()). -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
write(Pid, WriteData, Timeout) when is_binary(WriteData) -> write(Pid, WriteData, Timeout) ->
gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout). gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout).
-spec complete(pid()) -> ok_or_error(term()). -spec complete(pid()) -> ok_or_error(term()).
@ -88,10 +90,10 @@ abort(Pid, Timeout) ->
callback_mode() -> handle_event_function. callback_mode() -> handle_event_function.
init([ProfileId, #{key := Key}]) -> init([ProfileId, #{key := Key} = Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId), {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId),
Client = emqx_s3_client:create(ClientConfig), Client = client(ClientConfig, Opts),
{ok, upload_not_started, #{ {ok, upload_not_started, #{
profile_id => ProfileId, profile_id => ProfileId,
client => Client, client => Client,
@ -111,7 +113,7 @@ handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) ->
true -> true ->
handle_write(State, From, WriteData, Data0); handle_write(State, From, WriteData, Data0);
false -> false ->
{keep_state_and_data, {reply, From, {error, {too_large, byte_size(WriteData)}}}} {keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}}
end; end;
handle_event({call, From}, complete, upload_not_started, Data0) -> handle_event({call, From}, complete, upload_not_started, Data0) ->
case put_object(Data0) of case put_object(Data0) of
@ -218,7 +220,6 @@ maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = D
true -> true ->
upload_part(Data); upload_part(Data);
false -> false ->
% ct:print("buffer size: ~p, max part size: ~p, no upload", [BufferSize, MinPartSize]),
{ok, Data} {ok, Data}
end. end.
@ -237,7 +238,6 @@ upload_part(
) -> ) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of
{ok, ETag} -> {ok, ETag} ->
% ct:print("upload part ~p, etag: ~p", [PartNumber, ETag]),
NewData = Data#{ NewData = Data#{
buffer => [], buffer => [],
buffer_size => 0, buffer_size => 0,
@ -246,7 +246,6 @@ upload_part(
}, },
{ok, NewData}; {ok, NewData};
{error, _} = Error -> {error, _} = Error ->
% ct:print("upload part ~p failed: ~p", [PartNumber, Error]),
Error Error
end. end.
@ -260,7 +259,11 @@ complete_upload(
) -> ) ->
case upload_part(Data0) of case upload_part(Data0) of
{ok, #{etags := ETags} = Data1} -> {ok, #{etags := ETags} = Data1} ->
case emqx_s3_client:complete_multipart(Client, Key, UploadId, lists:reverse(ETags)) of case
emqx_s3_client:complete_multipart(
Client, Key, UploadId, lists:reverse(ETags)
)
of
ok -> ok ->
{ok, Data1}; {ok, Data1};
{error, _} = Error -> {error, _} = Error ->
@ -300,11 +303,11 @@ put_object(
Error Error
end. end.
-spec append_buffer(data(), binary()) -> data(). -spec append_buffer(data(), iodata()) -> data().
append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
Data#{ Data#{
buffer => [WriteData | Buffer], buffer => [WriteData | Buffer],
buffer_size => BufferSize + byte_size(WriteData) buffer_size => BufferSize + iolist_size(WriteData)
}. }.
-compile({inline, [wrap/1, unwrap/1]}). -compile({inline, [wrap/1, unwrap/1]}).
@ -315,4 +318,8 @@ unwrap(WrappedData) ->
WrappedData(). WrappedData().
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
BufferSize + byte_size(WriteData) =< MaxPartSize. BufferSize + iolist_size(WriteData) =< MaxPartSize.
client(Config, Opts) ->
Headers = maps:get(headers, Opts, #{}),
emqx_s3_client:create(Config#{headers => Headers}).

View File

@ -83,6 +83,39 @@ t_simple_put(Config) ->
ok = emqx_s3_client:put_object(Client, Key, Data). ok = emqx_s3_client:put_object(Client, Key, Data).
t_list(Config) ->
Key = ?config(key, Config),
Client = client(Config),
ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
{ok, List} = emqx_s3_client:list(Client, Key),
[KeyInfo] = proplists:get_value(contents, List),
?assertMatch(
#{
key := Key,
size := 4,
etag := _,
last_modified := _
},
maps:from_list(KeyInfo)
).
t_url(Config) ->
Key = ?config(key, Config),
Client = client(Config),
ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
Url = emqx_s3_client:url(Client, Key),
?assertMatch(
{ok, {{_StatusLine, 200, "OK"}, _Headers, "data"}},
httpc:request(Url)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------