From 43f973742033fa6eb14e6bdf278facc27f13731c Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 3 Apr 2023 20:35:47 +0300 Subject: [PATCH] feat(ft-s3): integrate list API --- apps/emqx_ft/src/emqx_ft.erl | 1 - apps/emqx_ft/src/emqx_ft_api.erl | 8 +- .../emqx_ft_storage_exporter_s3.erl | 102 ---------- .../emqx_ft_storage_exporter_fs.erl | 0 .../emqx_ft_storage_exporter_fs_api.erl | 0 .../emqx_ft_storage_exporter_fs_proxy.erl | 0 .../src/emqx_ft_storage_exporter_s3.erl | 188 ++++++++++++++++++ apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf | 12 +- apps/emqx_s3/src/emqx_s3.erl | 3 + apps/emqx_s3/src/emqx_s3_client.erl | 89 +++++---- apps/emqx_s3/src/emqx_s3_profile_conf.erl | 1 + apps/emqx_s3/src/emqx_s3_schema.erl | 9 + apps/emqx_s3/src/emqx_s3_uploader.erl | 49 +++-- apps/emqx_s3/test/emqx_s3_client_SUITE.erl | 33 +++ 14 files changed, 333 insertions(+), 162 deletions(-) delete mode 100644 apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl rename apps/emqx_ft/src/{emqx_ft_storage_exporter => }/emqx_ft_storage_exporter_fs.erl (100%) rename apps/emqx_ft/src/{emqx_ft_storage_exporter => }/emqx_ft_storage_exporter_fs_api.erl (100%) rename apps/emqx_ft/src/{emqx_ft_storage_exporter => }/emqx_ft_storage_exporter_fs_proxy.erl (100%) create mode 100644 apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 064b6e066..c5522cdbd 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -118,7 +118,6 @@ decode_filemeta(Map) when is_map(Map) -> end. encode_filemeta(Meta = #{}) -> - % TODO: Looks like this should be hocon's responsibility. Schema = emqx_ft_schema:schema(filemeta), hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}). diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index a667454b6..390c10557 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -69,6 +69,7 @@ schema("/file_transfer/files") -> '/file_transfer/files'(get, #{}) -> case emqx_ft_storage:files() of {ok, Files} -> + ?SLOG(warning, #{msg => "files", files => Files}), {200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}}; {error, _} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} @@ -94,7 +95,7 @@ format_file_info( } ) -> Res = #{ - name => iolist_to_binary(Name), + name => format_name(Name), size => Size, timestamp => format_timestamp(Timestamp), clientid => ClientId, @@ -110,3 +111,8 @@ format_file_info( format_timestamp(Timestamp) -> iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). + +format_name(NameBin) when is_binary(NameBin) -> + NameBin; +format_name(Name) when is_list(Name) -> + iolist_to_binary(Name). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl deleted file mode 100644 index 977ff576a..000000000 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_s3.erl +++ /dev/null @@ -1,102 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_storage_exporter_s3). - --include_lib("emqx/include/logger.hrl"). - -%% Exporter API --export([start_export/3]). --export([write/2]). --export([complete/1]). --export([discard/1]). --export([list/1]). - --export([ - start/1, - stop/1, - update/2 -]). - --type options() :: emqx_s3:profile_config(). --type transfer() :: emqx_ft:transfer(). --type filemeta() :: emqx_ft:filemeta(). --type exportinfo() :: #{ - transfer := transfer(), - name := file:name(), - uri := uri_string:uri_string(), - timestamp := emqx_datetime:epoch_second(), - size := _Bytes :: non_neg_integer(), - meta => filemeta() -}. - --type export_st() :: #{ - pid := pid(), - meta := filemeta() -}. - -%%-------------------------------------------------------------------- -%% Exporter behaviour -%%-------------------------------------------------------------------- - --spec start_export(options(), transfer(), filemeta()) -> - {ok, export_st()} | {error, term()}. -start_export(_Options, _Transfer, Filemeta = #{name := Filename}) -> - Pid = spawn(fun() -> Filename end), - #{meta => Filemeta, pid => Pid}. - --spec write(export_st(), iodata()) -> - {ok, export_st()} | {error, term()}. -write(ExportSt, _IoData) -> - {ok, ExportSt}. - --spec complete(export_st()) -> - ok | {error, term()}. -complete(_ExportSt) -> - ok. - --spec discard(export_st()) -> - ok. -discard(_ExportSt) -> - ok. - --spec list(options()) -> - {ok, [exportinfo()]} | {error, term()}. -list(_Options) -> - {ok, []}. - -%%-------------------------------------------------------------------- -%% Exporter behaviour (lifecycle) -%%-------------------------------------------------------------------- - --spec start(options()) -> ok | {error, term()}. -start(Options) -> - emqx_s3:start_profile(s3_profile_id(), Options). - --spec stop(options()) -> ok. -stop(_Options) -> - ok = emqx_s3:stop_profile(s3_profile_id()). - --spec update(options(), options()) -> ok. -update(_OldOptions, NewOptions) -> - emqx_s3:update_profile(s3_profile_id(), NewOptions). - -%%-------------------------------------------------------------------- -%% Internal functions -%% ------------------------------------------------------------------- - -s3_profile_id() -> - atom_to_binary(?MODULE). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl similarity index 100% rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs.erl rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl similarity index 100% rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_api.erl rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl similarity index 100% rename from apps/emqx_ft/src/emqx_ft_storage_exporter/emqx_ft_storage_exporter_fs_proxy.erl rename to apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl new file mode 100644 index 000000000..1c97520e3 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -0,0 +1,188 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_s3). + +-include_lib("emqx/include/logger.hrl"). + +%% Exporter API +-export([start_export/3]). +-export([write/2]). +-export([complete/2]). +-export([discard/1]). +-export([list/1]). + +-export([ + start/1, + stop/1, + update/2 +]). + +-type options() :: emqx_s3:profile_config(). +-type transfer() :: emqx_ft:transfer(). +-type filemeta() :: emqx_ft:filemeta(). +-type exportinfo() :: #{ + transfer := transfer(), + name := file:name(), + uri := uri_string:uri_string(), + timestamp := emqx_datetime:epoch_second(), + size := _Bytes :: non_neg_integer(), + filemeta => filemeta() +}. + +-type export_st() :: #{ + pid := pid(), + filemeta := filemeta(), + transfer := transfer() +}. + +-define(S3_PROFILE_ID, <<"emqx_ft_storage_exporter_s3">>). +-define(FILEMETA_VSN, <<"1">>). +-define(S3_LIST_LIMIT, 500). + +%%-------------------------------------------------------------------- +%% Exporter behaviour +%%-------------------------------------------------------------------- + +-spec start_export(options(), transfer(), filemeta()) -> + {ok, export_st()} | {error, term()}. +start_export(_Options, Transfer, Filemeta) -> + Options = #{ + key => s3_key(Transfer, Filemeta), + headers => s3_headers(Transfer, Filemeta) + }, + case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of + {ok, Pid} -> + {ok, #{filemeta => Filemeta, pid => Pid}}; + {error, _Reason} = Error -> + Error + end. + +-spec write(export_st(), iodata()) -> + {ok, export_st()} | {error, term()}. +write(#{pid := Pid} = ExportSt, IoData) -> + case emqx_s3_uploader:write(Pid, IoData) of + ok -> + {ok, ExportSt}; + {error, _Reason} = Error -> + Error + end. + +-spec complete(export_st(), emqx_ft:checksum()) -> + ok | {error, term()}. +complete(#{pid := Pid} = _ExportSt, _Checksum) -> + emqx_s3_uploader:complete(Pid). + +-spec discard(export_st()) -> + ok. +discard(#{pid := Pid} = _ExportSt) -> + emqx_s3_uploader:abort(Pid). + +-spec list(options()) -> + {ok, [exportinfo()]} | {error, term()}. +list(Options) -> + emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end). + +%%-------------------------------------------------------------------- +%% Exporter behaviour (lifecycle) +%%-------------------------------------------------------------------- + +-spec start(options()) -> ok | {error, term()}. +start(Options) -> + emqx_s3:start_profile(?S3_PROFILE_ID, Options). + +-spec stop(options()) -> ok. +stop(_Options) -> + ok = emqx_s3:stop_profile(?S3_PROFILE_ID). + +-spec update(options(), options()) -> ok. +update(_OldOptions, NewOptions) -> + emqx_s3:update_profile(?S3_PROFILE_ID, NewOptions). + +%%-------------------------------------------------------------------- +%% Internal functions +%% ------------------------------------------------------------------- + +s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) -> + filename:join([ + emqx_ft_fs_util:escape_filename(ClientId), + emqx_ft_fs_util:escape_filename(FileId), + Filename + ]). + +s3_headers({ClientId, FileId}, Filemeta) -> + #{ + %% The ClientID MUST be a UTF-8 Encoded String + <<"x-amz-meta-clientid">> => ClientId, + %% It [Topic Name] MUST be a UTF-8 Encoded String + <<"x-amz-meta-fileid">> => FileId, + <<"x-amz-meta-filemeta">> => emqx_json:encode(emqx_ft:encode_filemeta(Filemeta)), + <<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN + }. + +list(Client, Options) -> + case list_key_info(Client, Options) of + {ok, KeyInfos} -> + {ok, + lists:map( + fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos + )}; + {error, _Reason} = Error -> + Error + end. + +list_key_info(Client, Options) -> + list_key_info(Client, Options, _Marker = [], _Acc = []). + +list_key_info(Client, Options, Marker, Acc) -> + ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker, + case emqx_s3_client:list(Client, ListOptions) of + {ok, Result} -> + ?SLOG(warning, #{msg => "list_key_info", result => Result}), + KeyInfos = proplists:get_value(contents, Result, []), + case proplists:get_value(is_truncated, Result, false) of + true -> + NewMarker = [{marker, proplists:get_value(next_marker, Result)}], + list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]); + false -> + {ok, lists:append(lists:reverse([KeyInfos | Acc]))} + end; + {error, _Reason} = Error -> + Error + end. + +key_info_to_exportinfo(Client, KeyInfo, _Options) -> + Key = proplists:get_value(key, KeyInfo), + {Transfer, Name} = parse_transfer_and_name(Key), + #{ + transfer => Transfer, + name => unicode:characters_to_binary(Name), + uri => emqx_s3_client:uri(Client, Key), + timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)), + size => proplists:get_value(size, KeyInfo) + }. + +-define(EPOCH_START, 62167219200). + +datetime_to_epoch_second(DateTime) -> + calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. + +parse_transfer_and_name(Key) -> + [ClientId, FileId, Name] = string:split(Key, "/", all), + Transfer = { + emqx_ft_fs_util:unescape_filename(ClientId), emqx_ft_fs_util:unescape_filename(FileId) + }, + {Transfer, Name}. diff --git a/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf index 4e0870bae..99004d62a 100644 --- a/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf +++ b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf @@ -39,9 +39,17 @@ emqx_s3_schema { en: "S3 endpoint port" } } + url_expire_time { + desc { + en: "The time in seconds for which the signed URLs to the S3 objects are valid." + } + label { + en: "Signed URL expiration time" + } + } min_part_size { desc { - en: """The minimum part size for multipart uploads. + en: """The minimum part size for multipart uploads.
Uploaded data will be accumulated in memory until this size is reached.""" } label { @@ -50,7 +58,7 @@ Uploaded data will be accumulated in memory until this size is reached.""" } max_part_size { desc { - en: """The maximum part size for multipart uploads. + en: """The maximum part size for multipart uploads.
S3 uploader won't try to upload parts larger than this size.""" } label { diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 7c78de979..8798b608d 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -41,8 +41,11 @@ -type profile_config() :: #{ bucket := string(), + access_key_id => string(), + secret_access_key => string(), host := string(), port := pos_integer(), + url_expire_time := pos_integer(), acl => acl(), min_part_size => pos_integer(), transport_options => transport_options() diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index 01d677922..a4d470a97 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -8,9 +8,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("erlcloud/include/erlcloud_aws.hrl"). --compile(nowarn_export_all). --compile(export_all). - -export([ create/1, @@ -21,11 +18,16 @@ complete_multipart/4, abort_multipart/3, list/2, + uri/2, - format/1 + format/1, + format_request/1 ]). --export_type([client/0]). +-export_type([ + client/0, + headers/0 +]). -type s3_bucket_acl() :: private @@ -35,7 +37,7 @@ | bucket_owner_read | bucket_owner_full_control. --type headers() :: #{binary() => binary()}. +-type headers() :: #{binary() | string() => binary() | string()}. -type key() :: string(). -type part_number() :: non_neg_integer(). @@ -58,6 +60,7 @@ bucket := string(), headers := headers(), acl := s3_bucket_acl(), + url_expire_time := pos_integer(), access_key_id := string() | undefined, secret_access_key := string() | undefined, http_pool := ecpool:pool_name(), @@ -76,6 +79,7 @@ create(Config) -> aws_config => aws_config(Config), upload_options => upload_options(Config), bucket => maps:get(bucket, Config), + url_expire_time => maps:get(url_expire_time, Config), headers => headers(Config) }. @@ -85,7 +89,7 @@ put_object( Key, Value ) -> - try erlcloud_s3:put_object(Bucket, Key, Value, Options, Headers, AwsConfig) of + try erlcloud_s3:put_object(Bucket, key(Key), Value, Options, Headers, AwsConfig) of Props when is_list(Props) -> ok catch @@ -99,7 +103,7 @@ start_multipart( #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, Key ) -> - case erlcloud_s3:start_multipart(Bucket, Key, Options, Headers, AwsConfig) of + case erlcloud_s3:start_multipart(Bucket, key(Key), Options, Headers, AwsConfig) of {ok, Props} -> {ok, proplists:get_value(uploadId, Props)}; {error, Reason} -> @@ -116,7 +120,9 @@ upload_part( PartNumber, Value ) -> - case erlcloud_s3:upload_part(Bucket, Key, UploadId, PartNumber, Value, Headers, AwsConfig) of + case + erlcloud_s3:upload_part(Bucket, key(Key), UploadId, PartNumber, Value, Headers, AwsConfig) + of {ok, Props} -> {ok, proplists:get_value(etag, Props)}; {error, Reason} -> @@ -126,9 +132,12 @@ upload_part( -spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()). complete_multipart( - #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId, ETags + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + Key, + UploadId, + ETags ) -> - case erlcloud_s3:complete_multipart(Bucket, Key, UploadId, ETags, Headers, AwsConfig) of + case erlcloud_s3:complete_multipart(Bucket, key(Key), UploadId, ETags, Headers, AwsConfig) of ok -> ok; {error, Reason} -> @@ -138,7 +147,7 @@ complete_multipart( -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> - case erlcloud_s3:abort_multipart(Bucket, Key, UploadId, [], Headers, AwsConfig) of + case erlcloud_s3:abort_multipart(Bucket, key(Key), UploadId, [], Headers, AwsConfig) of ok -> ok; {error, Reason} -> @@ -156,6 +165,10 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) -> {error, Reason} end. +-spec uri(client(), key()) -> iodata(). +uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) -> + erlcloud_s3:make_get_url(ExpireTime, Bucket, key(Key), AwsConfig). + -spec format(client()) -> term(). format(#{aws_config := AwsConfig} = Client) -> Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}. @@ -170,13 +183,14 @@ upload_options(Config) -> ]. headers(#{headers := Headers}) -> - maps:to_list(Headers). + string_headers(maps:to_list(Headers)); +headers(#{}) -> + []. aws_config(#{ scheme := Scheme, host := Host, port := Port, - headers := Headers, access_key_id := AccessKeyId, secret_access_key := SecretAccessKey, http_pool := HttpPool, @@ -187,23 +201,29 @@ aws_config(#{ s3_host = Host, s3_port = Port, s3_bucket_access_method = path, + s3_bucket_after_host = true, access_key_id = AccessKeyId, secret_access_key = SecretAccessKey, - http_client = request_fun(Headers, HttpPool), + http_client = request_fun(HttpPool), timeout = Timeout }. --type http_headers() :: [{binary(), binary()}]. -type http_pool() :: term(). --spec request_fun(http_headers(), http_pool()) -> erlcloud_httpc:request_fun(). -request_fun(CustomHeaders, HttpPool) -> +-spec request_fun(http_pool()) -> erlcloud_httpc:request_fun(). +request_fun(HttpPool) -> fun(Url, Method, Headers, Body, Timeout, _Config) -> with_path_and_query_only(Url, fun(PathQuery) -> - JoinedHeaders = join_headers(Headers, CustomHeaders), - Request = make_request(Method, PathQuery, JoinedHeaders, Body), + Request = make_request(Method, PathQuery, binary_headers(Headers), Body), + ?SLOG(warning, #{ + msg => "s3_ehttpc_request", + timeout => Timeout, + pool => HttpPool, + method => Method, + request => Request + }), ehttpc_request(HttpPool, Method, Request, Timeout) end) end. @@ -211,9 +231,9 @@ request_fun(CustomHeaders, HttpPool) -> ehttpc_request(HttpPool, Method, Request, Timeout) -> try ehttpc:request(HttpPool, Method, Request, Timeout) of {ok, StatusCode, RespHeaders} -> - {ok, {{StatusCode, undefined}, string_headers(RespHeaders), undefined}}; + {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), undefined}}; {ok, StatusCode, RespHeaders, RespBody} -> - {ok, {{StatusCode, undefined}, string_headers(RespHeaders), RespBody}}; + {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), RespBody}}; {error, Reason} -> ?SLOG(error, #{ msg => "s3_ehttpc_request_fail", @@ -258,16 +278,6 @@ make_request(_Method, PathQuery, Headers, Body) -> format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}. -join_headers(Headers, CustomHeaders) -> - MapHeaders = lists:foldl( - fun({K, V}, MHeaders) -> - maps:put(to_binary(K), V, MHeaders) - end, - #{}, - Headers ++ maps:to_list(CustomHeaders) - ), - maps:to_list(MapHeaders). - with_path_and_query_only(Url, Fun) -> case string:split(Url, "//", leading) of [_Scheme, UrlRem] -> @@ -281,13 +291,22 @@ with_path_and_query_only(Url, Fun) -> {error, {invalid_url, Url}} end. +string_headers(Headers) -> + [{to_list_string(K), to_list_string(V)} || {K, V} <- Headers]. + +erlcloud_string_headers(Headers) -> + [{string:to_lower(K), V} || {K, V} <- string_headers(Headers)]. + +binary_headers(Headers) -> + [{to_binary(K), V} || {K, V} <- Headers]. + to_binary(Val) when is_list(Val) -> list_to_binary(Val); to_binary(Val) when is_binary(Val) -> Val. -string_headers(Hdrs) -> - [{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- Hdrs]. - to_list_string(Val) when is_binary(Val) -> binary_to_list(Val); to_list_string(Val) when is_list(Val) -> Val. + +key(Characters) -> + binary_to_list(unicode:characters_to_binary(Characters)). diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 09e945edc..3c5eb8f36 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -211,6 +211,7 @@ client_config(ProfileConfig, PoolName) -> scheme => scheme(HTTPOpts), host => maps:get(host, ProfileConfig), port => maps:get(port, ProfileConfig), + url_expire_time => maps:get(url_expire_time, ProfileConfig), headers => maps:get(headers, HTTPOpts, #{}), acl => maps:get(acl, ProfileConfig), bucket => maps:get(bucket, ProfileConfig), diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 5d76e7120..6db9a5886 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -63,6 +63,15 @@ fields(s3) -> required => true } )}, + {url_expire_time, + mk( + emqx_schema:duration_s(), + #{ + default => "1h", + desc => ?DESC("url_expire_time"), + required => false + } + )}, {min_part_size, mk( emqx_schema:bytesize(), diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index 4e3fe15f2..07ae5bd19 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -12,8 +12,13 @@ start_link/2, write/2, + write/3, + complete/1, - abort/1 + complete/2, + + abort/1, + abort/2 ]). -export([ @@ -26,14 +31,11 @@ format_status/2 ]). --export_type([opts/0, config/0]). +-export_type([opts/0]). -type opts() :: #{ - name := string() -}. - --type config() :: #{ - min_part_size := pos_integer() + key := string(), + headers => emqx_s3_client:headers() }. -type data() :: #{ @@ -58,12 +60,12 @@ start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> gen_statem:start_link(?MODULE, [ProfileId, Opts], []). --spec write(pid(), binary()) -> ok_or_error(term()). -write(Pid, WriteData) when is_binary(WriteData) -> +-spec write(pid(), iodata()) -> ok_or_error(term()). +write(Pid, WriteData) -> write(Pid, WriteData, infinity). --spec write(pid(), binary(), timeout()) -> ok_or_error(term()). -write(Pid, WriteData, Timeout) when is_binary(WriteData) -> +-spec write(pid(), iodata(), timeout()) -> ok_or_error(term()). +write(Pid, WriteData, Timeout) -> gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout). -spec complete(pid()) -> ok_or_error(term()). @@ -88,10 +90,10 @@ abort(Pid, Timeout) -> callback_mode() -> handle_event_function. -init([ProfileId, #{key := Key}]) -> +init([ProfileId, #{key := Key} = Opts]) -> process_flag(trap_exit, true), {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId), - Client = emqx_s3_client:create(ClientConfig), + Client = client(ClientConfig, Opts), {ok, upload_not_started, #{ profile_id => ProfileId, client => Client, @@ -111,7 +113,7 @@ handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) -> true -> handle_write(State, From, WriteData, Data0); false -> - {keep_state_and_data, {reply, From, {error, {too_large, byte_size(WriteData)}}}} + {keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}} end; handle_event({call, From}, complete, upload_not_started, Data0) -> case put_object(Data0) of @@ -218,7 +220,6 @@ maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = D true -> upload_part(Data); false -> - % ct:print("buffer size: ~p, max part size: ~p, no upload", [BufferSize, MinPartSize]), {ok, Data} end. @@ -237,7 +238,6 @@ upload_part( ) -> case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of {ok, ETag} -> - % ct:print("upload part ~p, etag: ~p", [PartNumber, ETag]), NewData = Data#{ buffer => [], buffer_size => 0, @@ -246,7 +246,6 @@ upload_part( }, {ok, NewData}; {error, _} = Error -> - % ct:print("upload part ~p failed: ~p", [PartNumber, Error]), Error end. @@ -260,7 +259,11 @@ complete_upload( ) -> case upload_part(Data0) of {ok, #{etags := ETags} = Data1} -> - case emqx_s3_client:complete_multipart(Client, Key, UploadId, lists:reverse(ETags)) of + case + emqx_s3_client:complete_multipart( + Client, Key, UploadId, lists:reverse(ETags) + ) + of ok -> {ok, Data1}; {error, _} = Error -> @@ -300,11 +303,11 @@ put_object( Error end. --spec append_buffer(data(), binary()) -> data(). +-spec append_buffer(data(), iodata()) -> data(). append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> Data#{ buffer => [WriteData | Buffer], - buffer_size => BufferSize + byte_size(WriteData) + buffer_size => BufferSize + iolist_size(WriteData) }. -compile({inline, [wrap/1, unwrap/1]}). @@ -315,4 +318,8 @@ unwrap(WrappedData) -> WrappedData(). is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> - BufferSize + byte_size(WriteData) =< MaxPartSize. + BufferSize + iolist_size(WriteData) =< MaxPartSize. + +client(Config, Opts) -> + Headers = maps:get(headers, Opts, #{}), + emqx_s3_client:create(Config#{headers => Headers}). diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index 3d0d7bb18..f5a507653 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -83,6 +83,39 @@ t_simple_put(Config) -> ok = emqx_s3_client:put_object(Client, Key, Data). +t_list(Config) -> + Key = ?config(key, Config), + + Client = client(Config), + + ok = emqx_s3_client:put_object(Client, Key, <<"data">>), + + {ok, List} = emqx_s3_client:list(Client, Key), + + [KeyInfo] = proplists:get_value(contents, List), + ?assertMatch( + #{ + key := Key, + size := 4, + etag := _, + last_modified := _ + }, + maps:from_list(KeyInfo) + ). + +t_url(Config) -> + Key = ?config(key, Config), + + Client = client(Config), + ok = emqx_s3_client:put_object(Client, Key, <<"data">>), + + Url = emqx_s3_client:url(Client, Key), + + ?assertMatch( + {ok, {{_StatusLine, 200, "OK"}, _Headers, "data"}}, + httpc:request(Url) + ). + %%-------------------------------------------------------------------- %% Helpers %%--------------------------------------------------------------------