From bd7250cb1392619eb8d091f62991426d3c1e4b60 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 6 Apr 2023 00:04:10 +0300 Subject: [PATCH] fix(s3): fix hash pool type --- .../src/emqx_ft_storage_exporter_s3.erl | 44 ++++++---- apps/emqx_s3/src/emqx_s3_client.erl | 86 ++++++++++++------- apps/emqx_s3/src/emqx_s3_profile_conf.erl | 4 + apps/emqx_s3/test/emqx_s3_client_SUITE.erl | 36 ++++++-- 4 files changed, 117 insertions(+), 53 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index 24c3c52e5..63df71179 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -137,10 +137,11 @@ s3_headers({ClientId, FileId}, Filemeta) -> list(Client, Options) -> case list_key_info(Client, Options) of {ok, KeyInfos} -> - {ok, - lists:map( - fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos - )}; + MaybeExportInfos = lists:map( + fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos + ), + ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos], + {ok, ExportInfos}; {error, _Reason} = Error -> Error end. @@ -170,14 +171,18 @@ next_marker(KeyInfos) -> key_info_to_exportinfo(Client, KeyInfo, _Options) -> Key = proplists:get_value(key, KeyInfo), - {Transfer, Name} = parse_transfer_and_name(Key), - #{ - transfer => Transfer, - name => unicode:characters_to_binary(Name), - uri => emqx_s3_client:uri(Client, Key), - timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)), - size => proplists:get_value(size, KeyInfo) - }. + case parse_transfer_and_name(Key) of + {ok, {Transfer, Name}} -> + {ok, #{ + transfer => Transfer, + name => unicode:characters_to_binary(Name), + uri => emqx_s3_client:uri(Client, Key), + timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)), + size => proplists:get_value(size, KeyInfo) + }}; + {error, _Reason} = Error -> + Error + end. -define(EPOCH_START, 62167219200). @@ -185,8 +190,13 @@ datetime_to_epoch_second(DateTime) -> calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. parse_transfer_and_name(Key) -> - [ClientId, FileId, Name] = string:split(Key, "/", all), - Transfer = { - emqx_ft_fs_util:unescape_filename(ClientId), emqx_ft_fs_util:unescape_filename(FileId) - }, - {Transfer, Name}. + case string:split(Key, "/", all) of + [ClientId, FileId, Name] -> + Transfer = { + emqx_ft_fs_util:unescape_filename(ClientId), + emqx_ft_fs_util:unescape_filename(FileId) + }, + {ok, {Transfer, Name}}; + _ -> + {error, invalid_key} + end. diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index b6578a7e1..bfe87a602 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -38,6 +38,8 @@ -type part_number() :: non_neg_integer(). -type upload_id() :: string(). -type etag() :: string(). +-type http_pool() :: ehttpc:pool_name(). +-type pool_type() :: random | hash. -type upload_options() :: list({acl, emqx_s3:acl()}). -opaque client() :: #{ @@ -59,6 +61,7 @@ access_key_id := string() | undefined, secret_access_key := string() | undefined, http_pool := ehttpc:pool_name(), + pool_type := pool_type(), request_timeout := timeout() | undefined, max_retries := non_neg_integer() | undefined }. @@ -79,7 +82,8 @@ create(Config) -> upload_options => upload_options(Config), bucket => maps:get(bucket, Config), url_expire_time => maps:get(url_expire_time, Config), - headers => headers(Config) + headers => headers(Config), + pool_type => maps:get(pool_type, Config) }. -spec put_object(client(), key(), iodata()) -> ok_or_error(term()). @@ -211,6 +215,7 @@ aws_config(#{ access_key_id := AccessKeyId, secret_access_key := SecretAccessKey, http_pool := HttpPool, + pool_type := PoolType, request_timeout := Timeout, max_retries := MaxRetries }) -> @@ -224,7 +229,9 @@ aws_config(#{ access_key_id = AccessKeyId, secret_access_key = SecretAccessKey, - http_client = request_fun(HttpPool, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)), + http_client = request_fun( + HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) + ), %% This value will be transparently passed to ehttpc timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT), @@ -232,55 +239,63 @@ aws_config(#{ retry_num = 1 }. --type http_pool() :: term(). - --spec request_fun(http_pool(), non_neg_integer()) -> erlcloud_httpc:request_fun(). -request_fun(HttpPool, MaxRetries) -> +-spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun(). +request_fun(HttpPool, PoolType, MaxRetries) -> fun(Url, Method, Headers, Body, Timeout, _Config) -> with_path_and_query_only(Url, fun(PathQuery) -> Request = make_request( Method, PathQuery, headers_erlcloud_request_to_ehttpc(Headers), Body ), - ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) + case pick_worker_safe(HttpPool, PoolType) of + {ok, Worker} -> + ehttpc_request(Worker, Method, Request, Timeout, MaxRetries); + {error, Reason} -> + ?SLOG(error, #{ + msg => "s3_request_fun_fail", + reason => Reason, + http_pool => HttpPool, + pool_type => PoolType, + method => Method, + request => Request, + timeout => Timeout, + max_retries => MaxRetries + }), + {error, Reason} + end end) end. ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) -> - ?SLOG(debug, #{ - msg => "s3_ehttpc_request", - timeout => Timeout, - pool => HttpPool, - method => Method, - max_retries => MaxRetries, - request => format_request(Request) - }), - try ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) of - {ok, StatusCode, RespHeaders} -> - ?SLOG(debug, #{ - msg => "s3_ehttpc_request_ok", - status_code => StatusCode, - headers => RespHeaders - }), - {ok, { - {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined - }}; - {ok, StatusCode, RespHeaders, RespBody} -> + try timer:tc(fun() -> ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) end) of + {Time, {ok, StatusCode, RespHeaders}} -> ?SLOG(debug, #{ msg => "s3_ehttpc_request_ok", status_code => StatusCode, headers => RespHeaders, - body => RespBody + time => Time + }), + {ok, { + {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined + }}; + {Time, {ok, StatusCode, RespHeaders, RespBody}} -> + ?SLOG(debug, #{ + msg => "s3_ehttpc_request_ok", + status_code => StatusCode, + headers => RespHeaders, + body => RespBody, + time => Time }), {ok, { {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody }}; - {error, Reason} -> + {Time, {error, Reason}} -> ?SLOG(error, #{ msg => "s3_ehttpc_request_fail", reason => Reason, timeout => Timeout, pool => HttpPool, - method => Method + method => Method, + time => Time }), {error, Reason} catch @@ -304,6 +319,19 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) -> {error, Reason} end. +pick_worker_safe(HttpPool, PoolType) -> + try + {ok, pick_worker(HttpPool, PoolType)} + catch + error:badarg -> + {error, no_ehttpc_pool} + end. + +pick_worker(HttpPool, random) -> + ehttpc_pool:pick_worker(HttpPool); +pick_worker(HttpPool, hash) -> + ehttpc_pool:pick_worker(HttpPool, self()). + -define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)). -define(NEEDS_NO_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)). diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 13efb9d74..e11dac530 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -204,6 +204,7 @@ client_config(ProfileConfig, PoolName) -> secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined), request_timeout => maps:get(request_timeout, HTTPOpts, undefined), max_retries => maps:get(max_retries, HTTPOpts, undefined), + pool_type => maps:get(pool_type, HTTPOpts, random), http_pool => PoolName }. @@ -371,9 +372,12 @@ stop_http_pool(ProfileId, PoolName) -> ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}). do_start_http_pool(PoolName, HttpConfig) -> + ?SLOG(warning, #{msg => "s3_start_http_pool", pool_name => PoolName, config => HttpConfig}), case ehttpc_sup:start_pool(PoolName, HttpConfig) of {ok, _} -> + ?SLOG(warning, #{msg => "s3_start_http_pool_success", pool_name => PoolName}), ok; {error, _} = Error -> + ?SLOG(error, #{msg => "s3_start_http_pool_fail", pool_name => PoolName, error => Error}), Error end. diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index ec7d5ebcf..4af803c67 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -20,9 +20,15 @@ all() -> groups() -> AllCases = emqx_common_test_helpers:all(?MODULE), + PoolGroups = [ + {group, pool_random}, + {group, pool_hash} + ], [ - {tcp, [], AllCases}, - {tls, [], AllCases} + {tcp, [], PoolGroups}, + {tls, [], PoolGroups}, + {pool_random, [], AllCases}, + {pool_hash, [], AllCases} ]. init_per_suite(Config) -> @@ -32,8 +38,17 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok = application:stop(emqx_s3). -init_per_group(ConnType, Config) -> - [{conn_type, ConnType} | Config]. +init_per_group(ConnTypeGroup, Config) when ConnTypeGroup =:= tcp; ConnTypeGroup =:= tls -> + [{conn_type, ConnTypeGroup} | Config]; +init_per_group(PoolTypeGroup, Config) when + PoolTypeGroup =:= pool_random; PoolTypeGroup =:= pool_hash +-> + PoolType = + case PoolTypeGroup of + pool_random -> random; + pool_hash -> hash + end, + [{pool_type, PoolType} | Config]. end_per_group(_ConnType, _Config) -> ok. @@ -127,11 +142,18 @@ client(Config) -> emqx_s3_client:create(ClientConfig). profile_config(Config) -> - maps:put( + ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)), + ProfileConfig1 = maps:put( bucket, ?config(bucket, Config), - emqx_s3_test_helpers:base_config(?config(conn_type, Config)) - ). + ProfileConfig0 + ), + ProfileConfig2 = emqx_map_lib:deep_put( + [transport_options, pool_type], + ProfileConfig1, + ?config(pool_type, Config) + ), + ProfileConfig2. data(Size) -> iolist_to_binary([$a || _ <- lists:seq(1, Size)]).