diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index f195e083c..2d51f6f14 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -519,21 +519,51 @@ ensure_quic_listener(Name, UdpPort) -> %% Clusterisation and multi-node testing %% +-type cluster_spec() :: [node_spec()]. +-type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}. +-type role() :: core | replicant. +-type shortname() :: atom(). +-type nodename() :: atom(). +-type node_opts() :: #{ + %% Need to loaded apps. These apps will be loaded once the node started + load_apps => list(), + %% Need to started apps. It is the first arg passed to emqx_common_test_helpers:start_apps/2 + apps => list(), + %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2 + env_handler => fun((AppName :: atom()) -> term()), + %% Application env preset before calling `emqx_common_test_helpers:start_apps/2` + env => {AppName :: atom(), Key :: atom(), Val :: term()}, + %% Whether to execute `emqx_config:init_load(SchemaMod)` + %% default: true + load_schema => boolean(), + %% Eval by emqx_config:put/2 + conf => [{KeyPath :: list(), Val :: term()}], + %% Fast option to config listener port + %% default rule: + %% - tcp: base_port + %% - ssl: base_port + 1 + %% - ws : base_port + 3 + %% - wss: base_port + 4 + listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}] +}. + +-spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}]. emqx_cluster(Specs) -> emqx_cluster(Specs, #{}). +-spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}]. emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) -> emqx_cluster(Specs, maps:from_list(CommonOpts)); emqx_cluster(Specs0, CommonOpts) -> Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), Specs = expand_node_specs(Specs1, CommonOpts), - CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], - %% Assign grpc ports: + %% Assign grpc ports GenRpcPorts = maps:from_list([ {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}} || {{_, Name, _}, Num} <- Specs ]), %% Set the default node of the cluster: + CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], JoinTo = case CoreNodes of [First | _] -> First; @@ -554,6 +584,8 @@ emqx_cluster(Specs0, CommonOpts) -> ]. %% Lower level starting API + +-spec start_slave(shortname(), node_opts()) -> nodename(). start_slave(Name, Opts) -> {ok, Node} = ct_slave:start( list_to_atom(atom_to_list(Name) ++ "@" ++ host()), @@ -590,6 +622,7 @@ epmd_path() -> %% Node initialization +-spec setup_node(nodename(), node_opts()) -> ok. setup_node(Node, Opts) when is_list(Opts) -> setup_node(Node, maps:from_list(Opts)); setup_node(Node, Opts) when is_map(Opts) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 9f757b0cc..9646eb747 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -314,7 +314,9 @@ do_select( ?FRESH_SELECT -> ets:select(Tab, Ms, Limit); _ -> - ets:select(Continuation) + %% XXX: Repair is necessary because we pass Continuation back + %% and forth through the nodes in the `do_cluster_query` + ets:select(ets:repair_continuation(Continuation, Ms)) end, case Result of '$end_of_table' -> @@ -508,7 +510,7 @@ format_query_result( %% queries that can be read meta => Meta#{count => Total}, data => lists:flatten( - lists:foldr( + lists:foldl( fun({Node, Rows}, Acc) -> [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc] end, diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index c9abef375..a065b9c83 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -28,15 +28,116 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). + ok. %%-------------------------------------------------------------------- %% cases %%-------------------------------------------------------------------- t_cluster_query(_Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), + ct:timetrap({seconds, 120}), + snabbkaffe:fix_ct_logging(), + [{Name, Opts}, {Name1, Opts1}] = cluster_specs(), + Node1 = emqx_common_test_helpers:start_slave(Name, Opts), + Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), + try + process_flag(trap_exit, true), + ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)], + ClientLs2 = [start_emqtt_client(Node2, I, 3883) || I <- lists:seq(1, 10)], + + %% returned list should be the same regardless of which node is requested + {200, ClientsAll} = query_clients(Node1, #{}), + ?assertEqual({200, ClientsAll}, query_clients(Node2, #{})), + ?assertMatch( + #{page := 1, limit := 100, count := 20}, + maps:get(meta, ClientsAll) + ), + ?assertMatch(20, length(maps:get(data, ClientsAll))), + %% query the first page, counting in entire cluster + {200, ClientsPage1} = query_clients(Node1, #{<<"limit">> => 5}), + ?assertMatch( + #{page := 1, limit := 5, count := 20}, + maps:get(meta, ClientsPage1) + ), + ?assertMatch(5, length(maps:get(data, ClientsPage1))), + + %% assert: AllPage = Page1 + Page2 + Page3 + Page4 + %% !!!Note: this equation requires that the queried tables must be ordered_set + {200, ClientsPage2} = query_clients(Node1, #{<<"page">> => 2, <<"limit">> => 5}), + {200, ClientsPage3} = query_clients(Node2, #{<<"page">> => 3, <<"limit">> => 5}), + {200, ClientsPage4} = query_clients(Node1, #{<<"page">> => 4, <<"limit">> => 5}), + GetClientIds = fun(L) -> lists:map(fun(#{clientid := Id}) -> Id end, L) end, + ?assertEqual( + GetClientIds(maps:get(data, ClientsAll)), + GetClientIds( + maps:get(data, ClientsPage1) ++ maps:get(data, ClientsPage2) ++ + maps:get(data, ClientsPage3) ++ maps:get(data, ClientsPage4) + ) + ), + + %% exact match can return non-zero total + {200, ClientsNode1} = query_clients(Node2, #{<<"username">> => <<"corenode1@127.0.0.1">>}), + ?assertMatch( + #{count := 10}, + maps:get(meta, ClientsNode1) + ), + + %% fuzzy searching can't return total + {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), + ?assertMatch( + #{count := 0}, + maps:get(meta, ClientsNode2) + ), + ?assertMatch(10, length(maps:get(data, ClientsNode2))), + + _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), + _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2) + after + emqx_common_test_helpers:stop_slave(Node1), + emqx_common_test_helpers:stop_slave(Node2) + end, ok. + +%%-------------------------------------------------------------------- +%% helpers +%%-------------------------------------------------------------------- + +cluster_specs() -> + Specs = + %% default listeners port + [ + {core, corenode1, #{listener_ports => [{tcp, 2883}]}}, + {core, corenode2, #{listener_ports => [{tcp, 3883}]}} + ], + CommOpts = + [ + {env, [{emqx, boot_modules, all}]}, + {apps, []}, + {conf, [ + {[listeners, ssl, default, enabled], false}, + {[listeners, ws, default, enabled], false}, + {[listeners, wss, default, enabled], false} + ]} + ], + emqx_common_test_helpers:emqx_cluster( + Specs, + CommOpts + ). + +start_emqtt_client(Node0, N, Port) -> + Node = atom_to_binary(Node0), + ClientId = iolist_to_binary([Node, "-", integer_to_binary(N)]), + {ok, C} = emqtt:start_link([{clientid, ClientId}, {username, Node}, {port, Port}]), + {ok, _} = emqtt:connect(C), + C. + +query_clients(Node, Qs0) -> + Qs = maps:merge( + #{<<"page">> => 1, <<"limit">> => 100}, + Qs0 + ), + rpc:call(Node, emqx_mgmt_api_clients, clients, [get, #{query_string => Qs}]).