From 2783192f7727e4e9f05e58505fbbbbbd7107a50f Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 9 Jul 2024 17:38:34 +0800 Subject: [PATCH] fix: update cluster.links via cli --- apps/emqx_conf/include/emqx_conf.hrl | 2 +- apps/emqx_conf/src/emqx_conf_cli.erl | 133 +++++++++++++++++---------- 2 files changed, 86 insertions(+), 49 deletions(-) diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 53f309856..08afb7f10 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -46,6 +46,6 @@ ) ). --define(READONLY_KEYS, [cluster, rpc, node]). +-define(READONLY_KEYS, [rpc, node]). -endif. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 8c38d74bc..b1db1fff0 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_conf_cli). +-feature(maybe_expr, enable). + -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -40,6 +42,7 @@ -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). +-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). -define(TIMEOUT, 30000). -dialyzer({no_match, [load/0]}). @@ -345,25 +348,21 @@ load_config_from_raw(RawConf0, Opts) -> RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0), case check_config(RawConf1, Opts) of {ok, RawConf} -> - %% It has been ensured that the connector is always the first configuration to be updated. - %% However, when deleting the connector, we need to clean up the dependent actions/sources first; - %% otherwise, the deletion will fail. - %% notice: we can't create a action/sources before connector. - uninstall(<<"actions">>, RawConf, Opts), - uninstall(<<"sources">>, RawConf, Opts), - Error = - lists:filtermap( - fun({K, V}) -> - case update_config_cluster(K, V, Opts) of - ok -> false; - {error, Msg} -> {true, Msg} - end - end, - to_sorted_list(RawConf) - ), - case iolist_to_binary(Error) of - <<"">> -> ok; - ErrorBin -> {error, ErrorBin} + case update_cluster_links(cluster, RawConf, Opts) of + ok -> + %% It has been ensured that the connector is always the first configuration to be updated. + %% However, when deleting the connector, we need to clean up the dependent actions/sources first; + %% otherwise, the deletion will fail. + %% notice: we can't create a action/sources before connector. + uninstall(<<"actions">>, RawConf, Opts), + uninstall(<<"sources">>, RawConf, Opts), + Error = update_config_cluster(Opts, RawConf), + case iolist_to_binary(Error) of + <<"">> -> ok; + ErrorBin -> {error, ErrorBin} + end; + {error, Reason} -> + {error, Reason} end; {error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} -> Reason = iolist_to_binary( @@ -390,6 +389,26 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. +update_config_cluster(Opts, RawConf) -> + lists:filtermap( + fun({K, V}) -> + case update_config_cluster(K, V, Opts) of + ok -> false; + {error, Msg} -> {true, Msg} + end + end, + to_sorted_list(RawConf) + ). + +update_cluster_links(cluster, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) -> + Res = emqx_conf:update([<<"cluster">>, <<"links">>], Links, ?OPTIONS), + check_res(<<"cluster.links">>, Res, Links, Opts); +update_cluster_links(local, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) -> + Res = emqx:update_config([<<"cluster">>, <<"links">>], Links, ?LOCAL_OPTIONS), + check_res(node(), <<"cluster.links">>, Res, Links, Opts); +update_cluster_links(_, _, _) -> + ok. + uninstall(ActionOrSource, Conf, #{mode := replace}) -> case maps:find(ActionOrSource, Conf) of {ok, New} -> @@ -449,7 +468,6 @@ update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> update_config_cluster(Key, Value, #{mode := replace} = Opts) -> check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). --define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, @@ -509,15 +527,14 @@ suggest_msg(_, _) -> <<"">>. check_config(Conf0, Opts) -> - case check_keys_is_not_readonly(Conf0, Opts) of - {ok, Conf1} -> - Conf = emqx_config:fill_defaults(Conf1), - case check_config_schema(Conf) of - ok -> {ok, Conf}; - {error, Reason} -> {error, Reason} - end; - Error -> - Error + maybe + {ok, Conf1} ?= check_keys_is_not_readonly(Conf0, Opts), + {ok, Conf2} ?= check_cluster_keys(Conf1, Opts), + Conf3 = emqx_config:fill_defaults(Conf2), + ok ?= check_config_schema(Conf3), + {ok, Conf3} + else + Error -> Error end. check_keys_is_not_readonly(Conf, Opts) -> @@ -535,6 +552,20 @@ check_keys_is_not_readonly(Conf, Opts) -> {error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr} end. +check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) -> + IgnoreReadonly = maps:get(ignore_readonly, Opts, false), + case maps:keys(Cluster) -- [<<"links">>] of + [] -> + {ok, Conf}; + Keys when IgnoreReadonly -> + ?SLOG(info, #{msg => "readonly_root_keys_ignored", keys => Keys}), + {ok, Conf#{<<"cluster">> => maps:with([<<"links">>], Cluster)}}; + Keys -> + BadKeys = ["cluster." ++ K || K <- Keys], + BadKeysStr = lists:join(<<",">>, BadKeys), + {error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr} + end. + check_config_schema(Conf) -> SchemaMod = emqx_conf:schema_module(), Fold = fun({Key, Value}, Acc) -> @@ -604,23 +635,28 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> - uninstall(<<"actions">>, AllConf, Opts), - uninstall(<<"sources">>, AllConf, Opts), - Fold = fun({Key, Conf}, Acc) -> - case update_config_local(Key, Conf, Opts) of - ok -> - Acc; - Error -> - ?SLOG(error, #{ - msg => "failed_to_reload_etc_config", - key => Key, - value => Conf, - error => Error - }), - [{Key, Error} | Acc] - end - end, - sorted_fold(Fold, AllConf). + case update_cluster_links(local, AllConf, Opts) of + ok -> + uninstall(<<"actions">>, AllConf, Opts), + uninstall(<<"sources">>, AllConf, Opts), + Fold = fun({Key, Conf}, Acc) -> + case update_config_local(Key, Conf, Opts) of + ok -> + Acc; + Error -> + ?SLOG(error, #{ + msg => "failed_to_reload_etc_config", + key => Key, + value => Conf, + error => Error + }), + [{Key, Error} | Acc] + end + end, + sorted_fold(Fold, AllConf); + {error, Reason} -> + {error, Reason} + end. sorted_fold(Func, Conf) -> case lists:foldl(Func, [], to_sorted_list(Conf)) of @@ -629,10 +665,11 @@ sorted_fold(Func, Conf) -> end. to_sorted_list(Conf0) -> + Conf1 = maps:remove(<<"cluster">>, Conf0), %% connectors > actions/bridges > rule_engine Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>], - {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []), - HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)). + {HighPriorities, Conf2} = split_high_priority_conf(Keys, Conf1, []), + HighPriorities ++ lists:keysort(1, maps:to_list(Conf2)). split_high_priority_conf([], Conf0, Acc) -> {lists:reverse(Acc), Conf0};