feat: save ssl cert files for data bridges
This commit is contained in:
parent
e87fb3d9f1
commit
19630e9a99
|
@ -1954,7 +1954,7 @@ common_ssl_opts_schema(Defaults) ->
|
|||
)},
|
||||
{"cacertfile",
|
||||
sc(
|
||||
string(),
|
||||
binary(),
|
||||
#{
|
||||
default => D("cacertfile"),
|
||||
required => false,
|
||||
|
@ -1970,7 +1970,7 @@ common_ssl_opts_schema(Defaults) ->
|
|||
)},
|
||||
{"certfile",
|
||||
sc(
|
||||
string(),
|
||||
binary(),
|
||||
#{
|
||||
default => D("certfile"),
|
||||
required => false,
|
||||
|
@ -1985,7 +1985,7 @@ common_ssl_opts_schema(Defaults) ->
|
|||
)},
|
||||
{"keyfile",
|
||||
sc(
|
||||
string(),
|
||||
binary(),
|
||||
#{
|
||||
default => D("keyfile"),
|
||||
required => false,
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
ensure_ssl_files/2,
|
||||
delete_ssl_files/3,
|
||||
drop_invalid_certs/1,
|
||||
is_valid_pem_file/1
|
||||
is_valid_pem_file/1,
|
||||
is_pem/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -281,8 +282,10 @@ ensure_ssl_files(_Dir, undefined, _DryRun) ->
|
|||
{ok, undefined};
|
||||
ensure_ssl_files(_Dir, #{<<"enable">> := False} = Opts, _DryRun) when ?IS_FALSE(False) ->
|
||||
{ok, Opts};
|
||||
ensure_ssl_files(_Dir, #{enable := False} = Opts, _DryRun) when ?IS_FALSE(False) ->
|
||||
{ok, Opts};
|
||||
ensure_ssl_files(Dir, Opts, DryRun) ->
|
||||
ensure_ssl_files(Dir, Opts, ?SSL_FILE_OPT_NAMES, DryRun).
|
||||
ensure_ssl_files(Dir, Opts, ?SSL_FILE_OPT_NAMES ++ ?SSL_FILE_OPT_NAMES_A, DryRun).
|
||||
|
||||
ensure_ssl_files(_Dir, Opts, [], _DryRun) ->
|
||||
{ok, Opts};
|
||||
|
@ -306,8 +309,11 @@ delete_ssl_files(Dir, NewOpts0, OldOpts0) ->
|
|||
end,
|
||||
lists:foreach(
|
||||
fun(Key) -> delete_old_file(Get(Key, NewOpts), Get(Key, OldOpts)) end,
|
||||
?SSL_FILE_OPT_NAMES
|
||||
).
|
||||
?SSL_FILE_OPT_NAMES ++ ?SSL_FILE_OPT_NAMES_A
|
||||
),
|
||||
%% try to delete the dir if it is empty
|
||||
_ = file:del_dir(pem_dir(Dir)),
|
||||
ok.
|
||||
|
||||
delete_old_file(New, Old) when New =:= Old -> ok;
|
||||
delete_old_file(_New, _Old = undefined) ->
|
||||
|
@ -394,8 +400,11 @@ save_pem_file(Dir, Key, Pem, DryRun) ->
|
|||
pem_file_name(Dir, Key, Pem) ->
|
||||
<<CK:8/binary, _/binary>> = crypto:hash(md5, Pem),
|
||||
Suffix = hex_str(CK),
|
||||
FileName = binary:replace(Key, <<"file">>, <<"-", Suffix/binary>>),
|
||||
filename:join([emqx:mutable_certs_dir(), Dir, FileName]).
|
||||
FileName = binary:replace(ensure_bin(Key), <<"file">>, <<"-", Suffix/binary>>),
|
||||
filename:join([pem_dir(Dir), FileName]).
|
||||
|
||||
pem_dir(Dir) ->
|
||||
filename:join([emqx:mutable_certs_dir(), Dir]).
|
||||
|
||||
hex_str(Bin) ->
|
||||
iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <<X:8>> <= Bin]).
|
||||
|
@ -417,20 +426,21 @@ drop_invalid_certs(#{enable := False} = SSL) when ?IS_FALSE(False) ->
|
|||
drop_invalid_certs(#{<<"enable">> := False} = SSL) when ?IS_FALSE(False) ->
|
||||
maps:without(?SSL_FILE_OPT_NAMES, SSL);
|
||||
drop_invalid_certs(#{enable := True} = SSL) when ?IS_TRUE(True) ->
|
||||
drop_invalid_certs(?SSL_FILE_OPT_NAMES_A, SSL);
|
||||
do_drop_invalid_certs(?SSL_FILE_OPT_NAMES_A, SSL);
|
||||
drop_invalid_certs(#{<<"enable">> := True} = SSL) when ?IS_TRUE(True) ->
|
||||
drop_invalid_certs(?SSL_FILE_OPT_NAMES, SSL).
|
||||
do_drop_invalid_certs(?SSL_FILE_OPT_NAMES, SSL).
|
||||
|
||||
drop_invalid_certs([], SSL) ->
|
||||
do_drop_invalid_certs([], SSL) ->
|
||||
SSL;
|
||||
drop_invalid_certs([Key | Keys], SSL) ->
|
||||
do_drop_invalid_certs([Key | Keys], SSL) ->
|
||||
case maps:get(Key, SSL, undefined) of
|
||||
undefined ->
|
||||
drop_invalid_certs(Keys, SSL);
|
||||
Path ->
|
||||
case is_valid_pem_file(Path) of
|
||||
true -> SSL;
|
||||
{error, _} -> maps:without([Key], SSL)
|
||||
do_drop_invalid_certs(Keys, SSL);
|
||||
PemOrPath ->
|
||||
case is_pem(PemOrPath) orelse is_valid_pem_file(PemOrPath) of
|
||||
true -> do_drop_invalid_certs(Keys, SSL);
|
||||
{error, _} ->
|
||||
do_drop_invalid_certs(Keys, maps:without([Key], SSL))
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -476,6 +486,9 @@ ensure_str(undefined) -> undefined;
|
|||
ensure_str(L) when is_list(L) -> L;
|
||||
ensure_str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8).
|
||||
|
||||
ensure_bin(B) when is_binary(B) -> B;
|
||||
ensure_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
||||
|
||||
-if(?OTP_RELEASE > 22).
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
|
|
@ -183,6 +183,7 @@ maybe_with_metrics_example(TypeNameExamp, _) ->
|
|||
|
||||
info_example_basic(http, _) ->
|
||||
#{
|
||||
enable => true,
|
||||
url => <<"http://localhost:9901/messages/${topic}">>,
|
||||
request_timeout => <<"15s">>,
|
||||
connect_timeout => <<"15s">>,
|
||||
|
@ -198,6 +199,7 @@ info_example_basic(http, _) ->
|
|||
};
|
||||
info_example_basic(mqtt, ingress) ->
|
||||
#{
|
||||
enable => true,
|
||||
connector => <<"mqtt:my_mqtt_connector">>,
|
||||
direction => ingress,
|
||||
remote_topic => <<"aws/#">>,
|
||||
|
@ -209,6 +211,7 @@ info_example_basic(mqtt, ingress) ->
|
|||
};
|
||||
info_example_basic(mqtt, egress) ->
|
||||
#{
|
||||
enable => true,
|
||||
connector => <<"mqtt:my_mqtt_connector">>,
|
||||
direction => egress,
|
||||
local_topic => <<"emqx/#">>,
|
||||
|
@ -512,7 +515,8 @@ aggregate_metrics(AllMetrics) ->
|
|||
|
||||
format_resp(#{type := Type, name := BridgeName, raw_config := RawConf,
|
||||
resource_data := #{status := Status, metrics := Metrics}}) ->
|
||||
RawConf#{
|
||||
RawConfFull = fill_defaults(Type, RawConf),
|
||||
RawConfFull#{
|
||||
type => Type,
|
||||
name => maps:get(<<"name">>, RawConf, BridgeName),
|
||||
node => node(),
|
||||
|
@ -527,6 +531,18 @@ format_metrics(#{
|
|||
} }) ->
|
||||
?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
|
||||
|
||||
fill_defaults(Type, RawConf) ->
|
||||
PackedConf = pack_bridge_conf(Type, RawConf),
|
||||
FullConf = emqx_config:fill_defaults(emqx_bridge_schema, PackedConf),
|
||||
unpack_bridge_conf(Type, FullConf).
|
||||
|
||||
pack_bridge_conf(Type, RawConf) ->
|
||||
#{<<"bridges">> => #{Type => #{<<"foo">> => RawConf}}}.
|
||||
|
||||
unpack_bridge_conf(Type, PackedConf) ->
|
||||
#{<<"bridges">> := Bridges} = PackedConf,
|
||||
#{<<"foo">> := RawConf} = maps:get(Type, Bridges),
|
||||
RawConf.
|
||||
|
||||
is_ok(ResL) ->
|
||||
case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of
|
||||
|
|
|
@ -193,21 +193,35 @@ do_wait_for_resource_ready(InstId, Retry) ->
|
|||
|
||||
do_create(InstId, Group, ResourceType, Config, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok,_, _} ->
|
||||
{ok, _, _} ->
|
||||
{ok, already_created};
|
||||
{error, not_found} ->
|
||||
case do_start(InstId, Group, ResourceType, Config, Opts) of
|
||||
ok ->
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
|
||||
[matched, success, failed, exception], [matched]),
|
||||
{ok, force_lookup(InstId)};
|
||||
Error ->
|
||||
Error
|
||||
case emqx_resource_ssl:convert_certs(InstId, Config) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, Config1} ->
|
||||
do_create2(InstId, Group, ResourceType, Config1, Opts)
|
||||
end
|
||||
end.
|
||||
|
||||
do_create2(InstId, Group, ResourceType, Config, Opts) ->
|
||||
ok = do_start(InstId, Group, ResourceType, Config, Opts),
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
|
||||
[matched, success, failed, exception], [matched]),
|
||||
{ok, force_lookup(InstId)}.
|
||||
|
||||
do_create_dry_run(ResourceType, Config) ->
|
||||
InstId = make_test_id(),
|
||||
case emqx_resource_ssl:convert_certs(InstId, Config) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, Config1} ->
|
||||
Result = do_create_dry_run2(InstId, ResourceType, Config1),
|
||||
_ = emqx_resource_ssl:clear_certs(InstId, Config1),
|
||||
Result
|
||||
end.
|
||||
|
||||
do_create_dry_run2(InstId, ResourceType, Config) ->
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
||||
|
@ -231,8 +245,9 @@ do_remove(Instance) ->
|
|||
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||
do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
|
||||
|
||||
do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
|
||||
do_remove(Group, #{id := InstId, config := Config} = Data, ClearMetrics) ->
|
||||
_ = do_stop(Group, Data),
|
||||
_ = emqx_resource_ssl:clear_certs(InstId, Config),
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_resource_ssl).
|
||||
|
||||
-export([ convert_certs/2
|
||||
, convert_certs/3
|
||||
, clear_certs/2
|
||||
]).
|
||||
|
||||
convert_certs(ResId, NewConfig) ->
|
||||
convert_certs(ResId, NewConfig, #{}).
|
||||
|
||||
convert_certs(ResId, NewConfig, OldConfig) ->
|
||||
OldSSL = drop_invalid_certs(maps:get(ssl, OldConfig, undefined)),
|
||||
NewSSL = drop_invalid_certs(maps:get(ssl, NewConfig, undefined)),
|
||||
CertsDir = cert_dir(ResId),
|
||||
case emqx_tls_lib:ensure_ssl_files(CertsDir, NewSSL) of
|
||||
{ok, NewSSL1} ->
|
||||
ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL),
|
||||
{ok, new_ssl_config(NewConfig, NewSSL1)};
|
||||
{error, Reason} ->
|
||||
{error, {bad_ssl_config, Reason}}
|
||||
end.
|
||||
|
||||
clear_certs(ResId, Config) ->
|
||||
OldSSL = drop_invalid_certs(maps:get(ssl, Config, undefined)),
|
||||
ok = emqx_tls_lib:delete_ssl_files(cert_dir(ResId), undefined, OldSSL).
|
||||
|
||||
cert_dir(ResId) ->
|
||||
filename:join(["resources", ResId]).
|
||||
|
||||
new_ssl_config(Config, undefined) -> Config;
|
||||
new_ssl_config(Config, SSL) -> Config#{ssl => SSL}.
|
||||
|
||||
drop_invalid_certs(undefined) -> undefined;
|
||||
drop_invalid_certs(SSL) -> emqx_tls_lib:drop_invalid_certs(SSL).
|
Loading…
Reference in New Issue