emqx/apps/emqx_s3/src/emqx_s3.erl

130 lines
3.8 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_s3).
-include_lib("emqx/include/types.hrl").
-export([
start_profile/2,
stop_profile/1,
update_profile/2,
start_uploader/2,
with_client/2
]).
-export([
pre_config_update/3,
post_config_update/3
]).
-export_type([
profile_id/0,
profile_config/0,
acl/0
]).
-type profile_id() :: atom() | binary().
-type acl() ::
private
| public_read
| public_read_write
| authenticated_read
| bucket_owner_read
| bucket_owner_full_control.
-type transport_options() :: #{
headers => map(),
connect_timeout => pos_integer(),
enable_pipelining => pos_integer(),
max_retries => pos_integer(),
pool_size => pos_integer(),
pool_type => atom(),
ipv6_probe => boolean(),
ssl => map()
}.
-type profile_config() :: #{
bucket := string(),
access_key_id => string(),
secret_access_key => emqx_secret:t(string()),
host := string(),
port := pos_integer(),
url_expire_time := pos_integer(),
acl => acl(),
min_part_size => pos_integer(),
transport_options => transport_options()
}.
-define(IS_PROFILE_ID(ProfileId), (is_atom(ProfileId) orelse is_binary(ProfileId))).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()).
start_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of
{ok, _} ->
ok;
{error, _} = Error ->
Error
end.
-spec stop_profile(profile_id()) -> ok_or_error(term()).
stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_sup:stop_profile(ProfileId).
-spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()).
update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig).
-spec start_uploader(profile_id(), emqx_s3_uploader:opts()) ->
supervisor:start_ret() | {error, profile_not_found}.
start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts).
-spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) ->
{error, profile_not_found} | Result.
with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) ->
case emqx_s3_profile_conf:checkout_config(ProfileId) of
{ok, ClientConfig, _UploadConfig} ->
try
Fun(emqx_s3_client:create(ClientConfig))
after
emqx_s3_profile_conf:checkin_config(ProfileId)
end;
{error, _} = Error ->
Error
end.
%%
-spec pre_config_update(
profile_id(), maybe(emqx_config:raw_config()), maybe(emqx_config:raw_config())
) ->
{ok, maybe(profile_config())} | {error, term()}.
pre_config_update(ProfileId, NewConfig = #{<<"transport_options">> := TransportOpts}, _OldConfig) ->
case emqx_connector_ssl:convert_certs(mk_certs_dir(ProfileId), TransportOpts) of
{ok, TransportOptsConv} ->
{ok, NewConfig#{<<"transport_options">> := TransportOptsConv}};
{error, Reason} ->
{error, Reason}
end;
pre_config_update(_ProfileId, NewConfig, _OldConfig) ->
{ok, NewConfig}.
-spec post_config_update(
profile_id(),
maybe(emqx_config:config()),
maybe(emqx_config:config())
) ->
ok.
post_config_update(_ProfileId, _NewConfig, _OldConfig) ->
ok.
mk_certs_dir(ProfileId) ->
filename:join([s3, profiles, ProfileId]).