Merge pull request #11053 from fix/EMQX-10231/cluster-view

fix(ft-fs): use `emqx:running_nodes()` as default cluster view
This commit is contained in:
Andrew Mayorov 2023-06-17 00:54:24 +02:00 committed by GitHub
commit 13d9f5c3e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 63 additions and 42 deletions

View File

@ -654,10 +654,13 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
%% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2
env_handler => fun((AppName :: atom()) -> term()),
%% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
env => {AppName :: atom(), Key :: atom(), Val :: term()},
env => [{AppName :: atom(), Key :: atom(), Val :: term()}],
%% Whether to execute `emqx_config:init_load(SchemaMod)`
%% default: true
load_schema => boolean(),
%% Which node in the cluster to join to.
%% default: first core node
join_to => node(),
%% If we want to exercise the scenario where a node joins an
%% existing cluster where there has already been some
%% configuration changes (via cluster rpc), then we need to enable
@ -692,28 +695,38 @@ emqx_cluster(Specs0, CommonOpts) ->
]),
%% Set the default node of the cluster:
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
JoinTo0 =
JoinTo =
case CoreNodes of
[First | _] -> First;
_ -> undefined
end,
JoinTo =
case maps:find(join_to, CommonOpts) of
{ok, true} -> JoinTo0;
{ok, JT} -> JT;
error -> JoinTo0
end,
[
{Name,
merge_opts(Opts, #{
NodeOpts = fun(Number) ->
#{
base_port => base_port(Number),
join_to => JoinTo,
env => [
{mria, core_nodes, CoreNodes},
{mria, node_role, Role},
{gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
]
})}
}
end,
RoleOpts = fun
(core) ->
#{
join_to => JoinTo,
env => [
{mria, node_role, core}
]
};
(replicant) ->
#{
env => [
{mria, node_role, replicant},
{ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
]
}
end,
[
{Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)}
|| {{Role, Name, Opts}, Number} <- Specs
].

View File

@ -140,7 +140,7 @@ mk_cluster_specs(Config, Opts) ->
{core, emqx_bridge_api_SUITE1, #{}},
{core, emqx_bridge_api_SUITE2, #{}}
],
CommonOpts = #{
CommonOpts = Opts#{
env => [{emqx, boot_modules, [broker]}],
apps => [],
% NOTE
@ -157,7 +157,6 @@ mk_cluster_specs(Config, Opts) ->
load_apps => ?SUITE_APPS ++ [emqx_dashboard],
env_handler => fun load_suite_config/1,
load_schema => false,
join_to => maps:get(join_to, Opts, true),
priv_data_dir => ?config(priv_dir, Config)
},
emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts).

View File

@ -247,7 +247,6 @@ cluster(Specs, Config) ->
{env, Env},
{apps, [emqx_conf]},
{load_schema, false},
{join_to, true},
{priv_data_dir, PrivDataDir},
{env_handler, fun
(emqx) ->

View File

@ -96,7 +96,7 @@ handle_event(
complete ->
{next_state, start_assembling, NSt, ?internal([])};
{incomplete, _} ->
Nodes = mria_mnesia:running_nodes() -- [node()],
Nodes = emqx:running_nodes() -- [node()],
{next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])};
% TODO: recovery?
{error, _} = Error ->

View File

@ -361,7 +361,7 @@ list(_Options, Query) ->
end.
list(QueryIn) ->
{Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())),
{Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(emqx:running_nodes())),
list_nodes(NodeQuery, Nodes, #{items => []}).
list_nodes(Query, Nodes = [Node | Rest], Acc) ->

View File

@ -24,6 +24,8 @@
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
-define(SUITE_APPS, [emqx_conf, emqx_ft]).
all() ->
[
{group, single},
@ -49,10 +51,9 @@ end_per_suite(_Config) ->
init_per_group(Group = cluster, Config) ->
Cluster = mk_cluster_specs(Config),
ct:pal("Starting ~p", [Cluster]),
Nodes = [
emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()})
|| {Name, Opts} <- Cluster
],
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end),
[] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok],
[{group, Group}, {cluster_nodes, Nodes} | Config];
init_per_group(Group, Config) ->
[{group, Group} | Config].
@ -65,22 +66,29 @@ end_per_group(cluster, Config) ->
end_per_group(_Group, _Config) ->
ok.
mk_cluster_specs(Config) ->
mk_cluster_specs(_Config) ->
Specs = [
{core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
{core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}
{core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}},
{replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}}
],
CommOpts = [
{env, [{emqx, boot_modules, [broker, listeners]}]},
{apps, [emqx_ft]},
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
{env_handler, emqx_ft_test_helpers:env_handler(Config)}
CommOpts = #{
env => [
{mria, db_backend, rlog},
{emqx, boot_modules, [broker, listeners]}
],
apps => [],
load_apps => ?SUITE_APPS,
conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]
},
emqx_common_test_helpers:emqx_cluster(
Specs,
CommOpts
).
init_node(Config) ->
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)).
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(t_ft_disabled, _Config) ->
@ -96,7 +104,7 @@ t_list_files(Config) ->
ClientId = client_id(Config),
FileId = <<"f1">>,
Node = lists:last(cluster(Config)),
Node = lists:last(test_nodes(Config)),
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
{ok, 200, #{<<"files">> := Files}} =
@ -124,7 +132,7 @@ t_download_transfer(Config) ->
ClientId = client_id(Config),
FileId = <<"f1">>,
Node = lists:last(cluster(Config)),
Node = lists:last(test_nodes(Config)),
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
?assertMatch(
@ -184,7 +192,7 @@ t_download_transfer(Config) ->
t_list_files_paging(Config) ->
ClientId = client_id(Config),
NFiles = 20,
Nodes = cluster(Config),
Nodes = test_nodes(Config),
Uploads = [
{mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)}
|| N <- lists:seq(1, NFiles)
@ -280,8 +288,13 @@ t_ft_disabled(_Config) ->
%% Helpers
%%--------------------------------------------------------------------
cluster(Config) ->
[node() | proplists:get_value(cluster_nodes, Config, [])].
test_nodes(Config) ->
case proplists:get_value(cluster_nodes, Config, []) of
[] ->
[node()];
Nodes ->
Nodes
end.
client_id(Config) ->
iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])).

View File

@ -36,7 +36,7 @@ start_additional_node(Config, Name) ->
).
stop_additional_node(Node) ->
ok = rpc:call(Node, ekka, leave, []),
_ = rpc:call(Node, ekka, leave, []),
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]),
ok = emqx_common_test_helpers:stop_slave(Node),
ok.

View File

@ -295,7 +295,6 @@ cluster(Specs) ->
{env, Env},
{apps, [emqx_conf]},
{load_schema, false},
{join_to, true},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),

View File

@ -159,7 +159,6 @@ cluster(Specs) ->
{env, Env},
{apps, [emqx_conf, emqx_management]},
{load_schema, false},
{join_to, true},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),

View File

@ -444,7 +444,6 @@ cluster(Config) ->
env => [{mria, db_backend, rlog}],
load_schema => true,
start_autocluster => true,
join_to => true,
listener_ports => [],
conf => [{[dashboard, listeners, http, bind], 0}],
env_handler =>