test: add test for updating cluster.links
This commit is contained in:
parent
2783192f77
commit
7b6b9580c8
|
@ -56,9 +56,28 @@ mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
|
||||||
),
|
),
|
||||||
{NodesA, NodesB}.
|
{NodesA, NodesB}.
|
||||||
|
|
||||||
|
t_config_update_cli('init', Config0) ->
|
||||||
|
Config =
|
||||||
|
[
|
||||||
|
{update_from, cli},
|
||||||
|
{name_prefix, ?FUNCTION_NAME}
|
||||||
|
| lists:keydelete(update_from, 1, Config0)
|
||||||
|
],
|
||||||
|
t_config_update('init', Config);
|
||||||
|
t_config_update_cli('end', Config) ->
|
||||||
|
t_config_update('end', Config).
|
||||||
|
|
||||||
|
t_config_update_cli(Config) ->
|
||||||
|
t_config_update(Config).
|
||||||
|
|
||||||
t_config_update('init', Config) ->
|
t_config_update('init', Config) ->
|
||||||
NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
|
NamePrefix =
|
||||||
NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
|
case ?config(name_prefix, Config) of
|
||||||
|
undefined -> ?FUNCTION_NAME;
|
||||||
|
Name -> Name
|
||||||
|
end,
|
||||||
|
NameA = fmt("~s_~s", [NamePrefix, "a"]),
|
||||||
|
NameB = fmt("~s_~s", [NamePrefix, "b"]),
|
||||||
LPortA = 31883,
|
LPortA = 31883,
|
||||||
LPortB = 41883,
|
LPortB = 41883,
|
||||||
ConfA = combine([conf_cluster(NameA), conf_log()]),
|
ConfA = combine([conf_cluster(NameA), conf_log()]),
|
||||||
|
@ -73,7 +92,8 @@ t_config_update('init', Config) ->
|
||||||
{lport_a, LPortA},
|
{lport_a, LPortA},
|
||||||
{lport_b, LPortB},
|
{lport_b, LPortB},
|
||||||
{name_a, NameA},
|
{name_a, NameA},
|
||||||
{name_b, NameB}
|
{name_b, NameB},
|
||||||
|
{update_from, api}
|
||||||
| Config
|
| Config
|
||||||
];
|
];
|
||||||
t_config_update('end', Config) ->
|
t_config_update('end', Config) ->
|
||||||
|
@ -113,12 +133,12 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
{ok, SubRef} = snabbkaffe:subscribe(
|
{ok, SubRef} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
|
?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
|
||||||
%% 5 nodes = 5 actors (durable storage is dsabled)
|
%% 5 nodes = 5 actors (durable storage is disabled)
|
||||||
5,
|
5,
|
||||||
30_000
|
30_000
|
||||||
),
|
),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
|
?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, [
|
{ok, [
|
||||||
|
@ -158,7 +178,7 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
%% update link
|
%% update link
|
||||||
LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
|
LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, [
|
{ok, [
|
||||||
|
@ -187,20 +207,21 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
%% disable link
|
%% disable link
|
||||||
LinkConfA2 = LinkConfA1#{<<"enable">> => false},
|
LinkConfA2 = LinkConfA1#{<<"enable">> => false},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA2], Config)),
|
||||||
%% must be already blocked by the receiving cluster even if externak routing state is not
|
%% must be already blocked by the receiving cluster even if external routing state is not
|
||||||
%% updated yet
|
%% updated yet
|
||||||
{ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1),
|
{ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1),
|
||||||
|
|
||||||
LinkConfB1 = LinkConfB#{<<"enable">> => false},
|
LinkConfB1 = LinkConfB#{<<"enable">> => false},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])),
|
?assertMatch({ok, _}, update(NodeB1, [LinkConfB1], Config)),
|
||||||
{ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1),
|
{ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1),
|
||||||
|
|
||||||
?assertNotReceive({publish, _Message = #{}}, 3000),
|
?assertNotReceive({publish, _Message = #{}}, 3000),
|
||||||
|
|
||||||
%% delete links
|
%% delete links
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
|
?assertMatch({ok, _}, update(NodeA1, [], Config)),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])),
|
?assertMatch({ok, _}, update(NodeB1, [], Config)),
|
||||||
|
validate_update_cli_failed(NodeA1, Config),
|
||||||
|
|
||||||
ok = emqtt:stop(ClientA),
|
ok = emqtt:stop(ClientA),
|
||||||
ok = emqtt:stop(ClientB).
|
ok = emqtt:stop(ClientB).
|
||||||
|
@ -645,3 +666,48 @@ fmt(Fmt, Args) ->
|
||||||
|
|
||||||
mk_nodename(BaseName, Idx) ->
|
mk_nodename(BaseName, Idx) ->
|
||||||
binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])).
|
binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])).
|
||||||
|
|
||||||
|
validate_update_cli_failed(Node, Config) ->
|
||||||
|
case ?config(update_from, Config) of
|
||||||
|
api ->
|
||||||
|
ok;
|
||||||
|
cli ->
|
||||||
|
ConfBin = hocon_pp:do(
|
||||||
|
#{
|
||||||
|
<<"cluster">> => #{
|
||||||
|
<<"links">> => [],
|
||||||
|
<<"autoclean">> => <<"12h">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{}
|
||||||
|
),
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{error, <<"Cannot update read-only key 'cluster.autoheal'.">>},
|
||||||
|
erpc:call(Node, emqx_cluster_cli, conf, [["load", ConfFile]])
|
||||||
|
)
|
||||||
|
end.
|
||||||
|
|
||||||
|
update(Node, Links, Config) ->
|
||||||
|
case ?config(update_from, Config) of
|
||||||
|
api -> update_links_from_api(Node, Links, Config);
|
||||||
|
cli -> update_links_from_cli(Node, Links, Config)
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_links_from_api(Node, Links, _Config) ->
|
||||||
|
erpc:call(Node, emqx_cluster_link_config, update, [Links]).
|
||||||
|
|
||||||
|
update_links_from_cli(Node, Links, Config) ->
|
||||||
|
ConfBin = hocon_pp:do(#{<<"cluster">> => #{<<"links">> => Links}}, #{}),
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
|
erpc:call(Node, emqx_cluster_cli, conf, [["load", ConfFile]]).
|
||||||
|
|
||||||
|
prepare_conf_file(Name, Content, CTConfig) ->
|
||||||
|
Filename = tc_conf_file(Name, CTConfig),
|
||||||
|
filelib:ensure_dir(Filename),
|
||||||
|
ok = file:write_file(Filename, Content),
|
||||||
|
Filename.
|
||||||
|
|
||||||
|
tc_conf_file(TC, Config) ->
|
||||||
|
DataDir = ?config(data_dir, Config),
|
||||||
|
filename:join([DataDir, TC, 'emqx.conf']).
|
||||||
|
|
|
@ -564,7 +564,9 @@ check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) ->
|
||||||
BadKeys = ["cluster." ++ K || K <- Keys],
|
BadKeys = ["cluster." ++ K || K <- Keys],
|
||||||
BadKeysStr = lists:join(<<",">>, BadKeys),
|
BadKeysStr = lists:join(<<",">>, BadKeys),
|
||||||
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
||||||
end.
|
end;
|
||||||
|
check_cluster_keys(Conf, _Opts) ->
|
||||||
|
{ok, Conf}.
|
||||||
|
|
||||||
check_config_schema(Conf) ->
|
check_config_schema(Conf) ->
|
||||||
SchemaMod = emqx_conf:schema_module(),
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
|
|
Loading…
Reference in New Issue