Merge pull request #7786 from terry-xiaoyu/save_tls_files_for_bridges
Save tls files for bridges
This commit is contained in:
commit
46e993fa81
|
@ -57,6 +57,7 @@
|
|||
validate_heap_size/1,
|
||||
parse_user_lookup_fun/1,
|
||||
validate_alarm_actions/1,
|
||||
non_empty_string/1,
|
||||
validations/0
|
||||
]).
|
||||
|
||||
|
@ -1898,6 +1899,7 @@ client_ssl_opts_schema(Defaults1) ->
|
|||
hoconsc:union([disable, string()]),
|
||||
#{
|
||||
required => false,
|
||||
validator => fun emqx_schema:non_empty_string/1,
|
||||
desc => ?DESC(client_ssl_opts_schema_server_name_indication)
|
||||
}
|
||||
)}
|
||||
|
@ -2177,3 +2179,8 @@ authentication(Type) ->
|
|||
-spec qos() -> typerefl:type().
|
||||
qos() ->
|
||||
typerefl:alias("qos", typerefl:union([0, 1, 2])).
|
||||
|
||||
non_empty_string(<<>>) -> {error, empty_string_not_allowed};
|
||||
non_empty_string("") -> {error, empty_string_not_allowed};
|
||||
non_empty_string(S) when is_binary(S); is_list(S) -> ok;
|
||||
non_empty_string(_) -> {error, invalid_string}.
|
||||
|
|
|
@ -180,7 +180,6 @@ do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
|
|||
{OldSource, Front, Rear} = take(Type, OldSources),
|
||||
NewSource = get_source_by_type(type(RawNewSource), Sources),
|
||||
ok = ensure_resource_deleted(OldSource),
|
||||
clear_certs(OldSource),
|
||||
InitedSources = init_source(NewSource),
|
||||
Front ++ [InitedSources] ++ Rear;
|
||||
do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
-export([start/2, stop/1]).
|
||||
|
||||
-export([ pre_config_update/3
|
||||
, post_config_update/5
|
||||
]).
|
||||
|
||||
-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
|
||||
|
@ -46,8 +47,18 @@ pre_config_update(_, {_Oper, _, _}, undefined) ->
|
|||
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
|
||||
%% to save the 'enable' to the config files
|
||||
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
|
||||
pre_config_update(_, Conf, _OldConfig) when is_map(Conf) ->
|
||||
{ok, Conf}.
|
||||
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
|
||||
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, ConfNew} ->
|
||||
{ok, ConfNew}
|
||||
end.
|
||||
|
||||
post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
|
||||
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
|
||||
post_config_update(_Path, _Req, _, _OldConf, _AppEnvs) ->
|
||||
ok.
|
||||
|
||||
%% internal functions
|
||||
operation_to_enable(disable) -> false;
|
||||
|
|
|
@ -15,7 +15,10 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector).
|
||||
|
||||
-export([config_key_path/0]).
|
||||
-export([ config_key_path/0
|
||||
, pre_config_update/3
|
||||
, post_config_update/5
|
||||
]).
|
||||
|
||||
-export([ parse_connector_id/1
|
||||
, connector_id/2
|
||||
|
@ -31,18 +34,25 @@
|
|||
, delete/2
|
||||
]).
|
||||
|
||||
-export([ post_config_update/5
|
||||
]).
|
||||
|
||||
config_key_path() ->
|
||||
[connectors].
|
||||
|
||||
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
|
||||
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, ConfNew} ->
|
||||
{ok, ConfNew}
|
||||
end.
|
||||
|
||||
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
|
||||
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
||||
post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) ->
|
||||
ConnId = connector_id(Type, Name),
|
||||
try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
|
||||
try
|
||||
foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
|
||||
throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)})
|
||||
end)
|
||||
end),
|
||||
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf)
|
||||
catch throw:Error -> {error, Error}
|
||||
end;
|
||||
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
|
||||
|
|
|
@ -15,37 +15,36 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_resource_ssl).
|
||||
-module(emqx_connector_ssl).
|
||||
|
||||
-export([ convert_certs/2
|
||||
, convert_certs/3
|
||||
, clear_certs/2
|
||||
]).
|
||||
|
||||
convert_certs(ResId, NewConfig) ->
|
||||
convert_certs(ResId, NewConfig, #{}).
|
||||
|
||||
convert_certs(ResId, NewConfig, OldConfig) ->
|
||||
OldSSL = drop_invalid_certs(maps:get(ssl, OldConfig, undefined)),
|
||||
NewSSL = drop_invalid_certs(maps:get(ssl, NewConfig, undefined)),
|
||||
CertsDir = cert_dir(ResId),
|
||||
case emqx_tls_lib:ensure_ssl_files(CertsDir, NewSSL) of
|
||||
convert_certs(RltvDir, NewConfig) ->
|
||||
NewSSL = drop_invalid_certs(maps:get(<<"ssl">>, NewConfig, undefined)),
|
||||
case emqx_tls_lib:ensure_ssl_files(RltvDir, NewSSL) of
|
||||
{ok, NewSSL1} ->
|
||||
ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL),
|
||||
{ok, new_ssl_config(NewConfig, NewSSL1)};
|
||||
{error, Reason} ->
|
||||
{error, {bad_ssl_config, Reason}}
|
||||
end.
|
||||
|
||||
clear_certs(ResId, Config) ->
|
||||
OldSSL = drop_invalid_certs(maps:get(ssl, Config, undefined)),
|
||||
ok = emqx_tls_lib:delete_ssl_files(cert_dir(ResId), undefined, OldSSL).
|
||||
|
||||
cert_dir(ResId) ->
|
||||
filename:join(["resources", ResId]).
|
||||
clear_certs(RltvDir, Config) ->
|
||||
OldSSL = drop_invalid_certs(map_get_oneof([<<"ssl">>, ssl], Config, undefined)),
|
||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL).
|
||||
|
||||
new_ssl_config(Config, undefined) -> Config;
|
||||
new_ssl_config(Config, SSL) -> Config#{ssl => SSL}.
|
||||
new_ssl_config(Config, SSL) -> Config#{<<"ssl">> => SSL}.
|
||||
|
||||
drop_invalid_certs(undefined) -> undefined;
|
||||
drop_invalid_certs(SSL) -> emqx_tls_lib:drop_invalid_certs(SSL).
|
||||
|
||||
map_get_oneof([], _Map, Default) -> Default;
|
||||
map_get_oneof([Key | Keys], Map, Default) ->
|
||||
case maps:find(Key, Map) of
|
||||
error ->
|
||||
map_get_oneof(Keys, Map, Default);
|
||||
{ok, Value} ->
|
||||
Value
|
||||
end.
|
|
@ -31,8 +31,6 @@
|
|||
, egress_desc/0
|
||||
]).
|
||||
|
||||
-export([non_empty_string/1]).
|
||||
|
||||
-import(emqx_schema, [mk_duration/2]).
|
||||
|
||||
namespace() -> "connector-mqtt".
|
||||
|
@ -98,7 +96,7 @@ fields("ingress") ->
|
|||
[ {remote_topic,
|
||||
sc(binary(),
|
||||
#{ required => true
|
||||
, validator => fun ?MODULE:non_empty_string/1
|
||||
, validator => fun emqx_schema:non_empty_string/1
|
||||
, desc => ?DESC("ingress_remote_topic")
|
||||
})}
|
||||
, {remote_qos,
|
||||
|
@ -108,7 +106,7 @@ fields("ingress") ->
|
|||
})}
|
||||
, {local_topic,
|
||||
sc(binary(),
|
||||
#{ validator => fun ?MODULE:non_empty_string/1
|
||||
#{ validator => fun emqx_schema:non_empty_string/1
|
||||
, desc => ?DESC("ingress_local_topic")
|
||||
})}
|
||||
, {local_qos,
|
||||
|
@ -140,12 +138,12 @@ fields("egress") ->
|
|||
[ {local_topic,
|
||||
sc(binary(),
|
||||
#{ desc => ?DESC("egress_local_topic")
|
||||
, validator => fun ?MODULE:non_empty_string/1
|
||||
, validator => fun emqx_schema:non_empty_string/1
|
||||
})}
|
||||
, {remote_topic,
|
||||
sc(binary(),
|
||||
#{ required => true
|
||||
, validator => fun ?MODULE:non_empty_string/1
|
||||
, validator => fun emqx_schema:non_empty_string/1
|
||||
, desc => ?DESC("egress_remote_topic")
|
||||
})}
|
||||
, {remote_qos,
|
||||
|
@ -228,10 +226,5 @@ local_topic will be forwarded.
|
|||
qos() ->
|
||||
hoconsc:union([emqx_schema:qos(), binary()]).
|
||||
|
||||
non_empty_string(<<>>) -> {error, empty_string_not_allowed};
|
||||
non_empty_string("") -> {error, empty_string_not_allowed};
|
||||
non_empty_string(S) when is_binary(S); is_list(S) -> ok;
|
||||
non_empty_string(_) -> {error, invalid_string}.
|
||||
|
||||
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
||||
ref(Field) -> hoconsc:ref(?MODULE, Field).
|
||||
|
|
|
@ -196,32 +196,14 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
|
|||
{ok, _, _} ->
|
||||
{ok, already_created};
|
||||
{error, not_found} ->
|
||||
case emqx_resource_ssl:convert_certs(InstId, Config) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, Config1} ->
|
||||
do_create2(InstId, Group, ResourceType, Config1, Opts)
|
||||
end
|
||||
ok = do_start(InstId, Group, ResourceType, Config, Opts),
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
|
||||
[matched, success, failed, exception], [matched]),
|
||||
{ok, force_lookup(InstId)}
|
||||
end.
|
||||
|
||||
do_create2(InstId, Group, ResourceType, Config, Opts) ->
|
||||
ok = do_start(InstId, Group, ResourceType, Config, Opts),
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
|
||||
[matched, success, failed, exception], [matched]),
|
||||
{ok, force_lookup(InstId)}.
|
||||
|
||||
do_create_dry_run(ResourceType, Config) ->
|
||||
InstId = make_test_id(),
|
||||
case emqx_resource_ssl:convert_certs(InstId, Config) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, Config1} ->
|
||||
Result = do_create_dry_run2(InstId, ResourceType, Config1),
|
||||
_ = emqx_resource_ssl:clear_certs(InstId, Config1),
|
||||
Result
|
||||
end.
|
||||
|
||||
do_create_dry_run2(InstId, ResourceType, Config) ->
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
||||
|
@ -245,9 +227,8 @@ do_remove(Instance) ->
|
|||
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||
do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
|
||||
|
||||
do_remove(Group, #{id := InstId, config := Config} = Data, ClearMetrics) ->
|
||||
do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
|
||||
_ = do_stop(Group, Data),
|
||||
_ = emqx_resource_ssl:clear_certs(InstId, Config),
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
||||
|
|
|
@ -82,7 +82,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
|
|||
topic := TopicTks,
|
||||
payload := PayloadTks}}) ->
|
||||
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
|
||||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||
Payload = format_msg(PayloadTks, Selected),
|
||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
|
||||
|
@ -96,7 +96,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
|
|||
topic := TopicTks,
|
||||
payload := PayloadTks}}) ->
|
||||
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
|
||||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||
Payload = format_msg(PayloadTks, Selected),
|
||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
|
||||
|
@ -163,3 +163,8 @@ replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
|
|||
end;
|
||||
replace_simple_var(Val, _Data, _Default) ->
|
||||
Val.
|
||||
|
||||
format_msg([], Selected) ->
|
||||
emqx_json:encode(Selected);
|
||||
format_msg(Tokens, Selected) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
|
||||
|
|
Loading…
Reference in New Issue