diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index aeaa4c9e2..378696d2c 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -499,15 +499,14 @@ fill_defaults(RawConf, Opts) -> ). -spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map(). -fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) -> +fill_defaults(SchemaMod, RawConf = #{<<"durable_storage">> := Ds}, Opts) -> %% FIXME: kludge to prevent `emqx_config' module from filling in %% the default values for backends and layouts. These records are %% inside unions, and adding default values there will add %% incompatible fields. - %% - %% Note: this function is called for each individual conf root, so - %% this clause only affects this particular subtree. - RawConf; + RawConf1 = maps:remove(<<"durable_storage">>, RawConf), + Conf = fill_defaults(SchemaMod, RawConf1, Opts), + Conf#{<<"durable_storage">> => Ds}; fill_defaults(SchemaMod, RawConf, Opts0) -> Opts = maps:merge(#{required => false, make_serializable => true}, Opts0), hocon_tconf:check_plain( diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl index 97e62402c..166179d78 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl @@ -56,9 +56,31 @@ mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) -> ), {NodesA, NodesB}. +t_config_update_cli('init', Config0) -> + Config1 = + [ + {name_prefix, ?FUNCTION_NAME} + | Config0 + ], + Config2 = t_config_update('init', Config1), + [ + {update_from, cli} + | lists:keydelete(update_from, 1, Config2) + ]; +t_config_update_cli('end', Config) -> + t_config_update('end', Config). + +t_config_update_cli(Config) -> + t_config_update(Config). + t_config_update('init', Config) -> - NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), - NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + NamePrefix = + case ?config(name_prefix, Config) of + undefined -> ?FUNCTION_NAME; + Name -> Name + end, + NameA = fmt("~s_~s", [NamePrefix, "a"]), + NameB = fmt("~s_~s", [NamePrefix, "b"]), LPortA = 31883, LPortB = 41883, ConfA = combine([conf_cluster(NameA), conf_log()]), @@ -73,7 +95,8 @@ t_config_update('init', Config) -> {lport_a, LPortA}, {lport_b, LPortB}, {name_a, NameA}, - {name_b, NameB} + {name_b, NameB}, + {update_from, api} | Config ]; t_config_update('end', Config) -> @@ -113,12 +136,12 @@ t_config_update(Config) -> {ok, SubRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), - %% 5 nodes = 5 actors (durable storage is dsabled) + %% 5 nodes = 5 actors (durable storage is disabled) 5, 30_000 ), - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])), - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)), + ?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)), ?assertMatch( {ok, [ @@ -158,7 +181,7 @@ t_config_update(Config) -> %% update link LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]}, - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])), + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)), ?assertMatch( {ok, [ @@ -187,20 +210,21 @@ t_config_update(Config) -> %% disable link LinkConfA2 = LinkConfA1#{<<"enable">> => false}, - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])), - %% must be already blocked by the receiving cluster even if externak routing state is not + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA2], Config)), + %% must be already blocked by the receiving cluster even if external routing state is not %% updated yet {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1), LinkConfB1 = LinkConfB#{<<"enable">> => false}, - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])), + ?assertMatch({ok, _}, update(NodeB1, [LinkConfB1], Config)), {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1), ?assertNotReceive({publish, _Message = #{}}, 3000), %% delete links - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])), + ?assertMatch({ok, _}, update(NodeA1, [], Config)), + ?assertMatch({ok, _}, update(NodeB1, [], Config)), + validate_update_cli_failed(NodeA1, Config), ok = emqtt:stop(ClientA), ok = emqtt:stop(ClientB). @@ -645,3 +669,51 @@ fmt(Fmt, Args) -> mk_nodename(BaseName, Idx) -> binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])). + +validate_update_cli_failed(Node, Config) -> + case ?config(update_from, Config) of + api -> + ok; + cli -> + ConfBin = hocon_pp:do( + #{ + <<"cluster">> => #{ + <<"links">> => [], + <<"autoclean">> => <<"12h">> + } + }, + #{} + ), + ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), + ?assertMatch( + {error, <<"Cannot update read-only key 'cluster.autoclean'.">>}, + erpc:call(Node, emqx_conf_cli, conf, [["load", ConfFile]]) + ) + end. + +update(Node, Links, Config) -> + case ?config(update_from, Config) of + api -> update_links_from_api(Node, Links, Config); + cli -> update_links_from_cli(Node, Links, Config) + end. + +update_links_from_api(Node, Links, _Config) -> + erpc:call(Node, emqx_cluster_link_config, update, [Links]). + +update_links_from_cli(Node, Links, Config) -> + ConfBin = hocon_pp:do(#{<<"cluster">> => #{<<"links">> => Links}}, #{}), + ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), + case erpc:call(Node, emqx_conf_cli, conf, [["load", ConfFile]]) of + ok -> {ok, Links}; + Error -> Error + end. + +prepare_conf_file(Name, Content, CTConfig) -> + Filename = tc_conf_file(Name, CTConfig), + filelib:ensure_dir(Filename), + ok = file:write_file(Filename, Content), + Filename. + +tc_conf_file(TC, Config) -> + DataDir = ?config(data_dir, Config), + filename:join([DataDir, TC, 'emqx.conf']). diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 53f309856..4d44d9076 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -46,6 +46,4 @@ ) ). --define(READONLY_KEYS, [cluster, rpc, node]). - -endif. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index e730f23c0..3c0e98096 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_conf_cli). +-feature(maybe_expr, enable). + -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -40,8 +42,12 @@ -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). +-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). -define(TIMEOUT, 30000). +%% All 'cluster.*' keys, except for 'cluster.link', should also be treated as read-only. +-define(READONLY_ROOT_KEYS, [rpc, node]). + -dialyzer({no_match, [load/0]}). load() -> @@ -312,8 +318,14 @@ get_config(Key) -> load_config(Path, Opts) when is_list(Path) -> case hocon:files([Path]) of {ok, RawConf} when RawConf =:= #{} -> - emqx_ctl:warning("load ~ts is empty~n", [Path]), - {error, empty_hocon_file}; + case filelib:is_regular(Path) of + true -> + emqx_ctl:warning("load ~ts is empty~n", [Path]), + {error, #{cause => empty_hocon_file, path => Path}}; + false -> + emqx_ctl:warning("~ts file is not found~n", [Path]), + {error, #{cause => not_a_file, path => Path}} + end; {ok, RawConf} -> load_config_from_raw(RawConf, Opts); {error, Reason} -> @@ -339,25 +351,21 @@ load_config_from_raw(RawConf0, Opts) -> RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0), case check_config(RawConf1, Opts) of {ok, RawConf} -> - %% It has been ensured that the connector is always the first configuration to be updated. - %% However, when deleting the connector, we need to clean up the dependent actions/sources first; - %% otherwise, the deletion will fail. - %% notice: we can't create a action/sources before connector. - uninstall(<<"actions">>, RawConf, Opts), - uninstall(<<"sources">>, RawConf, Opts), - Error = - lists:filtermap( - fun({K, V}) -> - case update_config_cluster(K, V, Opts) of - ok -> false; - {error, Msg} -> {true, Msg} - end - end, - to_sorted_list(RawConf) - ), - case iolist_to_binary(Error) of - <<"">> -> ok; - ErrorBin -> {error, ErrorBin} + case update_cluster_links(cluster, RawConf, Opts) of + ok -> + %% It has been ensured that the connector is always the first configuration to be updated. + %% However, when deleting the connector, we need to clean up the dependent actions/sources first; + %% otherwise, the deletion will fail. + %% notice: we can't create a action/sources before connector. + uninstall(<<"actions">>, RawConf, Opts), + uninstall(<<"sources">>, RawConf, Opts), + Error = update_config_cluster(Opts, RawConf), + case iolist_to_binary(Error) of + <<"">> -> ok; + ErrorBin -> {error, ErrorBin} + end; + {error, Reason} -> + {error, Reason} end; {error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} -> Reason = iolist_to_binary( @@ -384,6 +392,26 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. +update_config_cluster(Opts, RawConf) -> + lists:filtermap( + fun({K, V}) -> + case update_config_cluster(K, V, Opts) of + ok -> false; + {error, Msg} -> {true, Msg} + end + end, + to_sorted_list(RawConf) + ). + +update_cluster_links(cluster, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) -> + Res = emqx_conf:update([<<"cluster">>, <<"links">>], Links, ?OPTIONS), + check_res(<<"cluster.links">>, Res, Links, Opts); +update_cluster_links(local, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) -> + Res = emqx:update_config([<<"cluster">>, <<"links">>], Links, ?LOCAL_OPTIONS), + check_res(node(), <<"cluster.links">>, Res, Links, Opts); +update_cluster_links(_, _, _) -> + ok. + uninstall(ActionOrSource, Conf, #{mode := replace}) -> case maps:find(ActionOrSource, Conf) of {ok, New} -> @@ -443,7 +471,6 @@ update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> update_config_cluster(Key, Value, #{mode := replace} = Opts) -> check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). --define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, @@ -503,21 +530,20 @@ suggest_msg(_, _) -> <<"">>. check_config(Conf0, Opts) -> - case check_keys_is_not_readonly(Conf0, Opts) of - {ok, Conf1} -> - Conf = emqx_config:fill_defaults(Conf1), - case check_config_schema(Conf) of - ok -> {ok, Conf}; - {error, Reason} -> {error, Reason} - end; - Error -> - Error + maybe + {ok, Conf1} ?= check_keys_is_not_readonly(Conf0, Opts), + {ok, Conf2} ?= check_cluster_keys(Conf1, Opts), + Conf3 = emqx_config:fill_defaults(Conf2), + ok ?= check_config_schema(Conf3), + {ok, Conf3} + else + Error -> Error end. check_keys_is_not_readonly(Conf, Opts) -> IgnoreReadonly = maps:get(ignore_readonly, Opts, false), Keys = maps:keys(Conf), - ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], + ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_ROOT_KEYS], case lists:filter(fun(K) -> lists:member(K, Keys) end, ReadOnlyKeys) of [] -> {ok, Conf}; @@ -529,6 +555,22 @@ check_keys_is_not_readonly(Conf, Opts) -> {error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr} end. +check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) -> + IgnoreReadonly = maps:get(ignore_readonly, Opts, false), + case maps:keys(Cluster) -- [<<"links">>] of + [] -> + {ok, Conf}; + Keys when IgnoreReadonly -> + ?SLOG(info, #{msg => "readonly_root_keys_ignored", keys => Keys}), + {ok, Conf#{<<"cluster">> => maps:with([<<"links">>], Cluster)}}; + Keys -> + BadKeys = [<<"cluster.", K/binary>> || K <- Keys], + BadKeysStr = lists:join(<<",">>, BadKeys), + {error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr} + end; +check_cluster_keys(Conf, _Opts) -> + {ok, Conf}. + check_config_schema(Conf) -> SchemaMod = emqx_conf:schema_module(), Fold = fun({Key, Value}, Acc) -> @@ -587,7 +629,7 @@ filter_readonly_config(Raw) -> try RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), - {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)} + {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_ROOT_KEYS], Raw)} catch throw:Error -> ?SLOG(error, #{ @@ -598,23 +640,28 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> - uninstall(<<"actions">>, AllConf, Opts), - uninstall(<<"sources">>, AllConf, Opts), - Fold = fun({Key, Conf}, Acc) -> - case update_config_local(Key, Conf, Opts) of - ok -> - Acc; - Error -> - ?SLOG(error, #{ - msg => "failed_to_reload_etc_config", - key => Key, - value => Conf, - error => Error - }), - [{Key, Error} | Acc] - end - end, - sorted_fold(Fold, AllConf). + case update_cluster_links(local, AllConf, Opts) of + ok -> + uninstall(<<"actions">>, AllConf, Opts), + uninstall(<<"sources">>, AllConf, Opts), + Fold = fun({Key, Conf}, Acc) -> + case update_config_local(Key, Conf, Opts) of + ok -> + Acc; + Error -> + ?SLOG(error, #{ + msg => "failed_to_reload_etc_config", + key => Key, + value => Conf, + error => Error + }), + [{Key, Error} | Acc] + end + end, + sorted_fold(Fold, AllConf); + {error, Reason} -> + {error, Reason} + end. sorted_fold(Func, Conf) -> case lists:foldl(Func, [], to_sorted_list(Conf)) of @@ -623,10 +670,11 @@ sorted_fold(Func, Conf) -> end. to_sorted_list(Conf0) -> + Conf1 = maps:remove(<<"cluster">>, Conf0), %% connectors > actions/bridges > rule_engine Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>], - {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []), - HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)). + {HighPriorities, Conf2} = split_high_priority_conf(Keys, Conf1, []), + HighPriorities ++ lists:keysort(1, maps:to_list(Conf2)). split_high_priority_conf([], Conf0, Acc) -> {lists:reverse(Acc), Conf0}; @@ -753,7 +801,7 @@ find_running_confs() -> lists:map( fun(Node) -> Conf = emqx_conf_proto_v4:get_config(Node, []), - {Node, maps:without(?READONLY_KEYS, Conf)} + {Node, maps:without(?READONLY_ROOT_KEYS, Conf)} end, mria:running_nodes() ). diff --git a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl index 63f6821b8..bf570f528 100644 --- a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl @@ -21,9 +21,11 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include("emqx_conf.hrl"). + -import(emqx_config_SUITE, [prepare_conf_file/3]). +-define(READONLY_ROOT_KEYS, [rpc, node]). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -81,7 +83,11 @@ t_load_config(Config) -> Conf#{<<"sources">> => [emqx_authz_schema:default_authz()]}, emqx_conf:get_raw([Authz]) ), - ?assertEqual({error, empty_hocon_file}, emqx_conf_cli:conf(["load", "non-exist-file"])), + ?assertMatch({error, #{cause := not_a_file}}, emqx_conf_cli:conf(["load", "non-exist-file"])), + EmptyFile = "empty_file.conf", + ok = file:write_file(EmptyFile, <<>>), + ?assertMatch({error, #{cause := empty_hocon_file}}, emqx_conf_cli:conf(["load", EmptyFile])), + ok = file:delete(EmptyFile), ok. t_conflict_mix_conf(Config) -> @@ -203,7 +209,7 @@ t_load_readonly(Config) -> %% Don't update readonly key ?assertEqual(Conf, emqx_conf:get_raw([Key])) end, - ?READONLY_KEYS + ?READONLY_ROOT_KEYS ), ok. diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 066d4a45d..029f77fc1 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -309,7 +309,7 @@ t_configs_key(_Config) -> }, ReadOnlyBin = iolist_to_binary(hocon_pp:do(ReadOnlyConf, #{})), {error, ReadOnlyError} = update_configs_with_binary(ReadOnlyBin), - ?assertEqual(<<"{\"errors\":\"Cannot update read-only key 'cluster'.\"}">>, ReadOnlyError), + ?assertMatch(<<"{\"errors\":\"Cannot update read-only key 'cluster", _/binary>>, ReadOnlyError), ?assertMatch({ok, <<>>}, update_configs_with_binary(ReadOnlyBin, _IgnoreReadonly = true)), ok.