fix: list listeners cli not working
This commit is contained in:
parent
1664438b4f
commit
a8386adea3
|
@ -74,7 +74,7 @@ format_list(Listener) ->
|
|||
[
|
||||
begin
|
||||
Running = is_running(Type, listener_id(Type, LName), LConf),
|
||||
{Type, LName, maps:put(running, Running, LConf)}
|
||||
{listener_id(Type, LName), maps:put(running, Running, LConf)}
|
||||
end
|
||||
|| {LName, LConf} <- maps:to_list(Conf), is_map(LConf)
|
||||
].
|
||||
|
@ -100,13 +100,11 @@ format_raw_listeners({Type, Conf}) ->
|
|||
|
||||
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
|
||||
is_running(ListenerId) ->
|
||||
{Type, Name} = parse_listener_id(ListenerId),
|
||||
case
|
||||
[
|
||||
Running
|
||||
|| {Type0, Name0, #{running := Running}} <- list(),
|
||||
Type0 =:= Type,
|
||||
Name0 =:= Name
|
||||
|| {Id, #{running := Running}} <- list(),
|
||||
Id =:= ListenerId
|
||||
]
|
||||
of
|
||||
[] -> {error, not_found};
|
||||
|
@ -144,7 +142,7 @@ is_running(quic, _ListenerId, _Conf) ->
|
|||
false.
|
||||
|
||||
current_conns(ID, ListenOn) ->
|
||||
{Type, Name} = parse_listener_id(ID),
|
||||
{ok, #{type := Type, name := Name}} = parse_listener_id(ID),
|
||||
current_conns(Type, Name, ListenOn).
|
||||
|
||||
current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
||||
|
@ -155,7 +153,7 @@ current_conns(_, _, _) ->
|
|||
{error, not_support}.
|
||||
|
||||
max_conns(ID, ListenOn) ->
|
||||
{Type, Name} = parse_listener_id(ID),
|
||||
{ok, #{type := Type, name := Name}} = parse_listener_id(ID),
|
||||
max_conns(Type, Name, ListenOn).
|
||||
|
||||
max_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
|
||||
|
@ -362,7 +360,7 @@ post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
|
|||
perform_listener_changes(Action, MapConfs) ->
|
||||
lists:foreach(
|
||||
fun({Id, Conf}) ->
|
||||
{Type, Name} = parse_listener_id(Id),
|
||||
{ok, #{type := Type, name := Name}} = parse_listener_id(Id),
|
||||
Action(Type, Name, Conf)
|
||||
end,
|
||||
maps:to_list(MapConfs)
|
||||
|
@ -485,7 +483,7 @@ parse_listener_id(Id) ->
|
|||
case string:split(str(Id), ":", leading) of
|
||||
[Type, Name] ->
|
||||
case lists:member(Type, ?TYPES_STRING) of
|
||||
true -> {list_to_existing_atom(Type), list_to_atom(Name)};
|
||||
true -> {ok, #{type => list_to_existing_atom(Type), name => list_to_atom(Name)}};
|
||||
false -> {error, {invalid_listener_id, Id}}
|
||||
end;
|
||||
_ ->
|
||||
|
@ -518,25 +516,27 @@ tcp_opts(Opts) ->
|
|||
|
||||
foreach_listeners(Do) ->
|
||||
lists:foreach(
|
||||
fun({Type, LName, LConf}) ->
|
||||
Do(Type, LName, LConf)
|
||||
fun({Id, LConf}) ->
|
||||
{ok, #{type := Type, name := Name}} = parse_listener_id(Id),
|
||||
Do(Type, Name, LConf)
|
||||
end,
|
||||
list()
|
||||
).
|
||||
|
||||
has_enabled_listener_conf_by_type(Type) ->
|
||||
lists:any(
|
||||
fun({Type0, _LName, LConf}) when is_map(LConf) ->
|
||||
fun({Id, LConf}) when is_map(LConf) ->
|
||||
{ok, #{type := Type0}} = parse_listener_id(Id),
|
||||
Type =:= Type0 andalso maps:get(enabled, LConf, true)
|
||||
end,
|
||||
list()
|
||||
).
|
||||
|
||||
apply_on_listener(ListenerId, Do) ->
|
||||
{Type, ListenerName} = parse_listener_id(ListenerId),
|
||||
case emqx_config:find_listener_conf(Type, ListenerName, []) of
|
||||
{not_found, _, _} -> error({listener_config_not_found, Type, ListenerName});
|
||||
{ok, Conf} -> Do(Type, ListenerName, Conf)
|
||||
{ok, #{type := Type, name := Name}} = parse_listener_id(ListenerId),
|
||||
case emqx_config:find_listener_conf(Type, Name, []) of
|
||||
{not_found, _, _} -> error({listener_config_not_found, Type, Name});
|
||||
{ok, Conf} -> Do(Type, Name, Conf)
|
||||
end.
|
||||
|
||||
str(A) when is_atom(A) ->
|
||||
|
|
|
@ -74,7 +74,7 @@ global_chain_config() ->
|
|||
|
||||
listener_chain_configs() ->
|
||||
lists:map(
|
||||
fun({ListenerID, _, _}) ->
|
||||
fun({ListenerID, _}) ->
|
||||
{ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])}
|
||||
end,
|
||||
emqx_listeners:list()
|
||||
|
|
|
@ -26,7 +26,8 @@
|
|||
crud_listeners_by_id/2,
|
||||
list_listeners_on_node/2,
|
||||
crud_listener_by_id_on_node/2,
|
||||
action_listeners/2
|
||||
action_listeners_by_id/2,
|
||||
action_listeners_by_id_on_node/2
|
||||
]).
|
||||
|
||||
%% for rpc call
|
||||
|
@ -45,7 +46,7 @@
|
|||
-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
|
||||
-define(ADDR_PORT_INUSE, <<"Addr port in use">>).
|
||||
|
||||
-define(OPTS(_Type_), #{rawconf_with_defaults => true, override_to => _Type_}).
|
||||
-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
|
||||
|
||||
namespace() -> "listeners".
|
||||
|
||||
|
@ -104,7 +105,7 @@ schema("/listeners/:id") ->
|
|||
};
|
||||
schema("/listeners/:id/:action") ->
|
||||
#{
|
||||
'operationId' => action_listeners,
|
||||
'operationId' => action_listeners_by_id,
|
||||
post => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Start/stop/restart listeners on all nodes.">>,
|
||||
|
@ -175,7 +176,7 @@ schema("/nodes/:node/listeners/:id") ->
|
|||
};
|
||||
schema("/nodes/:node/listeners/:id/:action") ->
|
||||
#{
|
||||
'operationId' => action_listeners,
|
||||
'operationId' => action_listeners_by_id_on_node,
|
||||
post => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Start/stop/restart listeners on a specified node.">>,
|
||||
|
@ -265,7 +266,7 @@ listeners_info() ->
|
|||
validate_id(Id) ->
|
||||
case emqx_listeners:parse_listener_id(Id) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
_ -> ok
|
||||
{ok, _} -> ok
|
||||
end.
|
||||
|
||||
%% api
|
||||
|
@ -289,7 +290,7 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
|
|||
{400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
|
||||
end;
|
||||
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of
|
||||
{ok, _} -> {204};
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
|
@ -299,7 +300,7 @@ parse_listener_conf(Conf0) ->
|
|||
Conf1 = maps:remove(<<"running">>, Conf0),
|
||||
{IdBin, Conf2} = maps:take(<<"id">>, Conf1),
|
||||
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(IdBin),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
|
||||
TypeAtom = binary_to_existing_atom(TypeBin),
|
||||
case Type =:= TypeAtom of
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
|
||||
|
@ -350,10 +351,12 @@ crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
|
|||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) ->
|
||||
action_listeners_by_id_on_node(post,
|
||||
#{bindings := #{id := Id, action := Action, node := Node}}) ->
|
||||
{_, Result} = action_listeners(Node, Id, Action),
|
||||
Result;
|
||||
action_listeners(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||
Result.
|
||||
|
||||
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
|
||||
case
|
||||
lists:filter(
|
||||
|
@ -455,7 +458,7 @@ do_list_listeners() ->
|
|||
-spec do_update_listener(string(), emqx_config:update_request()) ->
|
||||
{ok, map()} | {error, _}.
|
||||
do_update_listener(Id, Config) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of
|
||||
{ok, #{raw_config := RawConf}} -> {ok, RawConf};
|
||||
{error, Reason} -> {error, Reason}
|
||||
|
@ -463,7 +466,7 @@ do_update_listener(Id, Config) ->
|
|||
|
||||
-spec do_remove_listener(string()) -> ok.
|
||||
do_remove_listener(Id) ->
|
||||
{Type, Name} = emqx_listeners:parse_listener_id(Id),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||
case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} -> error(Reason)
|
||||
|
|
|
@ -26,36 +26,39 @@
|
|||
|
||||
-export([load/0]).
|
||||
|
||||
-export([ status/1
|
||||
, broker/1
|
||||
, cluster/1
|
||||
, clients/1
|
||||
, routes/1
|
||||
, subscriptions/1
|
||||
, plugins/1
|
||||
, listeners/1
|
||||
, vm/1
|
||||
, mnesia/1
|
||||
, trace/1
|
||||
, traces/1
|
||||
, log/1
|
||||
, authz/1
|
||||
, olp/1
|
||||
]).
|
||||
-export([
|
||||
status/1,
|
||||
broker/1,
|
||||
cluster/1,
|
||||
clients/1,
|
||||
routes/1,
|
||||
subscriptions/1,
|
||||
plugins/1,
|
||||
listeners/1,
|
||||
vm/1,
|
||||
mnesia/1,
|
||||
trace/1,
|
||||
traces/1,
|
||||
log/1,
|
||||
authz/1,
|
||||
olp/1
|
||||
]).
|
||||
|
||||
-define(PROC_INFOKEYS, [status,
|
||||
memory,
|
||||
message_queue_len,
|
||||
total_heap_size,
|
||||
heap_size,
|
||||
stack_size,
|
||||
reductions]).
|
||||
-define(PROC_INFOKEYS, [
|
||||
status,
|
||||
memory,
|
||||
message_queue_len,
|
||||
total_heap_size,
|
||||
heap_size,
|
||||
stack_size,
|
||||
reductions
|
||||
]).
|
||||
|
||||
-define(MAX_LIMIT, 10000).
|
||||
|
||||
-define(APP, emqx).
|
||||
|
||||
-spec(load() -> ok).
|
||||
-spec load() -> ok.
|
||||
load() ->
|
||||
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
|
||||
lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
|
||||
|
@ -70,7 +73,7 @@ status([]) ->
|
|||
{InternalStatus, _ProvidedStatus} = init:get_status(),
|
||||
emqx_ctl:print("Node ~p ~ts is ~p~n", [node(), emqx_app:get_release(), InternalStatus]);
|
||||
status(_) ->
|
||||
emqx_ctl:usage("status", "Show broker status").
|
||||
emqx_ctl:usage("status", "Show broker status").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc Query broker
|
||||
|
@ -79,19 +82,22 @@ broker([]) ->
|
|||
Funs = [sysdescr, version, datetime],
|
||||
[emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
|
||||
emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]);
|
||||
|
||||
broker(["stats"]) ->
|
||||
[emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
|
||||
|| {Stat, Val} <- lists:sort(emqx_stats:getstats())];
|
||||
|
||||
[
|
||||
emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
|
||||
|| {Stat, Val} <- lists:sort(emqx_stats:getstats())
|
||||
];
|
||||
broker(["metrics"]) ->
|
||||
[emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
|
||||
|| {Metric, Val} <- lists:sort(emqx_metrics:all())];
|
||||
|
||||
[
|
||||
emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
|
||||
|| {Metric, Val} <- lists:sort(emqx_metrics:all())
|
||||
];
|
||||
broker(_) ->
|
||||
emqx_ctl:usage([{"broker", "Show broker version, uptime and description"},
|
||||
{"broker stats", "Show broker statistics of clients, topics, subscribers"},
|
||||
{"broker metrics", "Show broker metrics"}]).
|
||||
emqx_ctl:usage([
|
||||
{"broker", "Show broker version, uptime and description"},
|
||||
{"broker stats", "Show broker statistics of clients, topics, subscribers"},
|
||||
{"broker metrics", "Show broker metrics"}
|
||||
]).
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% @doc Cluster with other nodes
|
||||
|
@ -106,7 +112,6 @@ cluster(["join", SNode]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["leave"]) ->
|
||||
case ekka:leave() of
|
||||
ok ->
|
||||
|
@ -115,7 +120,6 @@ cluster(["leave"]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["force-leave", SNode]) ->
|
||||
case ekka:force_leave(ekka_node:parse_name(SNode)) of
|
||||
ok ->
|
||||
|
@ -126,38 +130,37 @@ cluster(["force-leave", SNode]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["status"]) ->
|
||||
emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
|
||||
|
||||
cluster(_) ->
|
||||
emqx_ctl:usage([{"cluster join <Node>", "Join the cluster"},
|
||||
{"cluster leave", "Leave the cluster"},
|
||||
{"cluster force-leave <Node>","Force the node leave from cluster"},
|
||||
{"cluster status", "Cluster status"}]).
|
||||
emqx_ctl:usage([
|
||||
{"cluster join <Node>", "Join the cluster"},
|
||||
{"cluster leave", "Leave the cluster"},
|
||||
{"cluster force-leave <Node>", "Force the node leave from cluster"},
|
||||
{"cluster status", "Cluster status"}
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc Query clients
|
||||
|
||||
clients(["list"]) ->
|
||||
dump(emqx_channel, client);
|
||||
|
||||
clients(["show", ClientId]) ->
|
||||
if_client(ClientId, fun print/1);
|
||||
|
||||
clients(["kick", ClientId]) ->
|
||||
ok = emqx_cm:kick_session(bin(ClientId)),
|
||||
emqx_ctl:print("ok~n");
|
||||
|
||||
clients(_) ->
|
||||
emqx_ctl:usage([{"clients list", "List all clients"},
|
||||
{"clients show <ClientId>", "Show a client"},
|
||||
{"clients kick <ClientId>", "Kick out a client"}]).
|
||||
emqx_ctl:usage([
|
||||
{"clients list", "List all clients"},
|
||||
{"clients show <ClientId>", "Show a client"},
|
||||
{"clients kick <ClientId>", "Kick out a client"}
|
||||
]).
|
||||
|
||||
if_client(ClientId, Fun) ->
|
||||
case ets:lookup(emqx_channel, (bin(ClientId))) of
|
||||
[] -> emqx_ctl:print("Not Found.~n");
|
||||
[Channel] -> Fun({client, Channel})
|
||||
[Channel] -> Fun({client, Channel})
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -165,20 +168,22 @@ if_client(ClientId, Fun) ->
|
|||
|
||||
routes(["list"]) ->
|
||||
dump(emqx_route);
|
||||
|
||||
routes(["show", Topic]) ->
|
||||
Routes = ets:lookup(emqx_route, bin(Topic)),
|
||||
[print({emqx_route, Route}) || Route <- Routes];
|
||||
|
||||
routes(_) ->
|
||||
emqx_ctl:usage([{"routes list", "List all routes"},
|
||||
{"routes show <Topic>", "Show a route"}]).
|
||||
emqx_ctl:usage([
|
||||
{"routes list", "List all routes"},
|
||||
{"routes show <Topic>", "Show a route"}
|
||||
]).
|
||||
|
||||
subscriptions(["list"]) ->
|
||||
lists:foreach(fun(Suboption) ->
|
||||
print({emqx_suboption, Suboption})
|
||||
end, ets:tab2list(emqx_suboption));
|
||||
|
||||
lists:foreach(
|
||||
fun(Suboption) ->
|
||||
print({emqx_suboption, Suboption})
|
||||
end,
|
||||
ets:tab2list(emqx_suboption)
|
||||
);
|
||||
subscriptions(["show", ClientId]) ->
|
||||
case ets:lookup(emqx_subid, bin(ClientId)) of
|
||||
[] ->
|
||||
|
@ -186,43 +191,45 @@ subscriptions(["show", ClientId]) ->
|
|||
[{_, Pid}] ->
|
||||
case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of
|
||||
[] -> emqx_ctl:print("Not Found.~n");
|
||||
Suboption ->
|
||||
[print({emqx_suboption, Sub}) || Sub <- Suboption]
|
||||
Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption]
|
||||
end
|
||||
end;
|
||||
|
||||
subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||
if_valid_qos(QoS, fun(IntQos) ->
|
||||
case ets:lookup(emqx_channel, bin(ClientId)) of
|
||||
[] -> emqx_ctl:print("Error: Channel not found!");
|
||||
[{_, Pid}] ->
|
||||
{Topic1, Options} = emqx_topic:parse(bin(Topic)),
|
||||
Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
|
||||
emqx_ctl:print("ok~n")
|
||||
end
|
||||
end);
|
||||
|
||||
if_valid_qos(QoS, fun(IntQos) ->
|
||||
case ets:lookup(emqx_channel, bin(ClientId)) of
|
||||
[] ->
|
||||
emqx_ctl:print("Error: Channel not found!");
|
||||
[{_, Pid}] ->
|
||||
{Topic1, Options} = emqx_topic:parse(bin(Topic)),
|
||||
Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
|
||||
emqx_ctl:print("ok~n")
|
||||
end
|
||||
end);
|
||||
subscriptions(["del", ClientId, Topic]) ->
|
||||
case ets:lookup(emqx_channel, bin(ClientId)) of
|
||||
[] -> emqx_ctl:print("Error: Channel not found!");
|
||||
[] ->
|
||||
emqx_ctl:print("Error: Channel not found!");
|
||||
[{_, Pid}] ->
|
||||
Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
|
||||
emqx_ctl:print("ok~n")
|
||||
end;
|
||||
|
||||
subscriptions(_) ->
|
||||
emqx_ctl:usage(
|
||||
[{"subscriptions list", "List all subscriptions"},
|
||||
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
||||
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
||||
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
||||
[
|
||||
{"subscriptions list", "List all subscriptions"},
|
||||
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
||||
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
||||
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}
|
||||
]
|
||||
).
|
||||
|
||||
if_valid_qos(QoS, Fun) ->
|
||||
try list_to_integer(QoS) of
|
||||
Int when ?IS_QOS(Int) -> Fun(Int);
|
||||
_ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
|
||||
catch _:_ ->
|
||||
emqx_ctl:print("QoS should be 0, 1, 2~n")
|
||||
catch
|
||||
_:_ ->
|
||||
emqx_ctl:print("QoS should be 0, 1, 2~n")
|
||||
end.
|
||||
|
||||
plugins(["list"]) ->
|
||||
|
@ -231,7 +238,7 @@ plugins(["describe", NameVsn]) ->
|
|||
emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2);
|
||||
plugins(["install", NameVsn]) ->
|
||||
emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2);
|
||||
plugins(["uninstall", NameVsn])->
|
||||
plugins(["uninstall", NameVsn]) ->
|
||||
emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2);
|
||||
plugins(["start", NameVsn]) ->
|
||||
emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2);
|
||||
|
@ -251,69 +258,73 @@ plugins(["enable", NameVsn, "before", Other]) ->
|
|||
emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
|
||||
plugins(_) ->
|
||||
emqx_ctl:usage(
|
||||
[{"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
|
||||
{"plugins list", "List all installed plugins"},
|
||||
{"plugins describe Name-Vsn", "Describe an installed plugins"},
|
||||
{"plugins install Name-Vsn", "Install a plugin package placed\n"
|
||||
"in plugin'sinstall_dir"},
|
||||
{"plugins uninstall Name-Vsn", "Uninstall a plugin. NOTE: it deletes\n"
|
||||
"all files in install_dir/Name-Vsn"},
|
||||
{"plugins start Name-Vsn", "Start a plugin"},
|
||||
{"plugins stop Name-Vsn", "Stop a plugin"},
|
||||
{"plugins restart Name-Vsn", "Stop then start a plugin"},
|
||||
{"plugins disable Name-Vsn", "Disable auto-boot"},
|
||||
{"plugins enable Name-Vsn [Position]",
|
||||
"Enable auto-boot at Position in the boot list, where Position could be\n"
|
||||
"'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n"
|
||||
"The Position parameter can be used to adjust the boot order.\n"
|
||||
"If no Position is given, an already configured plugin\n"
|
||||
"will stay at is old position; a newly plugin is appended to the rear\n"
|
||||
"e.g. plugins disable foo-0.1.0 front\n"
|
||||
" plugins enable bar-0.2.0 before foo-0.1.0"}
|
||||
]).
|
||||
[
|
||||
{"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
|
||||
{"plugins list", "List all installed plugins"},
|
||||
{"plugins describe Name-Vsn", "Describe an installed plugins"},
|
||||
{"plugins install Name-Vsn",
|
||||
"Install a plugin package placed\n"
|
||||
"in plugin'sinstall_dir"},
|
||||
{"plugins uninstall Name-Vsn",
|
||||
"Uninstall a plugin. NOTE: it deletes\n"
|
||||
"all files in install_dir/Name-Vsn"},
|
||||
{"plugins start Name-Vsn", "Start a plugin"},
|
||||
{"plugins stop Name-Vsn", "Stop a plugin"},
|
||||
{"plugins restart Name-Vsn", "Stop then start a plugin"},
|
||||
{"plugins disable Name-Vsn", "Disable auto-boot"},
|
||||
{"plugins enable Name-Vsn [Position]",
|
||||
"Enable auto-boot at Position in the boot list, where Position could be\n"
|
||||
"'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n"
|
||||
"The Position parameter can be used to adjust the boot order.\n"
|
||||
"If no Position is given, an already configured plugin\n"
|
||||
"will stay at is old position; a newly plugin is appended to the rear\n"
|
||||
"e.g. plugins disable foo-0.1.0 front\n"
|
||||
" plugins enable bar-0.2.0 before foo-0.1.0"}
|
||||
]
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc vm command
|
||||
|
||||
vm([]) ->
|
||||
vm(["all"]);
|
||||
|
||||
vm(["all"]) ->
|
||||
[vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
|
||||
|
||||
vm(["load"]) ->
|
||||
[emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()];
|
||||
|
||||
vm(["memory"]) ->
|
||||
[emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
|
||||
|
||||
vm(["process"]) ->
|
||||
[emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
|
||||
|| {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
|
||||
|
||||
[
|
||||
emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
|
||||
|| {Name, Key} <- [{limit, process_limit}, {count, process_count}]
|
||||
];
|
||||
vm(["io"]) ->
|
||||
IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
|
||||
[emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
|
||||
|| Key <- [max_fds, active_fds]];
|
||||
|
||||
[
|
||||
emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
|
||||
|| Key <- [max_fds, active_fds]
|
||||
];
|
||||
vm(["ports"]) ->
|
||||
[emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
|
||||
|| {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
|
||||
|
||||
[
|
||||
emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
|
||||
|| {Name, Key} <- [{count, port_count}, {limit, port_limit}]
|
||||
];
|
||||
vm(_) ->
|
||||
emqx_ctl:usage([{"vm all", "Show info of Erlang VM"},
|
||||
{"vm load", "Show load of Erlang VM"},
|
||||
{"vm memory", "Show memory of Erlang VM"},
|
||||
{"vm process", "Show process of Erlang VM"},
|
||||
{"vm io", "Show IO of Erlang VM"},
|
||||
{"vm ports", "Show Ports of Erlang VM"}]).
|
||||
emqx_ctl:usage([
|
||||
{"vm all", "Show info of Erlang VM"},
|
||||
{"vm load", "Show load of Erlang VM"},
|
||||
{"vm memory", "Show memory of Erlang VM"},
|
||||
{"vm process", "Show process of Erlang VM"},
|
||||
{"vm io", "Show IO of Erlang VM"},
|
||||
{"vm ports", "Show Ports of Erlang VM"}
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc mnesia Command
|
||||
|
||||
mnesia([]) ->
|
||||
mnesia:system_info();
|
||||
|
||||
mnesia(_) ->
|
||||
emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
|
||||
|
||||
|
@ -325,40 +336,40 @@ log(["set-level", Level]) ->
|
|||
ok -> emqx_ctl:print("~ts~n", [Level]);
|
||||
Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
|
||||
end;
|
||||
|
||||
log(["primary-level"]) ->
|
||||
Level = emqx_logger:get_primary_log_level(),
|
||||
emqx_ctl:print("~ts~n", [Level]);
|
||||
|
||||
log(["primary-level", Level]) ->
|
||||
_ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
|
||||
emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
|
||||
|
||||
log(["handlers", "list"]) ->
|
||||
_ = [emqx_ctl:print(
|
||||
_ = [
|
||||
emqx_ctl:print(
|
||||
"LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
|
||||
[Id, Level, Dst, Status]
|
||||
)
|
||||
|| #{id := Id,
|
||||
level := Level,
|
||||
dst := Dst,
|
||||
status := Status} <- emqx_logger:get_log_handlers()],
|
||||
)
|
||||
|| #{
|
||||
id := Id,
|
||||
level := Level,
|
||||
dst := Dst,
|
||||
status := Status
|
||||
} <- emqx_logger:get_log_handlers()
|
||||
],
|
||||
ok;
|
||||
|
||||
log(["handlers", "start", HandlerId]) ->
|
||||
case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of
|
||||
ok -> emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
|
||||
ok ->
|
||||
emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
|
||||
{error, Reason} ->
|
||||
emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason])
|
||||
end;
|
||||
|
||||
log(["handlers", "stop", HandlerId]) ->
|
||||
case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of
|
||||
ok -> emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]);
|
||||
ok ->
|
||||
emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]);
|
||||
{error, Reason} ->
|
||||
emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason])
|
||||
end;
|
||||
|
||||
log(["handlers", "set-level", HandlerId, Level]) ->
|
||||
case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of
|
||||
ok ->
|
||||
|
@ -367,57 +378,60 @@ log(["handlers", "set-level", HandlerId, Level]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("[error] ~p~n", [Error])
|
||||
end;
|
||||
|
||||
log(_) ->
|
||||
emqx_ctl:usage(
|
||||
[{"log set-level <Level>", "Set the overall log level"},
|
||||
{"log primary-level", "Show the primary log level now"},
|
||||
{"log primary-level <Level>","Set the primary log level"},
|
||||
{"log handlers list", "Show log handlers"},
|
||||
{"log handlers start <HandlerId>", "Start a log handler"},
|
||||
{"log handlers stop <HandlerId>", "Stop a log handler"},
|
||||
{"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
|
||||
[
|
||||
{"log set-level <Level>", "Set the overall log level"},
|
||||
{"log primary-level", "Show the primary log level now"},
|
||||
{"log primary-level <Level>", "Set the primary log level"},
|
||||
{"log handlers list", "Show log handlers"},
|
||||
{"log handlers start <HandlerId>", "Start a log handler"},
|
||||
{"log handlers stop <HandlerId>", "Stop a log handler"},
|
||||
{"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}
|
||||
]
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc Trace Command
|
||||
|
||||
trace(["list"]) ->
|
||||
lists:foreach(fun(Trace) ->
|
||||
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
||||
end, emqx_trace_handler:running());
|
||||
|
||||
lists:foreach(
|
||||
fun(Trace) ->
|
||||
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
||||
end,
|
||||
emqx_trace_handler:running()
|
||||
);
|
||||
trace(["stop", Operation, Filter0]) ->
|
||||
case trace_type(Operation, Filter0) of
|
||||
{ok, Type, Filter} -> trace_off(Type, Filter);
|
||||
error -> trace([])
|
||||
end;
|
||||
|
||||
trace(["start", Operation, ClientId, LogFile]) ->
|
||||
trace(["start", Operation, ClientId, LogFile, "all"]);
|
||||
|
||||
trace(["start", Operation, Filter0, LogFile, Level]) ->
|
||||
case trace_type(Operation, Filter0) of
|
||||
{ok, Type, Filter} ->
|
||||
trace_on(name(Filter0), Type, Filter,
|
||||
list_to_existing_atom(Level), LogFile);
|
||||
error -> trace([])
|
||||
trace_on(
|
||||
name(Filter0),
|
||||
Type,
|
||||
Filter,
|
||||
list_to_existing_atom(Level),
|
||||
LogFile
|
||||
);
|
||||
error ->
|
||||
trace([])
|
||||
end;
|
||||
|
||||
trace(_) ->
|
||||
emqx_ctl:usage([{"trace list", "List all traces started on local node"},
|
||||
{"trace start client <ClientId> <File> [<Level>]",
|
||||
"Traces for a client on local node"},
|
||||
{"trace stop client <ClientId>",
|
||||
"Stop tracing for a client on local node"},
|
||||
{"trace start topic <Topic> <File> [<Level>] ",
|
||||
"Traces for a topic on local node"},
|
||||
{"trace stop topic <Topic> ",
|
||||
"Stop tracing for a topic on local node"},
|
||||
emqx_ctl:usage([
|
||||
{"trace list", "List all traces started on local node"},
|
||||
{"trace start client <ClientId> <File> [<Level>]", "Traces for a client on local node"},
|
||||
{"trace stop client <ClientId>", "Stop tracing for a client on local node"},
|
||||
{"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic on local node"},
|
||||
{"trace stop topic <Topic> ", "Stop tracing for a topic on local node"},
|
||||
{"trace start ip_address <IP> <File> [<Level>] ",
|
||||
"Traces for a client ip on local node"},
|
||||
{"trace stop ip_addresss <IP> ",
|
||||
"Stop tracing for a client ip on local node"}
|
||||
{"trace stop ip_addresss <IP> ", "Stop tracing for a client ip on local node"}
|
||||
]).
|
||||
|
||||
trace_on(Name, Type, Filter, Level, LogFile) ->
|
||||
|
@ -447,32 +461,37 @@ traces(["list"]) ->
|
|||
[] ->
|
||||
emqx_ctl:print("Cluster Trace is empty~n", []);
|
||||
_ ->
|
||||
lists:foreach(fun(Trace) ->
|
||||
#{type := Type, name := Name, status := Status,
|
||||
log_size := LogSize} = Trace,
|
||||
emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
||||
[Name, Type, maps:get(Type, Trace), Status, LogSize])
|
||||
end, List)
|
||||
lists:foreach(
|
||||
fun(Trace) ->
|
||||
#{
|
||||
type := Type,
|
||||
name := Name,
|
||||
status := Status,
|
||||
log_size := LogSize
|
||||
} = Trace,
|
||||
emqx_ctl:print(
|
||||
"Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
||||
[Name, Type, maps:get(Type, Trace), Status, LogSize]
|
||||
)
|
||||
end,
|
||||
List
|
||||
)
|
||||
end,
|
||||
length(List);
|
||||
|
||||
traces(["stop", Name]) ->
|
||||
trace_cluster_off(Name);
|
||||
|
||||
traces(["delete", Name]) ->
|
||||
trace_cluster_del(Name);
|
||||
|
||||
traces(["start", Name, Operation, Filter]) ->
|
||||
traces(["start", Name, Operation, Filter, "900"]);
|
||||
|
||||
traces(["start", Name, Operation, Filter0, DurationS]) ->
|
||||
case trace_type(Operation, Filter0) of
|
||||
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
||||
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
||||
error -> traces([])
|
||||
end;
|
||||
|
||||
traces(_) ->
|
||||
emqx_ctl:usage([{"traces list", "List all cluster traces started"},
|
||||
emqx_ctl:usage([
|
||||
{"traces list", "List all cluster traces started"},
|
||||
{"traces start <Name> client <ClientId>", "Traces for a client in cluster"},
|
||||
{"traces start <Name> topic <Topic>", "Traces for a topic in cluster"},
|
||||
{"traces start <Name> ip_address <IPAddr>", "Traces for a IP in cluster"},
|
||||
|
@ -483,18 +502,21 @@ traces(_) ->
|
|||
trace_cluster_on(Name, Type, Filter, DurationS0) ->
|
||||
DurationS = list_to_integer(DurationS0),
|
||||
Now = erlang:system_time(second),
|
||||
Trace = #{ name => list_to_binary(Name)
|
||||
, type => atom_to_binary(Type)
|
||||
, Type => list_to_binary(Filter)
|
||||
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Now))
|
||||
, end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
||||
},
|
||||
Trace = #{
|
||||
name => list_to_binary(Name),
|
||||
type => atom_to_binary(Type),
|
||||
Type => list_to_binary(Filter),
|
||||
start_at => list_to_binary(calendar:system_time_to_rfc3339(Now)),
|
||||
end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
||||
},
|
||||
case emqx_trace:create(Trace) of
|
||||
{ok, _} ->
|
||||
emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n",
|
||||
[Name, Type, Filter, Error])
|
||||
emqx_ctl:print(
|
||||
"[error] cluster_trace ~s ~s=~s ~p~n",
|
||||
[Name, Type, Filter, Error]
|
||||
)
|
||||
end.
|
||||
|
||||
trace_cluster_del(Name) ->
|
||||
|
@ -518,29 +540,34 @@ trace_type(_, _) -> error.
|
|||
%% @doc Listeners Command
|
||||
|
||||
listeners([]) ->
|
||||
lists:foreach(fun({ID, _Name, Conf}) ->
|
||||
{Host, Port} = maps:get(bind, Conf),
|
||||
Acceptors = maps:get(acceptors, Conf),
|
||||
ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
|
||||
Running = maps:get(running, Conf),
|
||||
CurrentConns = case emqx_listeners:current_conns(ID, {Host, Port}) of
|
||||
{error, _} -> [];
|
||||
CC -> [{current_conn, CC}]
|
||||
end,
|
||||
MaxConn = case emqx_listeners:max_conns(ID, {Host, Port}) of
|
||||
{error, _} -> [];
|
||||
MC -> [{max_conns, MC}]
|
||||
end,
|
||||
Info = [
|
||||
{listen_on, {string, format_listen_on(Port)}},
|
||||
{acceptors, Acceptors},
|
||||
{proxy_protocol, ProxyProtocol},
|
||||
{running, Running}
|
||||
lists:foreach(
|
||||
fun({ID, Conf}) ->
|
||||
{Host, Port} = maps:get(bind, Conf),
|
||||
Acceptors = maps:get(acceptors, Conf),
|
||||
ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
|
||||
Running = maps:get(running, Conf),
|
||||
CurrentConns =
|
||||
case emqx_listeners:current_conns(ID, {Host, Port}) of
|
||||
{error, _} -> [];
|
||||
CC -> [{current_conn, CC}]
|
||||
end,
|
||||
MaxConn =
|
||||
case emqx_listeners:max_conns(ID, {Host, Port}) of
|
||||
{error, _} -> [];
|
||||
MC -> [{max_conns, MC}]
|
||||
end,
|
||||
Info =
|
||||
[
|
||||
{listen_on, {string, format_listen_on(Port)}},
|
||||
{acceptors, Acceptors},
|
||||
{proxy_protocol, ProxyProtocol},
|
||||
{running, Running}
|
||||
] ++ CurrentConns ++ MaxConn,
|
||||
emqx_ctl:print("~ts~n", [ID]),
|
||||
lists:foreach(fun indent_print/1, Info)
|
||||
end, emqx_listeners:list());
|
||||
|
||||
emqx_ctl:print("~ts~n", [ID]),
|
||||
lists:foreach(fun indent_print/1, Info)
|
||||
end,
|
||||
emqx_listeners:list()
|
||||
);
|
||||
listeners(["stop", ListenerId]) ->
|
||||
case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of
|
||||
ok ->
|
||||
|
@ -548,7 +575,6 @@ listeners(["stop", ListenerId]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error])
|
||||
end;
|
||||
|
||||
listeners(["start", ListenerId]) ->
|
||||
case emqx_listeners:start_listener(list_to_atom(ListenerId)) of
|
||||
ok ->
|
||||
|
@ -556,7 +582,6 @@ listeners(["start", ListenerId]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error])
|
||||
end;
|
||||
|
||||
listeners(["restart", ListenerId]) ->
|
||||
case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of
|
||||
ok ->
|
||||
|
@ -564,13 +589,13 @@ listeners(["restart", ListenerId]) ->
|
|||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error])
|
||||
end;
|
||||
|
||||
listeners(_) ->
|
||||
emqx_ctl:usage([{"listeners", "List listeners"},
|
||||
{"listeners stop <Identifier>", "Stop a listener"},
|
||||
{"listeners start <Identifier>", "Start a listener"},
|
||||
{"listeners restart <Identifier>", "Restart a listener"}
|
||||
]).
|
||||
emqx_ctl:usage([
|
||||
{"listeners", "List listeners"},
|
||||
{"listeners stop <Identifier>", "Stop a listener"},
|
||||
{"listeners start <Identifier>", "Start a listener"},
|
||||
{"listeners restart <Identifier>", "Restart a listener"}
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc authz Command
|
||||
|
@ -582,7 +607,6 @@ authz(["cache-clean", "node", Node]) ->
|
|||
{error, Reason} ->
|
||||
emqx_ctl:print("Authorization drain failed on node ~ts: ~0p.~n", [Node, Reason])
|
||||
end;
|
||||
|
||||
authz(["cache-clean", "all"]) ->
|
||||
case emqx_mgmt:clean_authz_cache_all() of
|
||||
ok ->
|
||||
|
@ -590,22 +614,22 @@ authz(["cache-clean", "all"]) ->
|
|||
{error, Reason} ->
|
||||
emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason])
|
||||
end;
|
||||
|
||||
authz(["cache-clean", ClientId]) ->
|
||||
emqx_mgmt:clean_authz_cache(ClientId);
|
||||
|
||||
authz(_) ->
|
||||
emqx_ctl:usage(
|
||||
[{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
||||
{"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
||||
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
||||
]).
|
||||
|
||||
[
|
||||
{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
||||
{"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
||||
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
||||
]
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc OLP (Overload Protection related)
|
||||
olp(["status"]) ->
|
||||
S = case emqx_olp:is_overloaded() of
|
||||
S =
|
||||
case emqx_olp:is_overloaded() of
|
||||
true -> "overloaded";
|
||||
false -> "not overloaded"
|
||||
end,
|
||||
|
@ -617,10 +641,11 @@ olp(["enable"]) ->
|
|||
Res = emqx_olp:enable(),
|
||||
emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
|
||||
olp(_) ->
|
||||
emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"},
|
||||
{"olp enable", "Enable overload protection"},
|
||||
{"olp disable", "Disable overload protection"}
|
||||
]).
|
||||
emqx_ctl:usage([
|
||||
{"olp status", "Return OLP status if system is overloaded"},
|
||||
{"olp enable", "Enable overload protection"},
|
||||
{"olp disable", "Disable overload protection"}
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Dump ETS
|
||||
|
@ -634,78 +659,114 @@ dump(Table, Tag) ->
|
|||
|
||||
dump(_Table, _, '$end_of_table', Result) ->
|
||||
lists:reverse(Result);
|
||||
|
||||
dump(Table, Tag, Key, Result) ->
|
||||
PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
|
||||
dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
|
||||
|
||||
print({_, []}) ->
|
||||
ok;
|
||||
|
||||
print({client, {ClientId, ChanPid}}) ->
|
||||
Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of
|
||||
undefined -> #{};
|
||||
Attrs0 -> Attrs0
|
||||
end,
|
||||
Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of
|
||||
undefined -> #{};
|
||||
Stats0 -> maps:from_list(Stats0)
|
||||
end,
|
||||
Attrs =
|
||||
case emqx_cm:get_chan_info(ClientId, ChanPid) of
|
||||
undefined -> #{};
|
||||
Attrs0 -> Attrs0
|
||||
end,
|
||||
Stats =
|
||||
case emqx_cm:get_chan_stats(ClientId, ChanPid) of
|
||||
undefined -> #{};
|
||||
Stats0 -> maps:from_list(Stats0)
|
||||
end,
|
||||
ClientInfo = maps:get(clientinfo, Attrs, #{}),
|
||||
ConnInfo = maps:get(conninfo, Attrs, #{}),
|
||||
Session = maps:get(session, Attrs, #{}),
|
||||
Connected = case maps:get(conn_state, Attrs) of
|
||||
connected -> true;
|
||||
_ -> false
|
||||
end,
|
||||
Info = lists:foldl(fun(Items, Acc) ->
|
||||
maps:merge(Items, Acc)
|
||||
end, #{connected => Connected},
|
||||
[maps:with([subscriptions_cnt, inflight_cnt, awaiting_rel_cnt,
|
||||
mqueue_len, mqueue_dropped, send_msg], Stats),
|
||||
maps:with([clientid, username], ClientInfo),
|
||||
maps:with([peername, clean_start, keepalive, expiry_interval,
|
||||
connected_at, disconnected_at], ConnInfo),
|
||||
maps:with([created_at], Session)]),
|
||||
InfoKeys = [clientid, username, peername, clean_start, keepalive,
|
||||
expiry_interval, subscriptions_cnt, inflight_cnt,
|
||||
awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
||||
connected, created_at, connected_at] ++
|
||||
case maps:is_key(disconnected_at, Info) of
|
||||
true -> [disconnected_at];
|
||||
false -> []
|
||||
end,
|
||||
Connected =
|
||||
case maps:get(conn_state, Attrs) of
|
||||
connected -> true;
|
||||
_ -> false
|
||||
end,
|
||||
Info = lists:foldl(
|
||||
fun(Items, Acc) ->
|
||||
maps:merge(Items, Acc)
|
||||
end,
|
||||
#{connected => Connected},
|
||||
[
|
||||
maps:with(
|
||||
[
|
||||
subscriptions_cnt,
|
||||
inflight_cnt,
|
||||
awaiting_rel_cnt,
|
||||
mqueue_len,
|
||||
mqueue_dropped,
|
||||
send_msg
|
||||
],
|
||||
Stats
|
||||
),
|
||||
maps:with([clientid, username], ClientInfo),
|
||||
maps:with(
|
||||
[
|
||||
peername,
|
||||
clean_start,
|
||||
keepalive,
|
||||
expiry_interval,
|
||||
connected_at,
|
||||
disconnected_at
|
||||
],
|
||||
ConnInfo
|
||||
),
|
||||
maps:with([created_at], Session)
|
||||
]
|
||||
),
|
||||
InfoKeys =
|
||||
[
|
||||
clientid,
|
||||
username,
|
||||
peername,
|
||||
clean_start,
|
||||
keepalive,
|
||||
expiry_interval,
|
||||
subscriptions_cnt,
|
||||
inflight_cnt,
|
||||
awaiting_rel_cnt,
|
||||
send_msg,
|
||||
mqueue_len,
|
||||
mqueue_dropped,
|
||||
connected,
|
||||
created_at,
|
||||
connected_at
|
||||
] ++
|
||||
case maps:is_key(disconnected_at, Info) of
|
||||
true -> [disconnected_at];
|
||||
false -> []
|
||||
end,
|
||||
Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
|
||||
emqx_ctl:print(
|
||||
"Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
|
||||
"keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
|
||||
"inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
|
||||
"dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w"
|
||||
++ case maps:is_key(disconnected_at, Info1) of
|
||||
true -> ", disconnected_at=~w)~n";
|
||||
false -> ")~n"
|
||||
end,
|
||||
[format(K, maps:get(K, Info1)) || K <- InfoKeys]);
|
||||
|
||||
"dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++
|
||||
case maps:is_key(disconnected_at, Info1) of
|
||||
true -> ", disconnected_at=~w)~n";
|
||||
false -> ")~n"
|
||||
end,
|
||||
[format(K, maps:get(K, Info1)) || K <- InfoKeys]
|
||||
);
|
||||
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|
||||
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
||||
print({emqx_route, #route{topic = Topic, dest = Node}}) ->
|
||||
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
|
||||
|
||||
print(#plugin{name = Name, descr = Descr, active = Active}) ->
|
||||
emqx_ctl:print("Plugin(~ts, description=~ts, active=~ts)~n",
|
||||
[Name, Descr, Active]);
|
||||
|
||||
emqx_ctl:print(
|
||||
"Plugin(~ts, description=~ts, active=~ts)~n",
|
||||
[Name, Descr, Active]
|
||||
);
|
||||
print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) ->
|
||||
emqx_ctl:print("~ts -> ~ts~n", [maps:get(subid, Options), Topic]).
|
||||
|
||||
format(_, undefined) ->
|
||||
undefined;
|
||||
|
||||
format(peername, {IPAddr, Port}) ->
|
||||
IPStr = emqx_mgmt_util:ntoa(IPAddr),
|
||||
io_lib:format("~ts:~p", [IPStr, Port]);
|
||||
|
||||
format(_, Val) ->
|
||||
Val.
|
||||
|
||||
|
|
|
@ -50,15 +50,25 @@ t_crud_listeners_by_id(_) ->
|
|||
%% create
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], TcpListener#{
|
||||
NewConf = TcpListener#{
|
||||
<<"id">> => NewListenerId,
|
||||
<<"bind">> => <<"0.0.0.0:2883">>
|
||||
}),
|
||||
},
|
||||
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf),
|
||||
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||
[#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []),
|
||||
?assertMatch(Create, Get1),
|
||||
?assert(is_running(NewListenerId)),
|
||||
|
||||
%% bad create(same port)
|
||||
BadId = <<"tcp:bad">>,
|
||||
BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]),
|
||||
BadConf = TcpListener#{
|
||||
<<"id">> => BadId,
|
||||
<<"bind">> => <<"0.0.0.0:2883">>
|
||||
},
|
||||
?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)),
|
||||
|
||||
%% update
|
||||
#{<<"acceptors">> := Acceptors} = Create,
|
||||
Acceptors1 = Acceptors + 10,
|
||||
|
@ -144,51 +154,8 @@ delete(Url) ->
|
|||
{ok, Res} = emqx_mgmt_api_test_util:request_api(delete, Url, AuthHeader),
|
||||
Res.
|
||||
|
||||
get_api(Path) ->
|
||||
{ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path),
|
||||
LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()),
|
||||
case emqx_json:decode(ListenersData, [return_maps]) of
|
||||
[#{<<"node">> := _, <<"listeners">> := [Listener]}] ->
|
||||
ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8),
|
||||
Filter =
|
||||
fun(Local) ->
|
||||
maps:get(id, Local) =:= ID
|
||||
end,
|
||||
LocalListener = hd(lists:filter(Filter, LocalListeners)),
|
||||
comparison_listener(LocalListener, Listener);
|
||||
[#{<<"node">> := _, <<"listeners">> := Listeners}] when is_list(Listeners) ->
|
||||
?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)),
|
||||
Fun =
|
||||
fun(LocalListener) ->
|
||||
ID = maps:get(id, LocalListener),
|
||||
IDBinary = atom_to_binary(ID, utf8),
|
||||
Filter =
|
||||
fun(Listener) ->
|
||||
maps:get(<<"id">>, Listener) =:= IDBinary
|
||||
end,
|
||||
Listener = hd(lists:filter(Filter, Listeners)),
|
||||
comparison_listener(LocalListener, Listener)
|
||||
end,
|
||||
lists:foreach(Fun, LocalListeners);
|
||||
Listener when is_map(Listener) ->
|
||||
ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8),
|
||||
Filter =
|
||||
fun(Local) ->
|
||||
maps:get(id, Local) =:= ID
|
||||
end,
|
||||
LocalListener = hd(lists:filter(Filter, LocalListeners)),
|
||||
comparison_listener(LocalListener, Listener)
|
||||
end.
|
||||
|
||||
comparison_listener(Local, Response) ->
|
||||
?assertEqual(maps:get(id, Local), binary_to_atom(maps:get(<<"id">>, Response))),
|
||||
?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))),
|
||||
?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)),
|
||||
?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)).
|
||||
|
||||
|
||||
listener_stats(Listener, ExpectedStats) ->
|
||||
?assertEqual(ExpectedStats, maps:get(<<"running">>, Listener)).
|
||||
|
||||
is_running(Id) ->
|
||||
emqx_listeners:is_running(Id).
|
||||
emqx_listeners:is_running(binary_to_atom(Id)).
|
||||
|
|
|
@ -92,7 +92,8 @@ do_request_api(Method, Request)->
|
|||
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
|
||||
when Code >= 200 andalso Code =< 299 ->
|
||||
{ok, Return};
|
||||
{ok, {Reason, _, _}} ->
|
||||
{ok, {Reason, _, _} = Error} ->
|
||||
ct:pal("error: ~p~n", [Error]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
|
|
Loading…
Reference in New Issue