diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index 064b6e066..c5522cdbd 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -118,7 +118,6 @@ decode_filemeta(Map) when is_map(Map) ->
end.
encode_filemeta(Meta = #{}) ->
- % TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta),
hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}).
diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl
index a667454b6..390c10557 100644
--- a/apps/emqx_ft/src/emqx_ft_api.erl
+++ b/apps/emqx_ft/src/emqx_ft_api.erl
@@ -69,6 +69,7 @@ schema("/file_transfer/files") ->
'/file_transfer/files'(get, #{}) ->
case emqx_ft_storage:files() of
{ok, Files} ->
+ ?SLOG(warning, #{msg => "files", files => Files}),
{200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
@@ -94,7 +95,7 @@ format_file_info(
}
) ->
Res = #{
- name => iolist_to_binary(Name),
+ name => format_name(Name),
size => Size,
timestamp => format_timestamp(Timestamp),
clientid => ClientId,
@@ -110,3 +111,8 @@ format_file_info(
format_timestamp(Timestamp) ->
iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
+
+format_name(NameBin) when is_binary(NameBin) ->
+ NameBin;
+format_name(Name) when is_list(Name) ->
+ iolist_to_binary(Name).
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl
deleted file mode 100644
index 977ff576a..000000000
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl
+++ /dev/null
@@ -1,102 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ft_storage_exporter_s3).
-
--include_lib("emqx/include/logger.hrl").
-
-%% Exporter API
--export([start_export/3]).
--export([write/2]).
--export([complete/1]).
--export([discard/1]).
--export([list/1]).
-
--export([
- start/1,
- stop/1,
- update/2
-]).
-
--type options() :: emqx_s3:profile_config().
--type transfer() :: emqx_ft:transfer().
--type filemeta() :: emqx_ft:filemeta().
--type exportinfo() :: #{
- transfer := transfer(),
- name := file:name(),
- uri := uri_string:uri_string(),
- timestamp := emqx_datetime:epoch_second(),
- size := _Bytes :: non_neg_integer(),
- meta => filemeta()
-}.
-
--type export_st() :: #{
- pid := pid(),
- meta := filemeta()
-}.
-
-%%--------------------------------------------------------------------
-%% Exporter behaviour
-%%--------------------------------------------------------------------
-
--spec start_export(options(), transfer(), filemeta()) ->
- {ok, export_st()} | {error, term()}.
-start_export(_Options, _Transfer, Filemeta = #{name := Filename}) ->
- Pid = spawn(fun() -> Filename end),
- #{meta => Filemeta, pid => Pid}.
-
--spec write(export_st(), iodata()) ->
- {ok, export_st()} | {error, term()}.
-write(ExportSt, _IoData) ->
- {ok, ExportSt}.
-
--spec complete(export_st()) ->
- ok | {error, term()}.
-complete(_ExportSt) ->
- ok.
-
--spec discard(export_st()) ->
- ok.
-discard(_ExportSt) ->
- ok.
-
--spec list(options()) ->
- {ok, [exportinfo()]} | {error, term()}.
-list(_Options) ->
- {ok, []}.
-
-%%--------------------------------------------------------------------
-%% Exporter behaviour (lifecycle)
-%%--------------------------------------------------------------------
-
--spec start(options()) -> ok | {error, term()}.
-start(Options) ->
- emqx_s3:start_profile(s3_profile_id(), Options).
-
--spec stop(options()) -> ok.
-stop(_Options) ->
- ok = emqx_s3:stop_profile(s3_profile_id()).
-
--spec update(options(), options()) -> ok.
-update(_OldOptions, NewOptions) ->
- emqx_s3:update_profile(s3_profile_id(), NewOptions).
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%% -------------------------------------------------------------------
-
-s3_profile_id() ->
- atom_to_binary(?MODULE).
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
similarity index 100%
rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs.erl
rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
similarity index 100%
rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_api.erl
rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl
similarity index 100%
rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_proxy.erl
rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl
new file mode 100644
index 000000000..1c97520e3
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl
@@ -0,0 +1,188 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_exporter_s3).
+
+-include_lib("emqx/include/logger.hrl").
+
+%% Exporter API
+-export([start_export/3]).
+-export([write/2]).
+-export([complete/2]).
+-export([discard/1]).
+-export([list/1]).
+
+-export([
+ start/1,
+ stop/1,
+ update/2
+]).
+
+-type options() :: emqx_s3:profile_config().
+-type transfer() :: emqx_ft:transfer().
+-type filemeta() :: emqx_ft:filemeta().
+-type exportinfo() :: #{
+ transfer := transfer(),
+ name := file:name(),
+ uri := uri_string:uri_string(),
+ timestamp := emqx_datetime:epoch_second(),
+ size := _Bytes :: non_neg_integer(),
+ filemeta => filemeta()
+}.
+
+-type export_st() :: #{
+ pid := pid(),
+ filemeta := filemeta(),
+ transfer := transfer()
+}.
+
+-define(S3_PROFILE_ID, <<"emqx_ft_storage_exporter_s3">>).
+-define(FILEMETA_VSN, <<"1">>).
+-define(S3_LIST_LIMIT, 500).
+
+%%--------------------------------------------------------------------
+%% Exporter behaviour
+%%--------------------------------------------------------------------
+
+-spec start_export(options(), transfer(), filemeta()) ->
+ {ok, export_st()} | {error, term()}.
+start_export(_Options, Transfer, Filemeta) ->
+ Options = #{
+ key => s3_key(Transfer, Filemeta),
+ headers => s3_headers(Transfer, Filemeta)
+ },
+ case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of
+ {ok, Pid} ->
+ {ok, #{filemeta => Filemeta, pid => Pid}};
+ {error, _Reason} = Error ->
+ Error
+ end.
+
+-spec write(export_st(), iodata()) ->
+ {ok, export_st()} | {error, term()}.
+write(#{pid := Pid} = ExportSt, IoData) ->
+ case emqx_s3_uploader:write(Pid, IoData) of
+ ok ->
+ {ok, ExportSt};
+ {error, _Reason} = Error ->
+ Error
+ end.
+
+-spec complete(export_st(), emqx_ft:checksum()) ->
+ ok | {error, term()}.
+complete(#{pid := Pid} = _ExportSt, _Checksum) ->
+ emqx_s3_uploader:complete(Pid).
+
+-spec discard(export_st()) ->
+ ok.
+discard(#{pid := Pid} = _ExportSt) ->
+ emqx_s3_uploader:abort(Pid).
+
+-spec list(options()) ->
+ {ok, [exportinfo()]} | {error, term()}.
+list(Options) ->
+ emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end).
+
+%%--------------------------------------------------------------------
+%% Exporter behaviour (lifecycle)
+%%--------------------------------------------------------------------
+
+-spec start(options()) -> ok | {error, term()}.
+start(Options) ->
+ emqx_s3:start_profile(?S3_PROFILE_ID, Options).
+
+-spec stop(options()) -> ok.
+stop(_Options) ->
+ ok = emqx_s3:stop_profile(?S3_PROFILE_ID).
+
+-spec update(options(), options()) -> ok.
+update(_OldOptions, NewOptions) ->
+ emqx_s3:update_profile(?S3_PROFILE_ID, NewOptions).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%% -------------------------------------------------------------------
+
+s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) ->
+ filename:join([
+ emqx_ft_fs_util:escape_filename(ClientId),
+ emqx_ft_fs_util:escape_filename(FileId),
+ Filename
+ ]).
+
+s3_headers({ClientId, FileId}, Filemeta) ->
+ #{
+ %% The ClientID MUST be a UTF-8 Encoded String
+ <<"x-amz-meta-clientid">> => ClientId,
+ %% It [Topic Name] MUST be a UTF-8 Encoded String
+ <<"x-amz-meta-fileid">> => FileId,
+ <<"x-amz-meta-filemeta">> => emqx_json:encode(emqx_ft:encode_filemeta(Filemeta)),
+ <<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN
+ }.
+
+list(Client, Options) ->
+ case list_key_info(Client, Options) of
+ {ok, KeyInfos} ->
+ {ok,
+ lists:map(
+ fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
+ )};
+ {error, _Reason} = Error ->
+ Error
+ end.
+
+list_key_info(Client, Options) ->
+ list_key_info(Client, Options, _Marker = [], _Acc = []).
+
+list_key_info(Client, Options, Marker, Acc) ->
+ ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker,
+ case emqx_s3_client:list(Client, ListOptions) of
+ {ok, Result} ->
+ ?SLOG(warning, #{msg => "list_key_info", result => Result}),
+ KeyInfos = proplists:get_value(contents, Result, []),
+ case proplists:get_value(is_truncated, Result, false) of
+ true ->
+ NewMarker = [{marker, proplists:get_value(next_marker, Result)}],
+ list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]);
+ false ->
+ {ok, lists:append(lists:reverse([KeyInfos | Acc]))}
+ end;
+ {error, _Reason} = Error ->
+ Error
+ end.
+
+key_info_to_exportinfo(Client, KeyInfo, _Options) ->
+ Key = proplists:get_value(key, KeyInfo),
+ {Transfer, Name} = parse_transfer_and_name(Key),
+ #{
+ transfer => Transfer,
+ name => unicode:characters_to_binary(Name),
+ uri => emqx_s3_client:uri(Client, Key),
+ timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
+ size => proplists:get_value(size, KeyInfo)
+ }.
+
+-define(EPOCH_START, 62167219200).
+
+datetime_to_epoch_second(DateTime) ->
+ calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
+
+parse_transfer_and_name(Key) ->
+ [ClientId, FileId, Name] = string:split(Key, "/", all),
+ Transfer = {
+ emqx_ft_fs_util:unescape_filename(ClientId), emqx_ft_fs_util:unescape_filename(FileId)
+ },
+ {Transfer, Name}.
diff --git a/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf
index 4e0870bae..99004d62a 100644
--- a/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf
+++ b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf
@@ -39,9 +39,17 @@ emqx_s3_schema {
en: "S3 endpoint port"
}
}
+ url_expire_time {
+ desc {
+ en: "The time in seconds for which the signed URLs to the S3 objects are valid."
+ }
+ label {
+ en: "Signed URL expiration time"
+ }
+ }
min_part_size {
desc {
- en: """The minimum part size for multipart uploads.
+ en: """The minimum part size for multipart uploads.
Uploaded data will be accumulated in memory until this size is reached."""
}
label {
@@ -50,7 +58,7 @@ Uploaded data will be accumulated in memory until this size is reached."""
}
max_part_size {
desc {
- en: """The maximum part size for multipart uploads.
+ en: """The maximum part size for multipart uploads.
S3 uploader won't try to upload parts larger than this size."""
}
label {
diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl
index 7c78de979..8798b608d 100644
--- a/apps/emqx_s3/src/emqx_s3.erl
+++ b/apps/emqx_s3/src/emqx_s3.erl
@@ -41,8 +41,11 @@
-type profile_config() :: #{
bucket := string(),
+ access_key_id => string(),
+ secret_access_key => string(),
host := string(),
port := pos_integer(),
+ url_expire_time := pos_integer(),
acl => acl(),
min_part_size => pos_integer(),
transport_options => transport_options()
diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl
index 01d677922..a4d470a97 100644
--- a/apps/emqx_s3/src/emqx_s3_client.erl
+++ b/apps/emqx_s3/src/emqx_s3_client.erl
@@ -8,9 +8,6 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("erlcloud/include/erlcloud_aws.hrl").
--compile(nowarn_export_all).
--compile(export_all).
-
-export([
create/1,
@@ -21,11 +18,16 @@
complete_multipart/4,
abort_multipart/3,
list/2,
+ uri/2,
- format/1
+ format/1,
+ format_request/1
]).
--export_type([client/0]).
+-export_type([
+ client/0,
+ headers/0
+]).
-type s3_bucket_acl() ::
private
@@ -35,7 +37,7 @@
| bucket_owner_read
| bucket_owner_full_control.
--type headers() :: #{binary() => binary()}.
+-type headers() :: #{binary() | string() => binary() | string()}.
-type key() :: string().
-type part_number() :: non_neg_integer().
@@ -58,6 +60,7 @@
bucket := string(),
headers := headers(),
acl := s3_bucket_acl(),
+ url_expire_time := pos_integer(),
access_key_id := string() | undefined,
secret_access_key := string() | undefined,
http_pool := ecpool:pool_name(),
@@ -76,6 +79,7 @@ create(Config) ->
aws_config => aws_config(Config),
upload_options => upload_options(Config),
bucket => maps:get(bucket, Config),
+ url_expire_time => maps:get(url_expire_time, Config),
headers => headers(Config)
}.
@@ -85,7 +89,7 @@ put_object(
Key,
Value
) ->
- try erlcloud_s3:put_object(Bucket, Key, Value, Options, Headers, AwsConfig) of
+ try erlcloud_s3:put_object(Bucket, key(Key), Value, Options, Headers, AwsConfig) of
Props when is_list(Props) ->
ok
catch
@@ -99,7 +103,7 @@ start_multipart(
#{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
Key
) ->
- case erlcloud_s3:start_multipart(Bucket, Key, Options, Headers, AwsConfig) of
+ case erlcloud_s3:start_multipart(Bucket, key(Key), Options, Headers, AwsConfig) of
{ok, Props} ->
{ok, proplists:get_value(uploadId, Props)};
{error, Reason} ->
@@ -116,7 +120,9 @@ upload_part(
PartNumber,
Value
) ->
- case erlcloud_s3:upload_part(Bucket, Key, UploadId, PartNumber, Value, Headers, AwsConfig) of
+ case
+ erlcloud_s3:upload_part(Bucket, key(Key), UploadId, PartNumber, Value, Headers, AwsConfig)
+ of
{ok, Props} ->
{ok, proplists:get_value(etag, Props)};
{error, Reason} ->
@@ -126,9 +132,12 @@ upload_part(
-spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()).
complete_multipart(
- #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId, ETags
+ #{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
+ Key,
+ UploadId,
+ ETags
) ->
- case erlcloud_s3:complete_multipart(Bucket, Key, UploadId, ETags, Headers, AwsConfig) of
+ case erlcloud_s3:complete_multipart(Bucket, key(Key), UploadId, ETags, Headers, AwsConfig) of
ok ->
ok;
{error, Reason} ->
@@ -138,7 +147,7 @@ complete_multipart(
-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
- case erlcloud_s3:abort_multipart(Bucket, Key, UploadId, [], Headers, AwsConfig) of
+ case erlcloud_s3:abort_multipart(Bucket, key(Key), UploadId, [], Headers, AwsConfig) of
ok ->
ok;
{error, Reason} ->
@@ -156,6 +165,10 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
{error, Reason}
end.
+-spec uri(client(), key()) -> iodata().
+uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
+ erlcloud_s3:make_get_url(ExpireTime, Bucket, key(Key), AwsConfig).
+
-spec format(client()) -> term().
format(#{aws_config := AwsConfig} = Client) ->
Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}.
@@ -170,13 +183,14 @@ upload_options(Config) ->
].
headers(#{headers := Headers}) ->
- maps:to_list(Headers).
+ string_headers(maps:to_list(Headers));
+headers(#{}) ->
+ [].
aws_config(#{
scheme := Scheme,
host := Host,
port := Port,
- headers := Headers,
access_key_id := AccessKeyId,
secret_access_key := SecretAccessKey,
http_pool := HttpPool,
@@ -187,23 +201,29 @@ aws_config(#{
s3_host = Host,
s3_port = Port,
s3_bucket_access_method = path,
+ s3_bucket_after_host = true,
access_key_id = AccessKeyId,
secret_access_key = SecretAccessKey,
- http_client = request_fun(Headers, HttpPool),
+ http_client = request_fun(HttpPool),
timeout = Timeout
}.
--type http_headers() :: [{binary(), binary()}].
-type http_pool() :: term().
--spec request_fun(http_headers(), http_pool()) -> erlcloud_httpc:request_fun().
-request_fun(CustomHeaders, HttpPool) ->
+-spec request_fun(http_pool()) -> erlcloud_httpc:request_fun().
+request_fun(HttpPool) ->
fun(Url, Method, Headers, Body, Timeout, _Config) ->
with_path_and_query_only(Url, fun(PathQuery) ->
- JoinedHeaders = join_headers(Headers, CustomHeaders),
- Request = make_request(Method, PathQuery, JoinedHeaders, Body),
+ Request = make_request(Method, PathQuery, binary_headers(Headers), Body),
+ ?SLOG(warning, #{
+ msg => "s3_ehttpc_request",
+ timeout => Timeout,
+ pool => HttpPool,
+ method => Method,
+ request => Request
+ }),
ehttpc_request(HttpPool, Method, Request, Timeout)
end)
end.
@@ -211,9 +231,9 @@ request_fun(CustomHeaders, HttpPool) ->
ehttpc_request(HttpPool, Method, Request, Timeout) ->
try ehttpc:request(HttpPool, Method, Request, Timeout) of
{ok, StatusCode, RespHeaders} ->
- {ok, {{StatusCode, undefined}, string_headers(RespHeaders), undefined}};
+ {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), undefined}};
{ok, StatusCode, RespHeaders, RespBody} ->
- {ok, {{StatusCode, undefined}, string_headers(RespHeaders), RespBody}};
+ {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), RespBody}};
{error, Reason} ->
?SLOG(error, #{
msg => "s3_ehttpc_request_fail",
@@ -258,16 +278,6 @@ make_request(_Method, PathQuery, Headers, Body) ->
format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}.
-join_headers(Headers, CustomHeaders) ->
- MapHeaders = lists:foldl(
- fun({K, V}, MHeaders) ->
- maps:put(to_binary(K), V, MHeaders)
- end,
- #{},
- Headers ++ maps:to_list(CustomHeaders)
- ),
- maps:to_list(MapHeaders).
-
with_path_and_query_only(Url, Fun) ->
case string:split(Url, "//", leading) of
[_Scheme, UrlRem] ->
@@ -281,13 +291,22 @@ with_path_and_query_only(Url, Fun) ->
{error, {invalid_url, Url}}
end.
+string_headers(Headers) ->
+ [{to_list_string(K), to_list_string(V)} || {K, V} <- Headers].
+
+erlcloud_string_headers(Headers) ->
+ [{string:to_lower(K), V} || {K, V} <- string_headers(Headers)].
+
+binary_headers(Headers) ->
+ [{to_binary(K), V} || {K, V} <- Headers].
+
to_binary(Val) when is_list(Val) -> list_to_binary(Val);
to_binary(Val) when is_binary(Val) -> Val.
-string_headers(Hdrs) ->
- [{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- Hdrs].
-
to_list_string(Val) when is_binary(Val) ->
binary_to_list(Val);
to_list_string(Val) when is_list(Val) ->
Val.
+
+key(Characters) ->
+ binary_to_list(unicode:characters_to_binary(Characters)).
diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl
index 09e945edc..3c5eb8f36 100644
--- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl
+++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl
@@ -211,6 +211,7 @@ client_config(ProfileConfig, PoolName) ->
scheme => scheme(HTTPOpts),
host => maps:get(host, ProfileConfig),
port => maps:get(port, ProfileConfig),
+ url_expire_time => maps:get(url_expire_time, ProfileConfig),
headers => maps:get(headers, HTTPOpts, #{}),
acl => maps:get(acl, ProfileConfig),
bucket => maps:get(bucket, ProfileConfig),
diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl
index 5d76e7120..6db9a5886 100644
--- a/apps/emqx_s3/src/emqx_s3_schema.erl
+++ b/apps/emqx_s3/src/emqx_s3_schema.erl
@@ -63,6 +63,15 @@ fields(s3) ->
required => true
}
)},
+ {url_expire_time,
+ mk(
+ emqx_schema:duration_s(),
+ #{
+ default => "1h",
+ desc => ?DESC("url_expire_time"),
+ required => false
+ }
+ )},
{min_part_size,
mk(
emqx_schema:bytesize(),
diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl
index 4e3fe15f2..07ae5bd19 100644
--- a/apps/emqx_s3/src/emqx_s3_uploader.erl
+++ b/apps/emqx_s3/src/emqx_s3_uploader.erl
@@ -12,8 +12,13 @@
start_link/2,
write/2,
+ write/3,
+
complete/1,
- abort/1
+ complete/2,
+
+ abort/1,
+ abort/2
]).
-export([
@@ -26,14 +31,11 @@
format_status/2
]).
--export_type([opts/0, config/0]).
+-export_type([opts/0]).
-type opts() :: #{
- name := string()
-}.
-
--type config() :: #{
- min_part_size := pos_integer()
+ key := string(),
+ headers => emqx_s3_client:headers()
}.
-type data() :: #{
@@ -58,12 +60,12 @@
start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
--spec write(pid(), binary()) -> ok_or_error(term()).
-write(Pid, WriteData) when is_binary(WriteData) ->
+-spec write(pid(), iodata()) -> ok_or_error(term()).
+write(Pid, WriteData) ->
write(Pid, WriteData, infinity).
--spec write(pid(), binary(), timeout()) -> ok_or_error(term()).
-write(Pid, WriteData, Timeout) when is_binary(WriteData) ->
+-spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
+write(Pid, WriteData, Timeout) ->
gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout).
-spec complete(pid()) -> ok_or_error(term()).
@@ -88,10 +90,10 @@ abort(Pid, Timeout) ->
callback_mode() -> handle_event_function.
-init([ProfileId, #{key := Key}]) ->
+init([ProfileId, #{key := Key} = Opts]) ->
process_flag(trap_exit, true),
{ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId),
- Client = emqx_s3_client:create(ClientConfig),
+ Client = client(ClientConfig, Opts),
{ok, upload_not_started, #{
profile_id => ProfileId,
client => Client,
@@ -111,7 +113,7 @@ handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) ->
true ->
handle_write(State, From, WriteData, Data0);
false ->
- {keep_state_and_data, {reply, From, {error, {too_large, byte_size(WriteData)}}}}
+ {keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}}
end;
handle_event({call, From}, complete, upload_not_started, Data0) ->
case put_object(Data0) of
@@ -218,7 +220,6 @@ maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = D
true ->
upload_part(Data);
false ->
- % ct:print("buffer size: ~p, max part size: ~p, no upload", [BufferSize, MinPartSize]),
{ok, Data}
end.
@@ -237,7 +238,6 @@ upload_part(
) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of
{ok, ETag} ->
- % ct:print("upload part ~p, etag: ~p", [PartNumber, ETag]),
NewData = Data#{
buffer => [],
buffer_size => 0,
@@ -246,7 +246,6 @@ upload_part(
},
{ok, NewData};
{error, _} = Error ->
- % ct:print("upload part ~p failed: ~p", [PartNumber, Error]),
Error
end.
@@ -260,7 +259,11 @@ complete_upload(
) ->
case upload_part(Data0) of
{ok, #{etags := ETags} = Data1} ->
- case emqx_s3_client:complete_multipart(Client, Key, UploadId, lists:reverse(ETags)) of
+ case
+ emqx_s3_client:complete_multipart(
+ Client, Key, UploadId, lists:reverse(ETags)
+ )
+ of
ok ->
{ok, Data1};
{error, _} = Error ->
@@ -300,11 +303,11 @@ put_object(
Error
end.
--spec append_buffer(data(), binary()) -> data().
+-spec append_buffer(data(), iodata()) -> data().
append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
Data#{
buffer => [WriteData | Buffer],
- buffer_size => BufferSize + byte_size(WriteData)
+ buffer_size => BufferSize + iolist_size(WriteData)
}.
-compile({inline, [wrap/1, unwrap/1]}).
@@ -315,4 +318,8 @@ unwrap(WrappedData) ->
WrappedData().
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
- BufferSize + byte_size(WriteData) =< MaxPartSize.
+ BufferSize + iolist_size(WriteData) =< MaxPartSize.
+
+client(Config, Opts) ->
+ Headers = maps:get(headers, Opts, #{}),
+ emqx_s3_client:create(Config#{headers => Headers}).
diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl
index 3d0d7bb18..f5a507653 100644
--- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl
+++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl
@@ -83,6 +83,39 @@ t_simple_put(Config) ->
ok = emqx_s3_client:put_object(Client, Key, Data).
+t_list(Config) ->
+ Key = ?config(key, Config),
+
+ Client = client(Config),
+
+ ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
+
+ {ok, List} = emqx_s3_client:list(Client, Key),
+
+ [KeyInfo] = proplists:get_value(contents, List),
+ ?assertMatch(
+ #{
+ key := Key,
+ size := 4,
+ etag := _,
+ last_modified := _
+ },
+ maps:from_list(KeyInfo)
+ ).
+
+t_url(Config) ->
+ Key = ?config(key, Config),
+
+ Client = client(Config),
+ ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
+
+ Url = emqx_s3_client:url(Client, Key),
+
+ ?assertMatch(
+ {ok, {{_StatusLine, 200, "OK"}, _Headers, "data"}},
+ httpc:request(Url)
+ ).
+
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------