From 7b6b9580c8662f7ee530f6709f723b1670de5555 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 10 Jul 2024 10:05:28 +0800 Subject: [PATCH] test: add test for updating cluster.links --- .../test/emqx_cluster_link_config_SUITE.erl | 90 ++++++++++++++++--- apps/emqx_conf/src/emqx_conf_cli.erl | 4 +- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl index 97e62402c..7e7a74142 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl @@ -56,9 +56,28 @@ mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) -> ), {NodesA, NodesB}. +t_config_update_cli('init', Config0) -> + Config = + [ + {update_from, cli}, + {name_prefix, ?FUNCTION_NAME} + | lists:keydelete(update_from, 1, Config0) + ], + t_config_update('init', Config); +t_config_update_cli('end', Config) -> + t_config_update('end', Config). + +t_config_update_cli(Config) -> + t_config_update(Config). + t_config_update('init', Config) -> - NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), - NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + NamePrefix = + case ?config(name_prefix, Config) of + undefined -> ?FUNCTION_NAME; + Name -> Name + end, + NameA = fmt("~s_~s", [NamePrefix, "a"]), + NameB = fmt("~s_~s", [NamePrefix, "b"]), LPortA = 31883, LPortB = 41883, ConfA = combine([conf_cluster(NameA), conf_log()]), @@ -73,7 +92,8 @@ t_config_update('init', Config) -> {lport_a, LPortA}, {lport_b, LPortB}, {name_a, NameA}, - {name_b, NameB} + {name_b, NameB}, + {update_from, api} | Config ]; t_config_update('end', Config) -> @@ -113,12 +133,12 @@ t_config_update(Config) -> {ok, SubRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), - %% 5 nodes = 5 actors (durable storage is dsabled) + %% 5 nodes = 5 actors (durable storage is disabled) 5, 30_000 ), - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])), - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)), + ?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)), ?assertMatch( {ok, [ @@ -158,7 +178,7 @@ t_config_update(Config) -> %% update link LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]}, - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])), + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)), ?assertMatch( {ok, [ @@ -187,20 +207,21 @@ t_config_update(Config) -> %% disable link LinkConfA2 = LinkConfA1#{<<"enable">> => false}, - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])), - %% must be already blocked by the receiving cluster even if externak routing state is not + ?assertMatch({ok, _}, update(NodeA1, [LinkConfA2], Config)), + %% must be already blocked by the receiving cluster even if external routing state is not %% updated yet {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1), LinkConfB1 = LinkConfB#{<<"enable">> => false}, - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])), + ?assertMatch({ok, _}, update(NodeB1, [LinkConfB1], Config)), {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1), ?assertNotReceive({publish, _Message = #{}}, 3000), %% delete links - ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), - ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])), + ?assertMatch({ok, _}, update(NodeA1, [], Config)), + ?assertMatch({ok, _}, update(NodeB1, [], Config)), + validate_update_cli_failed(NodeA1, Config), ok = emqtt:stop(ClientA), ok = emqtt:stop(ClientB). @@ -645,3 +666,48 @@ fmt(Fmt, Args) -> mk_nodename(BaseName, Idx) -> binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])). + +validate_update_cli_failed(Node, Config) -> + case ?config(update_from, Config) of + api -> + ok; + cli -> + ConfBin = hocon_pp:do( + #{ + <<"cluster">> => #{ + <<"links">> => [], + <<"autoclean">> => <<"12h">> + } + }, + #{} + ), + ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), + ?assertMatch( + {error, <<"Cannot update read-only key 'cluster.autoheal'.">>}, + erpc:call(Node, emqx_cluster_cli, conf, [["load", ConfFile]]) + ) + end. + +update(Node, Links, Config) -> + case ?config(update_from, Config) of + api -> update_links_from_api(Node, Links, Config); + cli -> update_links_from_cli(Node, Links, Config) + end. + +update_links_from_api(Node, Links, _Config) -> + erpc:call(Node, emqx_cluster_link_config, update, [Links]). + +update_links_from_cli(Node, Links, Config) -> + ConfBin = hocon_pp:do(#{<<"cluster">> => #{<<"links">> => Links}}, #{}), + ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config), + erpc:call(Node, emqx_cluster_cli, conf, [["load", ConfFile]]). + +prepare_conf_file(Name, Content, CTConfig) -> + Filename = tc_conf_file(Name, CTConfig), + filelib:ensure_dir(Filename), + ok = file:write_file(Filename, Content), + Filename. + +tc_conf_file(TC, Config) -> + DataDir = ?config(data_dir, Config), + filename:join([DataDir, TC, 'emqx.conf']). diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b1db1fff0..697685f63 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -564,7 +564,9 @@ check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) -> BadKeys = ["cluster." ++ K || K <- Keys], BadKeysStr = lists:join(<<",">>, BadKeys), {error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr} - end. + end; +check_cluster_keys(Conf, _Opts) -> + {ok, Conf}. check_config_schema(Conf) -> SchemaMod = emqx_conf:schema_module(),