349 lines
14 KiB
Erlang
349 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_mgmt_cli_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
|
|
ok = emqx_mgmt_cli:load(),
|
|
Config.
|
|
|
|
end_per_suite(_) ->
|
|
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
|
|
|
|
init_per_testcase(t_autocluster_leave = TC, Config) ->
|
|
[Core1, Core2, Core3, Repl] =
|
|
Nodes = [
|
|
t_autocluster_leave_core1,
|
|
t_autocluster_leave_core2,
|
|
t_autocluster_leave_core3,
|
|
t_autocluster_leave_replicant
|
|
],
|
|
|
|
NodeNames = [emqx_cth_cluster:node_name(N) || N <- Nodes],
|
|
AppSpec = [
|
|
emqx,
|
|
{emqx_conf, #{
|
|
config => #{
|
|
cluster => #{
|
|
discovery_strategy => static,
|
|
static => #{seeds => NodeNames}
|
|
}
|
|
}
|
|
}},
|
|
emqx_management
|
|
],
|
|
Cluster = emqx_cth_cluster:start(
|
|
[
|
|
{Core1, #{role => core, apps => AppSpec}},
|
|
{Core2, #{role => core, apps => AppSpec}},
|
|
{Core3, #{role => core, apps => AppSpec}},
|
|
{Repl, #{role => replicant, apps => AppSpec}}
|
|
],
|
|
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
|
|
),
|
|
[{cluster, Cluster} | Config];
|
|
init_per_testcase(_TC, Config) ->
|
|
Config.
|
|
|
|
end_per_testcase(_TC, Config) ->
|
|
case ?config(cluster, Config) of
|
|
undefined -> ok;
|
|
Cluster -> emqx_cth_cluster:stop(Cluster)
|
|
end.
|
|
|
|
t_status(_Config) ->
|
|
emqx_ctl:run_command([]),
|
|
emqx_ctl:run_command(["status"]),
|
|
ok.
|
|
|
|
t_broker(_Config) ->
|
|
%% broker # Show broker version, uptime and description
|
|
emqx_ctl:run_command(["broker"]),
|
|
%% broker stats # Show broker statistics of clients, topics, subscribers
|
|
emqx_ctl:run_command(["broker", "stats"]),
|
|
%% broker metrics # Show broker metrics
|
|
emqx_ctl:run_command(["broker", "metrics"]),
|
|
ok.
|
|
|
|
t_cluster(_Config) ->
|
|
SelfNode = node(),
|
|
FakeNode = 'fake@127.0.0.1',
|
|
MFA = {io, format, [""]},
|
|
meck:new(mria_mnesia, [non_strict, passthrough, no_link]),
|
|
meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]),
|
|
{atomic, {ok, TnxId, _}} =
|
|
mria:transaction(
|
|
emqx_cluster_rpc_shard,
|
|
fun emqx_cluster_rpc:init_mfa/2,
|
|
[SelfNode, MFA]
|
|
),
|
|
emqx_cluster_rpc:maybe_init_tnx_id(FakeNode, TnxId),
|
|
?assertMatch(
|
|
{atomic, [
|
|
#{
|
|
node := SelfNode,
|
|
mfa := MFA,
|
|
created_at := _,
|
|
tnx_id := TnxId,
|
|
initiator := SelfNode
|
|
},
|
|
#{
|
|
node := FakeNode,
|
|
mfa := MFA,
|
|
created_at := _,
|
|
tnx_id := TnxId,
|
|
initiator := SelfNode
|
|
}
|
|
]},
|
|
emqx_cluster_rpc:status()
|
|
),
|
|
%% cluster join <Node> # Join the cluster
|
|
%% cluster leave # Leave the cluster
|
|
%% cluster force-leave <Node> # Force the node leave from cluster
|
|
%% cluster status # Cluster status
|
|
emqx_ctl:run_command(["cluster", "status"]),
|
|
|
|
emqx_ctl:run_command(["cluster", "force-leave", atom_to_list(FakeNode)]),
|
|
?assertMatch(
|
|
{atomic, [
|
|
#{
|
|
node := SelfNode,
|
|
mfa := MFA,
|
|
created_at := _,
|
|
tnx_id := TnxId,
|
|
initiator := SelfNode
|
|
}
|
|
]},
|
|
emqx_cluster_rpc:status()
|
|
),
|
|
meck:unload(mria_mnesia),
|
|
ok.
|
|
|
|
t_clients(_Config) ->
|
|
%% clients list # List all clients
|
|
emqx_ctl:run_command(["clients", "list"]),
|
|
%% clients show <ClientId> # Show a client
|
|
%% clients kick <ClientId> # Kick out a client
|
|
ok.
|
|
|
|
t_routes(_Config) ->
|
|
%% routes list # List all routes
|
|
emqx_ctl:run_command(["routes", "list"]),
|
|
%% routes show <Topic> # Show a route
|
|
ok.
|
|
|
|
t_subscriptions(_Config) ->
|
|
%% subscriptions list # List all subscriptions
|
|
emqx_ctl:run_command(["subscriptions", "list"]),
|
|
%% subscriptions show <ClientId> # Show subscriptions of a client
|
|
%% subscriptions add <ClientId> <Topic> <QoS> # Add a static subscription manually
|
|
%% subscriptions del <ClientId> <Topic> # Delete a static subscription manually
|
|
ok.
|
|
|
|
t_plugins(_Config) ->
|
|
%% plugins <command> [Name-Vsn] # e.g. 'start emqx_plugin_template-5.0-rc.1'
|
|
%% plugins list # List all installed plugins
|
|
emqx_ctl:run_command(["plugins", "list"]),
|
|
%% plugins describe Name-Vsn # Describe an installed plugins
|
|
%% plugins install Name-Vsn # Install a plugin package placed
|
|
%% # in plugin'sinstall_dir
|
|
%% plugins uninstall Name-Vsn # Uninstall a plugin. NOTE: it deletes
|
|
%% # all files in install_dir/Name-Vsn
|
|
%% plugins start Name-Vsn # Start a plugin
|
|
%% plugins stop Name-Vsn # Stop a plugin
|
|
%% plugins restart Name-Vsn # Stop then start a plugin
|
|
%% plugins disable Name-Vsn # Disable auto-boot
|
|
%% plugins enable Name-Vsn [Position] # Enable auto-boot at Position in the boot list, where Position could be
|
|
%% # 'front', 'rear', or 'before Other-Vsn' to specify a relative position.
|
|
%% # The Position parameter can be used to adjust the boot order.
|
|
%% # If no Position is given, an already configured plugin
|
|
%% # will stay at is old position; a newly plugin is appended to the rear
|
|
%% # e.g. plugins disable foo-0.1.0 front
|
|
%% # plugins enable bar-0.2.0 before foo-0.1.0
|
|
ok.
|
|
|
|
t_vm(_Config) ->
|
|
%% vm all # Show info of Erlang VM
|
|
emqx_ctl:run_command(["vm", "all"]),
|
|
%% vm load # Show load of Erlang VM
|
|
emqx_ctl:run_command(["vm", "load"]),
|
|
%% vm memory # Show memory of Erlang VM
|
|
emqx_ctl:run_command(["vm", "memory"]),
|
|
%% vm process # Show process of Erlang VM
|
|
emqx_ctl:run_command(["vm", "process"]),
|
|
%% vm io # Show IO of Erlang VM
|
|
emqx_ctl:run_command(["vm", "io"]),
|
|
%% vm ports # Show Ports of Erlang VM
|
|
emqx_ctl:run_command(["vm", "ports"]),
|
|
ok.
|
|
|
|
t_mnesia(_Config) ->
|
|
%% mnesia # Mnesia system info
|
|
emqx_ctl:run_command(["mnesia"]),
|
|
ok.
|
|
|
|
t_log(_Config) ->
|
|
%% log set-level <Level> # Set the overall log level
|
|
%% log primary-level # Show the primary log level now
|
|
emqx_ctl:run_command(["log", "primary-level"]),
|
|
%% log primary-level <Level> # Set the primary log level
|
|
%% log handlers list # Show log handlers
|
|
emqx_ctl:run_command(["log", "handlers", "list"]),
|
|
%% log handlers start <HandlerId> # Start a log handler
|
|
%% log handlers stop <HandlerId> # Stop a log handler
|
|
%% log handlers set-level <HandlerId> <Level> # Set log level of a log handler
|
|
ok.
|
|
|
|
t_trace(_Config) ->
|
|
%% trace list # List all traces started on local node
|
|
emqx_ctl:run_command(["trace", "list"]),
|
|
%% trace start client <ClientId> <File> [<Level>] # Traces for a client on local node
|
|
%% trace stop client <ClientId> # Stop tracing for a client on local node
|
|
%% trace start topic <Topic> <File> [<Level>] # Traces for a topic on local node
|
|
%% trace stop topic <Topic> # Stop tracing for a topic on local node
|
|
%% trace start ip_address <IP> <File> [<Level>] # Traces for a client ip on local node
|
|
%% trace stop ip_addresss <IP> # Stop tracing for a client ip on local node
|
|
ok.
|
|
|
|
t_traces(_Config) ->
|
|
%% traces list # List all cluster traces started
|
|
emqx_ctl:run_command(["traces", "list"]),
|
|
%% traces start <Name> client <ClientId> # Traces for a client in cluster
|
|
%% traces start <Name> topic <Topic> # Traces for a topic in cluster
|
|
%% traces start <Name> ip_address <IPAddr> # Traces for a IP in cluster
|
|
%% traces stop <Name> # Stop trace in cluster
|
|
%% traces delete <Name> # Delete trace in cluster
|
|
ok.
|
|
|
|
t_traces_client(_Config) ->
|
|
TraceC = "TraceNameClientID",
|
|
emqx_ctl:run_command(["traces", "start", TraceC, "client", "ClientID"]),
|
|
emqx_ctl:run_command(["traces", "stop", TraceC]),
|
|
emqx_ctl:run_command(["traces", "delete", TraceC]).
|
|
|
|
t_traces_client_with_duration(_Config) ->
|
|
TraceC = "TraceNameClientID",
|
|
Duration = "1000",
|
|
emqx_ctl:run_command(["traces", "start", TraceC, "client", "ClientID", Duration]),
|
|
emqx_ctl:run_command(["traces", "stop", TraceC]),
|
|
emqx_ctl:run_command(["traces", "delete", TraceC]).
|
|
|
|
t_traces_topic(_Config) ->
|
|
TraceT = "TraceNameTopic",
|
|
emqx_ctl:run_command(["traces", "start", TraceT, "topic", "a/b"]),
|
|
emqx_ctl:run_command(["traces", "stop", TraceT]),
|
|
emqx_ctl:run_command(["traces", "delete", TraceT]).
|
|
|
|
t_traces_ip(_Config) ->
|
|
TraceI = "TraceNameIP",
|
|
emqx_ctl:run_command(["traces", "start", TraceI, "ip_address", "127.0.0.1"]),
|
|
emqx_ctl:run_command(["traces", "stop", TraceI]),
|
|
emqx_ctl:run_command(["traces", "delete", TraceI]).
|
|
|
|
t_listeners(_Config) ->
|
|
%% listeners # List listeners
|
|
emqx_ctl:run_command(["listeners"]),
|
|
%% listeners stop <Identifier> # Stop a listener
|
|
%% listeners start <Identifier> # Start a listener
|
|
%% listeners restart <Identifier> # Restart a listener
|
|
ok.
|
|
|
|
t_authz(_Config) ->
|
|
%% authz cache-clean all # Clears authorization cache on all nodes
|
|
?assertMatch(ok, emqx_ctl:run_command(["authz", "cache-clean", "all"])),
|
|
ClientId = "authz_clean_test",
|
|
ClientIdBin = list_to_binary(ClientId),
|
|
%% authz cache-clean <ClientId> # Clears authorization cache for given client
|
|
?assertMatch({error, not_found}, emqx_ctl:run_command(["authz", "cache-clean", ClientId])),
|
|
{ok, C} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
|
{ok, _} = emqtt:connect(C),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 1),
|
|
[Pid] = emqx_cm:lookup_channels(ClientIdBin),
|
|
?assertMatch([_], gen_server:call(Pid, list_authz_cache)),
|
|
|
|
?assertMatch(ok, emqx_ctl:run_command(["authz", "cache-clean", ClientId])),
|
|
?assertMatch([], gen_server:call(Pid, list_authz_cache)),
|
|
%% authz cache-clean node <Node> # Clears authorization cache on given node
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic/2">>, 1),
|
|
?assertMatch([_], gen_server:call(Pid, list_authz_cache)),
|
|
?assertMatch(ok, emqx_ctl:run_command(["authz", "cache-clean", "node", atom_to_list(node())])),
|
|
?assertMatch([], gen_server:call(Pid, list_authz_cache)),
|
|
ok = emqtt:disconnect(C),
|
|
ok.
|
|
|
|
t_olp(_Config) ->
|
|
%% olp status # Return OLP status if system is overloaded
|
|
emqx_ctl:run_command(["olp", "status"]),
|
|
%% olp enable # Enable overload protection
|
|
%% olp disable # Disable overload protection
|
|
ok.
|
|
|
|
t_admin(_Config) ->
|
|
%% admins add <Username> <Password> <Description> # Add dashboard user
|
|
%% admins passwd <Username> <Password> # Reset dashboard user password
|
|
%% admins del <Username> # Delete dashboard user
|
|
ok.
|
|
|
|
t_autocluster_leave(Config) ->
|
|
[Core1, Core2, Core3, Repl] = Cluster = ?config(cluster, Config),
|
|
%% Mria membership updates are async, makes sense to wait a little
|
|
timer:sleep(300),
|
|
ClusterView = [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster],
|
|
[View1, View2, View3, View4] = ClusterView,
|
|
?assertEqual(lists:sort(Cluster), View1),
|
|
?assertEqual(View1, View2),
|
|
?assertEqual(View1, View3),
|
|
?assertEqual(View1, View4),
|
|
|
|
rpc:call(Core3, emqx_mgmt_cli, cluster, [["leave"]]),
|
|
timer:sleep(1000),
|
|
%% Replicant node may still discover and join Core3 which is now split from [Core1, Core2],
|
|
%% but it's expected to choose a bigger cluster of [Core1, Core2]..
|
|
?assertMatch([Core3], rpc:call(Core3, emqx, running_nodes, [])),
|
|
?assertEqual(undefined, rpc:call(Core1, erlang, whereis, [ekka_autocluster])),
|
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core1, emqx, running_nodes, [])),
|
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core2, emqx, running_nodes, [])),
|
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Repl, emqx, running_nodes, [])),
|
|
|
|
rpc:call(Repl, emqx_mgmt_cli, cluster, [["leave"]]),
|
|
timer:sleep(1000),
|
|
?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core1, emqx, running_nodes, [])),
|
|
?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core2, emqx, running_nodes, [])),
|
|
|
|
rpc:call(Core3, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
|
|
rpc:call(Repl, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
|
|
%% core nodes will join and restart asyncly, may need more time to re-cluster
|
|
?assertEqual(
|
|
ok,
|
|
emqx_common_test_helpers:wait_for(
|
|
?FUNCTION_NAME,
|
|
?LINE,
|
|
fun() ->
|
|
[lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster] =:= ClusterView
|
|
end,
|
|
10_000
|
|
)
|
|
).
|