From ba800d853d37d8b4c054d95abfdf036f59bcccea Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 26 Apr 2022 18:19:40 +0800 Subject: [PATCH 1/5] fix(rule): republish all available fields if payload template empty --- apps/emqx_rule_engine/src/emqx_rule_outputs.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index c0f685d4e..6a858a73b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -82,7 +82,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, topic := TopicTks, payload := PayloadTks}}) -> Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), - Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), + Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), @@ -96,7 +96,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}}, topic := TopicTks, payload := PayloadTks}}) -> Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), - Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), + Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), @@ -163,3 +163,8 @@ replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> end; replace_simple_var(Val, _Data, _Default) -> Val. + +format_msg([], Selected) -> + emqx_json:encode(Selected); +format_msg(Tokens, Selected) -> + emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). From 94e24c262177d0a0d8636aeb8630b1937608090d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Apr 2022 01:02:57 +0800 Subject: [PATCH 2/5] refactor: move ssl file handling from resources to bridges --- apps/emqx_bridge/src/emqx_bridge_app.erl | 15 ++++++-- apps/emqx_connector/src/emqx_connector.erl | 21 +++++++---- .../src/emqx_connector_ssl.erl} | 35 +++++++++---------- .../src/emqx_resource_instance.erl | 29 +++------------ 4 files changed, 50 insertions(+), 50 deletions(-) rename apps/{emqx_resource/src/emqx_resource_ssl.erl => emqx_connector/src/emqx_connector_ssl.erl} (60%) diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index b02fe2a9c..99b2c4a84 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -20,6 +20,7 @@ -export([start/2, stop/1]). -export([ pre_config_update/3 + , post_config_update/5 ]). -define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())). @@ -46,8 +47,18 @@ pre_config_update(_, {_Oper, _, _}, undefined) -> pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; -pre_config_update(_, Conf, _OldConfig) when is_map(Conf) -> - {ok, Conf}. +pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> + case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of + {error, Reason} -> + {error, Reason}; + {ok, ConfNew} -> + {ok, ConfNew} + end. + +post_config_update(Path, '$remove', _, OldConf, _AppEnvs) -> + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); +post_config_update(_Path, _Req, _, _OldConf, _AppEnvs) -> + ok. %% internal functions operation_to_enable(disable) -> false; diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 16684466f..0e17971e1 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -15,7 +15,10 @@ %%-------------------------------------------------------------------- -module(emqx_connector). --export([config_key_path/0]). +-export([ config_key_path/0 + , pre_config_update/3 + , post_config_update/5 + ]). -export([ parse_connector_id/1 , connector_id/2 @@ -31,20 +34,26 @@ , delete/2 ]). --export([ post_config_update/5 - ]). - config_key_path() -> [connectors]. +pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> + case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of + {error, Reason} -> + {error, Reason}; + {ok, ConfNew} -> + {ok, ConfNew} + end. + -dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). -post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> +post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) -> throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)}) end) catch throw:Error -> {error, Error} - end; + end, + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), foreach_linked_bridges(ConnId, diff --git a/apps/emqx_resource/src/emqx_resource_ssl.erl b/apps/emqx_connector/src/emqx_connector_ssl.erl similarity index 60% rename from apps/emqx_resource/src/emqx_resource_ssl.erl rename to apps/emqx_connector/src/emqx_connector_ssl.erl index 9e3fe0456..07b12eea1 100644 --- a/apps/emqx_resource/src/emqx_resource_ssl.erl +++ b/apps/emqx_connector/src/emqx_connector_ssl.erl @@ -15,37 +15,36 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_resource_ssl). +-module(emqx_connector_ssl). -export([ convert_certs/2 - , convert_certs/3 , clear_certs/2 ]). -convert_certs(ResId, NewConfig) -> - convert_certs(ResId, NewConfig, #{}). - -convert_certs(ResId, NewConfig, OldConfig) -> - OldSSL = drop_invalid_certs(maps:get(ssl, OldConfig, undefined)), - NewSSL = drop_invalid_certs(maps:get(ssl, NewConfig, undefined)), - CertsDir = cert_dir(ResId), - case emqx_tls_lib:ensure_ssl_files(CertsDir, NewSSL) of +convert_certs(RltvDir, NewConfig) -> + NewSSL = drop_invalid_certs(maps:get(<<"ssl">>, NewConfig, undefined)), + case emqx_tls_lib:ensure_ssl_files(RltvDir, NewSSL) of {ok, NewSSL1} -> - ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL), {ok, new_ssl_config(NewConfig, NewSSL1)}; {error, Reason} -> {error, {bad_ssl_config, Reason}} end. -clear_certs(ResId, Config) -> - OldSSL = drop_invalid_certs(maps:get(ssl, Config, undefined)), - ok = emqx_tls_lib:delete_ssl_files(cert_dir(ResId), undefined, OldSSL). - -cert_dir(ResId) -> - filename:join(["resources", ResId]). +clear_certs(RltvDir, Config) -> + OldSSL = drop_invalid_certs(map_get_oneof([<<"ssl">>, ssl], Config, undefined)), + ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL). new_ssl_config(Config, undefined) -> Config; -new_ssl_config(Config, SSL) -> Config#{ssl => SSL}. +new_ssl_config(Config, SSL) -> Config#{<<"ssl">> => SSL}. drop_invalid_certs(undefined) -> undefined; drop_invalid_certs(SSL) -> emqx_tls_lib:drop_invalid_certs(SSL). + +map_get_oneof([], _Map, Default) -> Default; +map_get_oneof([Key | Keys], Map, Default) -> + case maps:find(Key, Map) of + error -> + map_get_oneof(Keys, Map, Default); + {ok, Value} -> + Value + end. \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 352ddf247..60b2babe5 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -196,32 +196,14 @@ do_create(InstId, Group, ResourceType, Config, Opts) -> {ok, _, _} -> {ok, already_created}; {error, not_found} -> - case emqx_resource_ssl:convert_certs(InstId, Config) of - {error, Reason} -> - {error, Reason}; - {ok, Config1} -> - do_create2(InstId, Group, ResourceType, Config1, Opts) - end + ok = do_start(InstId, Group, ResourceType, Config, Opts), + ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, + [matched, success, failed, exception], [matched]), + {ok, force_lookup(InstId)} end. -do_create2(InstId, Group, ResourceType, Config, Opts) -> - ok = do_start(InstId, Group, ResourceType, Config, Opts), - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, - [matched, success, failed, exception], [matched]), - {ok, force_lookup(InstId)}. - do_create_dry_run(ResourceType, Config) -> InstId = make_test_id(), - case emqx_resource_ssl:convert_certs(InstId, Config) of - {error, Reason} -> - {error, Reason}; - {ok, Config1} -> - Result = do_create_dry_run2(InstId, ResourceType, Config1), - _ = emqx_resource_ssl:clear_certs(InstId, Config1), - Result - end. - -do_create_dry_run2(InstId, ResourceType, Config) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of @@ -245,9 +227,8 @@ do_remove(Instance) -> do_remove(InstId, ClearMetrics) when is_binary(InstId) -> do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]). -do_remove(Group, #{id := InstId, config := Config} = Data, ClearMetrics) -> +do_remove(Group, #{id := InstId} = Data, ClearMetrics) -> _ = do_stop(Group, Data), - _ = emqx_resource_ssl:clear_certs(InstId, Config), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); From 318d0df4194c0c0332d0a963d4aca60f347c8775 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Apr 2022 11:58:41 +0800 Subject: [PATCH 3/5] fix: return value of post_config_update --- apps/emqx_connector/src/emqx_connector.erl | 9 +++++---- apps/emqx_connector/src/emqx_connector_ssl.erl | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 0e17971e1..fbb89e8e7 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -48,12 +48,13 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> -dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) -> + try + foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) -> throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)}) - end) + end), + _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf) catch throw:Error -> {error, Error} - end, - _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); + end; post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), foreach_linked_bridges(ConnId, diff --git a/apps/emqx_connector/src/emqx_connector_ssl.erl b/apps/emqx_connector/src/emqx_connector_ssl.erl index 07b12eea1..02d9a4070 100644 --- a/apps/emqx_connector/src/emqx_connector_ssl.erl +++ b/apps/emqx_connector/src/emqx_connector_ssl.erl @@ -47,4 +47,4 @@ map_get_oneof([Key | Keys], Map, Default) -> map_get_oneof(Keys, Map, Default); {ok, Value} -> Value - end. \ No newline at end of file + end. From 46550d5a6ffa5ed8af5c8adbb89c3aa4befa0eaf Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Apr 2022 14:07:33 +0800 Subject: [PATCH 4/5] fix: don't remote the cert files when updating authz --- apps/emqx_authz/src/emqx_authz.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 953dad27c..e394f46f8 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -180,7 +180,6 @@ do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) -> {OldSource, Front, Rear} = take(Type, OldSources), NewSource = get_source_by_type(type(RawNewSource), Sources), ok = ensure_resource_deleted(OldSource), - clear_certs(OldSource), InitedSources = init_source(NewSource), Front ++ [InitedSources] ++ Rear; do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> From 0635918d16575dd9d34c85be5cac79e6caeee34c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Apr 2022 14:17:22 +0800 Subject: [PATCH 5/5] fix: define ssl SNI field as a non-empty-string --- apps/emqx/src/emqx_schema.erl | 7 +++++++ .../src/mqtt/emqx_connector_mqtt_schema.erl | 15 ++++----------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 4a23bfeda..b4cb63fc4 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -57,6 +57,7 @@ validate_heap_size/1, parse_user_lookup_fun/1, validate_alarm_actions/1, + non_empty_string/1, validations/0 ]). @@ -1898,6 +1899,7 @@ client_ssl_opts_schema(Defaults1) -> hoconsc:union([disable, string()]), #{ required => false, + validator => fun emqx_schema:non_empty_string/1, desc => ?DESC(client_ssl_opts_schema_server_name_indication) } )} @@ -2177,3 +2179,8 @@ authentication(Type) -> -spec qos() -> typerefl:type(). qos() -> typerefl:alias("qos", typerefl:union([0, 1, 2])). + +non_empty_string(<<>>) -> {error, empty_string_not_allowed}; +non_empty_string("") -> {error, empty_string_not_allowed}; +non_empty_string(S) when is_binary(S); is_list(S) -> ok; +non_empty_string(_) -> {error, invalid_string}. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 2a0fcc3fa..d913e1ecf 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -31,8 +31,6 @@ , egress_desc/0 ]). --export([non_empty_string/1]). - -import(emqx_schema, [mk_duration/2]). namespace() -> "connector-mqtt". @@ -98,7 +96,7 @@ fields("ingress") -> [ {remote_topic, sc(binary(), #{ required => true - , validator => fun ?MODULE:non_empty_string/1 + , validator => fun emqx_schema:non_empty_string/1 , desc => ?DESC("ingress_remote_topic") })} , {remote_qos, @@ -108,7 +106,7 @@ fields("ingress") -> })} , {local_topic, sc(binary(), - #{ validator => fun ?MODULE:non_empty_string/1 + #{ validator => fun emqx_schema:non_empty_string/1 , desc => ?DESC("ingress_local_topic") })} , {local_qos, @@ -140,12 +138,12 @@ fields("egress") -> [ {local_topic, sc(binary(), #{ desc => ?DESC("egress_local_topic") - , validator => fun ?MODULE:non_empty_string/1 + , validator => fun emqx_schema:non_empty_string/1 })} , {remote_topic, sc(binary(), #{ required => true - , validator => fun ?MODULE:non_empty_string/1 + , validator => fun emqx_schema:non_empty_string/1 , desc => ?DESC("egress_remote_topic") })} , {remote_qos, @@ -228,10 +226,5 @@ local_topic will be forwarded. qos() -> hoconsc:union([emqx_schema:qos(), binary()]). -non_empty_string(<<>>) -> {error, empty_string_not_allowed}; -non_empty_string("") -> {error, empty_string_not_allowed}; -non_empty_string(S) when is_binary(S); is_list(S) -> ok; -non_empty_string(_) -> {error, invalid_string}. - sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field).