%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_mgmt_cli).

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

-include("emqx_mgmt.hrl").

-elvis([{elvis_style, invalid_dynamic_call, disable}]).

-define(PRINT_CMD(Cmd, Desc), io:format("~-48s# ~s~n", [Cmd, Desc])).

-export([load/0]).

-export([ status/1
        , broker/1
        , cluster/1
        , clients/1
        , routes/1
        , subscriptions/1
        , plugins/1
        , listeners/1
        , vm/1
        , mnesia/1
        , trace/1
        , traces/1
        , log/1
        , mgmt/1
        , data/1
        , acl/1
        ]).

-define(PROC_INFOKEYS, [status,
                        memory,
                        message_queue_len,
                        total_heap_size,
                        heap_size,
                        stack_size,
                        reductions]).

-define(MAX_LIMIT, 10000).

-define(APP, emqx).

%% @doc Register every exported function of this module (apart from the
%% names rejected by is_cmd/1) as an emqx_ctl command of the same name.
-spec(load() -> ok).
load() ->
    ExportedNames = [Name || {Name, _Arity} <- ?MODULE:module_info(exports)],
    Commands = lists:filter(fun is_cmd/1, ExportedNames),
    lists:foreach(fun(Command) ->
                          emqx_ctl:register_command(Command, {?MODULE, Command}, [])
                  end, Commands).

%% An exported name is a command unless it is one of the housekeeping
%% functions listed here.
is_cmd(init)        -> false;
is_cmd(load)        -> false;
is_cmd(module_info) -> false;
is_cmd(_Other)      -> true.
%%--------------------------------------------------------------------
%% @doc Manage REST API applications
%%
%% Sub-commands: insert, lookup, update, delete and list. Any other
%% argument list prints the usage text.

mgmt(["insert", AppId, Name]) ->
    case emqx_mgmt_auth:add_app(list_to_binary(AppId), list_to_binary(Name)) of
        {ok, Secret} ->
            emqx_ctl:print("AppSecret: ~s~n", [Secret]);
        {error, already_existed} ->
            emqx_ctl:print("Error: already existed~n");
        {error, Reason} ->
            emqx_ctl:print("Error: ~p~n", [Reason])
    end;

mgmt(["lookup", AppId]) ->
    case emqx_mgmt_auth:lookup_app(list_to_binary(AppId)) of
        undefined -> emqx_ctl:print("Not Found.~n");
        App       -> print_app_info(App)
    end;

mgmt(["update", AppId, Status]) ->
    %% Status is handed over as an atom built from the raw CLI argument.
    case emqx_mgmt_auth:update_app(list_to_binary(AppId), list_to_atom(Status)) of
        ok ->
            emqx_ctl:print("update successfully.~n");
        {error, Reason} ->
            emqx_ctl:print("Error: ~p~n", [Reason])
    end;

mgmt(["delete", AppId]) ->
    case emqx_mgmt_auth:del_app(list_to_binary(AppId)) of
        ok ->
            emqx_ctl:print("ok~n");
        {error, not_found} ->
            emqx_ctl:print("Error: app not found~n");
        {error, Reason} ->
            emqx_ctl:print("Error: ~p~n", [Reason])
    end;

mgmt(["list"]) ->
    lists:foreach(fun print_app_info/1, emqx_mgmt_auth:list_apps());

mgmt(_) ->
    %% Each entry names the arguments its clause above actually requires.
    emqx_ctl:usage([{"mgmt list", "List Applications"},
                    {"mgmt insert <AppId> <Name>", "Add Application of REST API"},
                    {"mgmt update <AppId> <status>", "Update Application of REST API"},
                    {"mgmt lookup <AppId>", "Get Application of REST API"},
                    {"mgmt delete <AppId>", "Delete Application of REST API"}]).

%%--------------------------------------------------------------------
%% @doc Node status
%%
%% Prints the node name, the release string and the internal status
%% reported by init:get_status/0.

status([]) ->
    {InternalStatus, _ProvidedStatus} = init:get_status(),
    emqx_ctl:print("Node ~p ~s is ~p~n", [node(), emqx_app:get_release(), InternalStatus]);
status(_) ->
    emqx_ctl:usage("status", "Show broker status").
%%--------------------------------------------------------------------
%% @doc Query broker
%%
%% With no arguments prints sysdescr, version, uptime and datetime as
%% returned by the emqx_sys function of the same name; "stats" and
%% "metrics" print the sorted key/value lists.

broker([]) ->
    Funs = [sysdescr, version, uptime, datetime],
    [emqx_ctl:print("~-10s: ~s~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs];

broker(["stats"]) ->
    [emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
     || {Stat, Val} <- lists:sort(emqx_stats:getstats())];

broker(["metrics"]) ->
    [emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
     || {Metric, Val} <- lists:sort(emqx_metrics:all())];

broker(_) ->
    emqx_ctl:usage([{"broker", "Show broker version, uptime and description"},
                    {"broker stats", "Show broker statistics of clients, topics, subscribers"},
                    {"broker metrics", "Show broker metrics"}]).

%%-----------------------------------------------------------------------------
%% @doc Cluster with other nodes
%%
%% join/leave/force-leave print the outcome and, on success, the
%% resulting cluster status.

cluster(["join", SNode]) ->
    case ekka:join(ekka_node:parse_name(SNode)) of
        ok ->
            emqx_ctl:print("Join the cluster successfully.~n"),
            cluster(["status"]);
        ignore ->
            emqx_ctl:print("Ignore.~n");
        {error, Error} ->
            emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
    end;

cluster(["leave"]) ->
    case ekka:leave() of
        ok ->
            emqx_ctl:print("Leave the cluster successfully.~n"),
            cluster(["status"]);
        {error, Error} ->
            emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
    end;

cluster(["force-leave", SNode]) ->
    case ekka:force_leave(ekka_node:parse_name(SNode)) of
        ok ->
            emqx_ctl:print("Remove the node from cluster successfully.~n"),
            cluster(["status"]);
        ignore ->
            emqx_ctl:print("Ignore.~n");
        {error, Error} ->
            emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
    end;

cluster(["status"]) ->
    emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);

cluster(_) ->
    %% join and force-leave both take the target node name.
    emqx_ctl:usage([{"cluster join <Node>", "Join the cluster"},
                    {"cluster leave", "Leave the cluster"},
                    {"cluster force-leave <Node>", "Force the node leave from cluster"},
                    {"cluster status", "Cluster status"}]).
%%--------------------------------------------------------------------
%% @doc Query clients

clients(["list"]) ->
    dump(emqx_channel, client);

clients(["show", ClientId]) ->
    if_client(ClientId, fun print/1);

clients(["kick", ClientId]) ->
    ok = emqx_cm:kick_session(bin(ClientId)),
    emqx_ctl:print("ok~n");

clients(_) ->
    emqx_ctl:usage([{"clients list", "List all clients"},
                    {"clients show <ClientId>", "Show a client"},
                    {"clients kick <ClientId>", "Kick out a client"}]).

%% Look the client up in the emqx_channel table and apply Fun to
%% {client, Channel}; prints "Not Found." when there is no entry.
if_client(ClientId, Fun) ->
    case ets:lookup(emqx_channel, bin(ClientId)) of
        []        -> emqx_ctl:print("Not Found.~n");
        [Channel] -> Fun({client, Channel})
    end.

%%--------------------------------------------------------------------
%% @doc Routes Command

routes(["list"]) ->
    dump(emqx_route);

routes(["show", Topic]) ->
    Routes = ets:lookup(emqx_route, bin(Topic)),
    [print({emqx_route, Route}) || Route <- Routes];

routes(_) ->
    emqx_ctl:usage([{"routes list", "List all routes"},
                    {"routes show <Topic>", "Show a route"}]).

%%--------------------------------------------------------------------
%% @doc Subscriptions Command

subscriptions(["list"]) ->
    lists:foreach(fun(Suboption) ->
                          print({emqx_suboption, Suboption})
                  end, ets:tab2list(emqx_suboption));

subscriptions(["show", ClientId]) ->
    case ets:lookup(emqx_subid, bin(ClientId)) of
        [] ->
            emqx_ctl:print("Not Found.~n");
        [{_, Pid}] ->
            case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of
                []         -> emqx_ctl:print("Not Found.~n");
                Suboptions -> [print({emqx_suboption, Sub}) || Sub <- Suboptions]
            end
    end;

subscriptions(["add", ClientId, Topic, QoS]) ->
    if_valid_qos(QoS, fun(IntQos) ->
        case ets:lookup(emqx_channel, bin(ClientId)) of
            [] ->
                emqx_ctl:print("Error: Channel not found!~n");
            [{_, Pid}] ->
                {Topic1, Options} = emqx_topic:parse(bin(Topic)),
                %% The request is sent to the channel process as a plain
                %% message; "ok" only means it was sent.
                Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
                emqx_ctl:print("ok~n")
        end
    end);

subscriptions(["del", ClientId, Topic]) ->
    case ets:lookup(emqx_channel, bin(ClientId)) of
        [] ->
            emqx_ctl:print("Error: Channel not found!~n");
        [{_, Pid}] ->
            Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
            emqx_ctl:print("ok~n")
    end;

subscriptions(_) ->
    emqx_ctl:usage([{"subscriptions list", "List all subscriptions"},
                    {"subscriptions show <ClientId>", "Show subscriptions of a client"},
                    {"subscriptions add <ClientId> <Topic> <QoS>",
                     "Add a static subscription manually"},
                    {"subscriptions del <ClientId> <Topic>",
                     "Delete a static subscription manually"}]).

%% Parse QoS (a string taken from the command line) and call Fun with the
%% integer when it satisfies ?IS_QOS; otherwise print a hint. Only the
%% conversion is protected by the try, so errors raised by Fun propagate.
if_valid_qos(QoS, Fun) ->
    try list_to_integer(QoS) of
        Int when ?IS_QOS(Int) -> Fun(Int);
        _ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
    catch
        %% list_to_integer/1 signals unparsable input with badarg
        error:badarg ->
            emqx_ctl:print("QoS should be 0, 1, 2~n")
    end.

%%--------------------------------------------------------------------
%% @doc Plugins Command

plugins(["list"]) ->
    lists:foreach(fun print/1, emqx_plugins:list());

plugins(["load", Name]) ->
    case emqx_plugins:load(list_to_atom(Name)) of
        ok ->
            emqx_ctl:print("Plugin ~s loaded successfully.~n", [Name]);
        {error, Reason} ->
            emqx_ctl:print("Load plugin ~s error: ~p.~n", [Name, Reason])
    end;

%% This clause must stay ahead of the generic unload clause.
plugins(["unload", "emqx_management"])->
    emqx_ctl:print("Plugin emqx_management can not be unloaded.~n");

plugins(["unload", Name]) ->
    case emqx_plugins:unload(list_to_atom(Name)) of
        ok ->
            emqx_ctl:print("Plugin ~s unloaded successfully.~n", [Name]);
        {error, Reason} ->
            emqx_ctl:print("Unload plugin ~s error: ~p.~n", [Name, Reason])
    end;

plugins(["reload", Name]) ->
    %% list_to_existing_atom/1 raises badarg for a name that is not already
    %% an atom; that case is reported as a non-existent plugin.
    try list_to_existing_atom(Name) of
        PluginName ->
            case emqx_mgmt:reload_plugin(node(), PluginName) of
                ok ->
                    emqx_ctl:print("Plugin ~s reloaded successfully.~n", [Name]);
                {error, Reason} ->
                    emqx_ctl:print("Reload plugin ~s error: ~p.~n", [Name, Reason])
            end
    catch
        error:badarg ->
            emqx_ctl:print("Reload plugin ~s error: The plugin doesn't exist.~n", [Name])
    end;

plugins(_) ->
    emqx_ctl:usage([{"plugins list", "Show loaded plugins"},
                    {"plugins load <Plugin>", "Load plugin"},
                    {"plugins unload <Plugin>", "Unload plugin"},
                    {"plugins reload <Plugin>", "Reload plugin"}
                   ]).
%%--------------------------------------------------------------------
%% @doc vm command
%%
%% Prints one section of Erlang VM information (load, memory, process,
%% io, ports) or, for "all" / no argument, every section in that order.

vm([]) ->
    vm(["all"]);

vm(["all"]) ->
    Sections = ["load", "memory", "process", "io", "ports"],
    [vm([Section]) || Section <- Sections];

vm(["load"]) ->
    Loads = emqx_vm:loads(),
    [emqx_ctl:print("cpu/~-20s: ~s~n", [Label, Value]) || {Label, Value} <- Loads];

vm(["memory"]) ->
    Memory = erlang:memory(),
    [emqx_ctl:print("memory/~-17s: ~w~n", [Category, Bytes]) || {Category, Bytes} <- Memory];

vm(["process"]) ->
    print_system_info("process/~-16s: ~w~n",
                      [{limit, process_limit}, {count, process_count}]);

vm(["io"]) ->
    CheckIo = erlang:system_info(check_io),
    Props = lists:usort(lists:flatten(CheckIo)),
    [emqx_ctl:print("io/~-21s: ~w~n", [Item, proplists:get_value(Item, Props)])
     || Item <- [max_fds, active_fds]];

vm(["ports"]) ->
    print_system_info("ports/~-16s: ~w~n",
                      [{count, port_count}, {limit, port_limit}]);

vm(_) ->
    emqx_ctl:usage([{"vm all", "Show info of Erlang VM"},
                    {"vm load", "Show load of Erlang VM"},
                    {"vm memory", "Show memory of Erlang VM"},
                    {"vm process", "Show process of Erlang VM"},
                    {"vm io", "Show IO of Erlang VM"},
                    {"vm ports", "Show Ports of Erlang VM"}]).

%% Print one line per {Label, Key} pair, where the value is
%% erlang:system_info(Key) and Format takes the label and the value.
print_system_info(Format, Pairs) ->
    [emqx_ctl:print(Format, [Label, erlang:system_info(Key)]) || {Label, Key} <- Pairs].

%%--------------------------------------------------------------------
%% @doc mnesia Command

mnesia([]) ->
    mnesia:system_info();

mnesia(_) ->
    emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
%%--------------------------------------------------------------------
%% @doc Logger Command
%%
%% Level and handler names are turned into atoms from the raw CLI
%% arguments before being passed to emqx_logger.

log(["set-level", Level]) ->
    case emqx_logger:set_log_level(list_to_atom(Level)) of
        ok    -> emqx_ctl:print("~s~n", [Level]);
        Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
    end;

log(["primary-level"]) ->
    Level = emqx_logger:get_primary_log_level(),
    emqx_ctl:print("~s~n", [Level]);

log(["primary-level", Level]) ->
    %% The result of the set call is ignored; the level that is printed is
    %% read back afterwards.
    _ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
    emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]);

log(["handlers", "list"]) ->
    _ = [emqx_ctl:print("LogHandler(id=~s, level=~s, destination=~s, status=~s)~n",
                        [Id, Level, Dst, Status])
         || #{id := Id, level := Level, dst := Dst, status := Status}
                <- emqx_logger:get_log_handlers()],
    ok;

log(["handlers", "start", HandlerId]) ->
    case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of
        ok ->
            emqx_ctl:print("log handler ~s started~n", [HandlerId]);
        {error, Reason} ->
            emqx_ctl:print("[error] failed to start log handler ~s: ~p~n", [HandlerId, Reason])
    end;

log(["handlers", "stop", HandlerId]) ->
    case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of
        ok ->
            emqx_ctl:print("log handler ~s stopped~n", [HandlerId]);
        {error, Reason} ->
            emqx_ctl:print("[error] failed to stop log handler ~s: ~p~n", [HandlerId, Reason])
    end;

log(["handlers", "set-level", HandlerId, Level]) ->
    Handler = list_to_atom(HandlerId),
    case emqx_logger:set_log_handler_level(Handler, list_to_atom(Level)) of
        ok ->
            #{level := NewLevel} = emqx_logger:get_log_handler(Handler),
            emqx_ctl:print("~s~n", [NewLevel]);
        {error, Error} ->
            emqx_ctl:print("[error] ~p~n", [Error])
    end;

log(_) ->
    emqx_ctl:usage([{"log set-level <Level>", "Set the overall log level"},
                    {"log primary-level", "Show the primary log level now"},
                    {"log primary-level <Level>", "Set the primary log level"},
                    {"log handlers list", "Show log handlers"},
                    {"log handlers start <HandlerId>", "Start a log handler"},
                    {"log handlers stop <HandlerId>", "Stop a log handler"},
                    {"log handlers set-level <HandlerId> <Level>",
                     "Set log level of a log handler"}]).

%%--------------------------------------------------------------------
%% @doc Trace Command
%%
%% Operation is one of the strings accepted by trace_type/1; an unknown
%% operation falls through to the usage text.

trace(["list"]) ->
    lists:foreach(fun(Trace) ->
                          #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
                          emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n",
                                         [Type, Filter, Level, Dst])
                  end, emqx_trace_handler:running());

trace(["stop", Operation, ClientId]) ->
    case trace_type(Operation) of
        {ok, Type} -> trace_off(Type, ClientId);
        error      -> trace([])
    end;

trace(["start", Operation, ClientId, LogFile]) ->
    %% No level given: default to "all".
    trace(["start", Operation, ClientId, LogFile, "all"]);

trace(["start", Operation, ClientId, LogFile, Level]) ->
    case trace_type(Operation) of
        {ok, Type} ->
            %% list_to_existing_atom/1 raises badarg when Level does not
            %% name an atom that already exists.
            trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile);
        error ->
            trace([])
    end;

trace(_) ->
    emqx_ctl:usage([{"trace list", "List all traces started on local node"},
                    {"trace start client <ClientId> <File> [<Level>]",
                     "Traces for a client on local node"},
                    {"trace stop client <ClientId>",
                     "Stop tracing for a client on local node"},
                    {"trace start topic <Topic> <File> [<Level>]",
                     "Traces for a topic on local node"},
                    {"trace stop topic <Topic>",
                     "Stop tracing for a topic on local node"},
                    {"trace start ip_address <IP> <File> [<Level>]",
                     "Traces for a client ip on local node"},
                    {"trace stop ip_address <IP>",
                     "Stop tracing for a client ip on local node"}
                   ]).

%% Install a trace handler via emqx_trace_handler:install/4 and report
%% the result.
trace_on(Who, Name, Level, LogFile) ->
    case emqx_trace_handler:install(Who, Name, Level, LogFile) of
        ok ->
            emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]);
        {error, Error} ->
            emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error])
    end.

%% Remove a trace handler via emqx_trace_handler:uninstall/2 and report
%% the result.
trace_off(Who, Name) ->
    case emqx_trace_handler:uninstall(Who, Name) of
        ok ->
            emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]);
        {error, Error} ->
            emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error])
    end.
%%--------------------------------------------------------------------
%% @doc Trace Cluster Command
%%
%% "list" prints every trace returned by emqx_trace_api:list_trace/2 and
%% returns how many there are. "start" takes a trace name, an operation
%% accepted by trace_type/1, a filter value and an optional duration in
%% seconds (900 when omitted).

traces(["list"]) ->
    {ok, List} = emqx_trace_api:list_trace(get, []),
    case List of
        [] ->
            emqx_ctl:print("Cluster Trace is empty~n", []);
        _ ->
            lists:foreach(fun(Trace) ->
                                  #{type := Type, name := Name, status := Status,
                                    log_size := LogSize} = Trace,
                                  %% The filter value is stored under the key
                                  %% named by the trace's own type.
                                  emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
                                                 [Name, Type, maps:get(Type, Trace),
                                                  Status, LogSize])
                          end, List)
    end,
    length(List);

traces(["stop", Name]) ->
    trace_cluster_off(Name);

traces(["delete", Name]) ->
    trace_cluster_del(Name);

traces(["start", Name, Operation, Filter]) ->
    traces(["start", Name, Operation, Filter, "900"]);

traces(["start", Name, Operation, Filter, DurationS]) ->
    case trace_type(Operation) of
        {ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS);
        error      -> traces([])
    end;

traces(_) ->
    emqx_ctl:usage([{"traces list", "List all cluster traces started"},
                    {"traces start <Name> client <ClientId> [<DurationS>]",
                     "Traces for a client in cluster"},
                    {"traces start <Name> topic <Topic> [<DurationS>]",
                     "Traces for a topic in cluster"},
                    {"traces start <Name> ip_address <IPAddr> [<DurationS>]",
                     "Traces for a IP in cluster"},
                    {"traces stop <Name>", "Stop trace in cluster"},
                    {"traces delete <Name>", "Delete trace in cluster"}
                   ]).
%% Create a cluster trace called Name that filters on Type/Filter and ends
%% DurationS0 seconds from now. DurationS0 is the raw command-line string;
%% input that is not an integer is reported instead of crashing. Nothing is
%% created unless a process is registered as emqx_trace.
trace_cluster_on(Name, Type, Filter, DurationS0) ->
    case erlang:whereis(emqx_trace) of
        undefined ->
            emqx_ctl:print("[error] Tracer module not started~n"
                           "Please run `emqx_ctl modules start tracer` "
                           "or `emqx_ctl modules start emqx_mod_trace` first~n", []);
        _ ->
            case parse_duration(DurationS0) of
                {ok, DurationS} ->
                    create_cluster_trace(Name, Type, Filter, DurationS);
                error ->
                    emqx_ctl:print("[error] Invalid duration ~p, "
                                   "expected an integer number of seconds~n",
                                   [DurationS0])
            end
    end.

%% Convert the duration argument to an integer; `error' when it is not one.
parse_duration(DurationS0) ->
    try list_to_integer(DurationS0) of
        DurationS -> {ok, DurationS}
    catch
        error:badarg -> error
    end.

%% Build the trace description and hand it to emqx_trace:create/1.
create_cluster_trace(Name, Type, Filter, DurationS) ->
    Now = erlang:system_time(second),
    Trace = #{ name => list_to_binary(Name)
             , type => atom_to_binary(Type)
               %% the filter value is keyed by the trace type itself
             , Type => list_to_binary(Filter)
             , start_at => list_to_binary(calendar:system_time_to_rfc3339(Now))
             , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
             },
    case emqx_trace:create(Trace) of
        ok ->
            emqx_ctl:print("Cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
        {error, Error} ->
            emqx_ctl:print("[error] Cluster_trace ~s ~s=~s ~p~n",
                           [Name, Type, Filter, Error])
    end.

%% Delete the cluster trace called Name and report the result.
trace_cluster_del(Name) ->
    case emqx_trace:delete(list_to_binary(Name)) of
        ok ->
            emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]);
        {error, Error} ->
            emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error])
    end.

%% Stop the cluster trace called Name (emqx_trace:update/2 with `false')
%% and report the result.
trace_cluster_off(Name) ->
    case emqx_trace:update(list_to_binary(Name), false) of
        ok ->
            emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]);
        {error, Error} ->
            emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
    end.

%% Map the operation word given on the command line to the trace type
%% atom; `error' for anything else.
trace_type("client")     -> {ok, clientid};
trace_type("topic")      -> {ok, topic};
trace_type("ip_address") -> {ok, ip_address};
trace_type(_)            -> error.
%%--------------------------------------------------------------------
%% @doc Listeners Command
%%
%% With no arguments lists the listeners reported by esockd and then
%% those reported by ranch. "stop" and "restart" act on one listener.

listeners([]) ->
    lists:foreach(fun print_esockd_listener/1, esockd:listeners()),
    lists:foreach(fun print_ranch_listener/1, ranch:info());

listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
    %% _MaybePort is to be backward compatible, to stop http listener,
    %% there is no need for the port number
    case minirest:stop_http(list_to_atom(Name)) of
        ok ->
            emqx_ctl:print("Stop ~s listener successfully.~n", [Name]);
        {error, Error} ->
            emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error])
    end;

listeners(["stop", "mqtt:" ++ _ = Identifier]) ->
    stop_listener(emqx_listeners:find_by_id(Identifier), Identifier);

listeners(["stop", _Proto, ListenOn]) ->
    %% this clause is kept to be backward compatible
    %% ListenOn is either "Port" or "IP:Port".
    ListenOn1 = case string:tokens(ListenOn, ":") of
                    [Port]     -> list_to_integer(Port);
                    [IP, Port] -> {IP, list_to_integer(Port)}
                end,
    stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);

listeners(["restart", "http:management"]) ->
    restart_http_listener(http, emqx_management);

listeners(["restart", "https:management"]) ->
    restart_http_listener(https, emqx_management);

listeners(["restart", "http:dashboard"]) ->
    restart_http_listener(http, emqx_dashboard);

listeners(["restart", "https:dashboard"]) ->
    restart_http_listener(https, emqx_dashboard);

listeners(["restart", Identifier]) ->
    case emqx_listeners:restart_listener(Identifier) of
        ok ->
            emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]);
        {error, Error} ->
            emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error])
    end;

listeners(_) ->
    %% The two stop forms correspond to the identifier clause and the
    %% backward-compatible <Proto> <Port> clause above.
    emqx_ctl:usage([{"listeners", "List listeners"},
                    {"listeners stop <Identifier>", "Stop a listener"},
                    {"listeners stop <Proto> <Port>", "Stop a listener"},
                    {"listeners restart <Identifier>", "Restart a listener"}
                   ]).

%% Print one entry of esockd:listeners/0 with the figures esockd reports
%% for it.
print_esockd_listener({{Protocol, ListenOn}, _Pid}) ->
    Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
            {acceptors, esockd:get_acceptors({Protocol, ListenOn})},
            {max_conns, esockd:get_max_connections({Protocol, ListenOn})},
            {current_conn, esockd:get_current_connections({Protocol, ListenOn})},
            {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
           ],
    emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
    lists:foreach(fun indent_print/1, Info).

%% Print one entry of ranch:info/0; every figure is read from the
%% listener's option list and shutdown_count is always shown as [].
print_ranch_listener({Protocol, Opts}) ->
    Port = proplists:get_value(port, Opts),
    Acceptors = maps:get(num_acceptors,
                         proplists:get_value(transport_options, Opts, #{}), 0),
    Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
            {acceptors, Acceptors},
            {max_conns, proplists:get_value(max_connections, Opts)},
            {current_conn, proplists:get_value(all_connections, Opts)},
            {shutdown_count, []}],
    emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
    lists:foreach(fun indent_print/1, Info).

%% Stop the listener found by one of the emqx_listeners lookups. The
%% first argument is the lookup result (`false' when nothing matched),
%% the second is what the user typed and is only used in the message.
stop_listener(false, Input) ->
    emqx_ctl:print("No such listener ~p~n", [Input]);
stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
    ID = emqx_listeners:identifier(Listener),
    ListenOnStr = emqx_listeners:format_listen_on(ListenOn),
    case emqx_listeners:stop_listener(Listener) of
        ok ->
            emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
        {error, Reason} ->
            emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n",
                           [ID, ListenOnStr, Reason])
    end.
%%--------------------------------------------------------------------
%% @doc data Command
%%
%% "import" without --env behaves like --env "{}".

data(["export"]) ->
    case emqx_mgmt_data_backup:export() of
        {ok, #{filename := Filename}} ->
            emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [Filename]);
        {error, Reason} ->
            emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason])
    end;

data(["import", Filename]) ->
    data(["import", Filename, "--env", "{}"]);

data(["import", Filename, "--env", Env]) ->
    case emqx_mgmt_data_backup:import(Filename, Env) of
        ok ->
            emqx_ctl:print("The emqx data has been imported successfully.~n");
        {error, import_failed} ->
            emqx_ctl:print("The emqx data import failed.~n");
        {error, unsupported_version} ->
            emqx_ctl:print("The emqx data import failed: Unsupported version.~n");
        {error, Reason} ->
            emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n",
                           [Reason, Filename])
    end;

data(_) ->
    emqx_ctl:usage([{"data import <File> [--env '<json>']",
                     "Import data from the specified file, possibly with overrides"},
                    {"data export", "Export data"}]).

%%--------------------------------------------------------------------
%% @doc acl Command

acl(["cache-clean", "node", Node]) ->
    %% list_to_existing_atom/1 raises badarg for a name that is not already
    %% an atom; report that instead of crashing, as `plugins reload' does.
    try erlang:list_to_existing_atom(Node) of
        NodeName ->
            case emqx_mgmt:clean_acl_cache_all(NodeName) of
                ok ->
                    emqx_ctl:print("ACL cache drain started on node ~s.~n", [Node]);
                {error, Reason} ->
                    emqx_ctl:print("ACL drain failed on node ~s: ~0p.~n", [Node, Reason])
            end
    catch
        error:badarg ->
            emqx_ctl:print("ACL drain failed on node ~s: ~0p.~n", [Node, unknown_node])
    end;

acl(["cache-clean", "all"]) ->
    case emqx_mgmt:clean_acl_cache_all() of
        ok ->
            emqx_ctl:print("Started ACL cache drain in all nodes~n");
        {error, Reason} ->
            emqx_ctl:print("ACL cache-clean failed: ~p.~n", [Reason])
    end;

acl(["cache-clean", ClientId]) ->
    %% Command-line arguments are strings; the client id is converted to a
    %% binary like in every other client-id command of this module.
    %% NOTE(review): assumes emqx_mgmt:clean_acl_cache/1 expects a binary
    %% client id - confirm against emqx_mgmt.
    emqx_mgmt:clean_acl_cache(iolist_to_binary(ClientId));

acl(_) ->
    emqx_ctl:usage([{"acl cache-clean all", "Clears acl cache on all nodes"},
                    {"acl cache-clean node <Node>", "Clears acl cache on given node"},
                    {"acl cache-clean <ClientId>", "Clears acl cache for given client"}
                   ]).
%%--------------------------------------------------------------------
%% Dump ETS
%%--------------------------------------------------------------------

%% Print every record of Table, tagging each record with the table name
%% (dump/1) or with Tag (dump/2). Returns the print results grouped per
%% key, in traversal order.
dump(Table) ->
    dump(Table, Table, ets:first(Table), []).

dump(Table, Tag) ->
    dump(Table, Tag, ets:first(Table), []).

dump(_Table, _, '$end_of_table', Result) ->
    lists:reverse(Result);

dump(Table, Tag, Key, Result) ->
    %% A record that fails to print is logged and skipped so that one bad
    %% entry does not abort the whole dump.
    Ls = lists:foldl(fun(Record, Acc) ->
                             try
                                 [print({Tag, Record}) | Acc]
                             catch
                                 Class : Reason : Stk ->
                                     logger:error("Failed to print ~p, error: {~p, ~p}. "
                                                  "Stacktrace: ~0p",
                                                  [Record, Class, Reason, Stk]),
                                     Acc
                             end
                     end, [], ets:lookup(Table, Key)),
    dump(Table, Tag, ets:next(Table, Key), [lists:reverse(Ls) | Result]).

%% Print one tagged item: a client channel, a route, a plugin record or a
%% subscription option entry.
print({_, []}) ->
    ok;

print({client, {ClientId, ChanPid}}) ->
    Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of
                undefined -> #{};
                Attrs0 -> Attrs0
            end,
    Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of
                undefined -> #{};
                Stats0 -> maps:from_list(Stats0)
            end,
    ClientInfo = maps:get(clientinfo, Attrs, #{}),
    ConnInfo = maps:get(conninfo, Attrs, #{}),
    Session = maps:get(session, Attrs, #{}),
    Connected = case maps:get(conn_state, Attrs, undefined) of
                    connected -> true;
                    _ -> false
                end,
    Info = lists:foldl(fun(Items, Acc) ->
                               maps:merge(Items, Acc)
                       end, #{connected => Connected},
                       [maps:with([subscriptions_cnt, inflight_cnt, awaiting_rel_cnt,
                                   mqueue_len, mqueue_dropped, send_msg], Stats),
                        maps:with([clientid, username], ClientInfo),
                        maps:with([peername, clean_start, keepalive, expiry_interval,
                                   connected_at, disconnected_at], ConnInfo),
                        maps:with([created_at], Session)]),
    %% disconnected_at is only shown when present; decide once so the key
    %% list and the format string cannot get out of step.
    {ExtraKeys, Closing} =
        case maps:is_key(disconnected_at, Info) of
            true  -> {[disconnected_at], ", disconnected_at=~w)~n"};
            false -> {[], ")~n"}
        end,
    InfoKeys = [clientid, username, peername,
                clean_start, keepalive, expiry_interval,
                subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg,
                mqueue_len, mqueue_dropped,
                connected, created_at, connected_at] ++ ExtraKeys,
    %% Channel info or stats may be missing (the fallbacks to #{} above),
    %% so absent keys are shown as `undefined' instead of raising badkey.
    emqx_ctl:print("Client(~s, username=~s, peername=~s, "
                   "clean_start=~s, keepalive=~w, session_expiry_interval=~w, "
                   "subscriptions=~w, inflight=~w, awaiting_rel=~w, "
                   "delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
                   "connected=~s, created_at=~w, connected_at=~w" ++ Closing,
                   [format(K, maps:get(K, Info, undefined)) || K <- InfoKeys]);

print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
    emqx_ctl:print("~s -> ~s~n", [Topic, Node]);
print({emqx_route, #route{topic = Topic, dest = Node}}) ->
    emqx_ctl:print("~s -> ~s~n", [Topic, Node]);

print(#plugin{name = Name, descr = Descr, active = Active}) ->
    emqx_ctl:print("Plugin(~s, description=~s, active=~s)~n",
                   [Name, Descr, Active]);

print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) ->
    emqx_ctl:print("~s -> ~s~n", [maps:get(subid, Options), Topic]).

%% Prepare a client field for printing: `undefined' passes through, a
%% peername tuple becomes "IP:Port", anything else is used as is.
format(_, undefined) ->
    undefined;

format(peername, {IPAddr, Port}) ->
    IPStr = emqx_mgmt_util:ntoa(IPAddr),
    io_lib:format("~s:~p", [IPStr, Port]);

format(_, Val) ->
    Val.

%% Convert a command-line string (or any iolist) to a binary.
bin(S) -> iolist_to_binary(S).

%% Print one "key: value" line of a listener description; values wrapped
%% in {string, _} are printed with ~s, everything else with ~w.
indent_print({Key, {string, Val}}) ->
    emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]);
indent_print({Key, Val}) ->
    emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).

%% Listener id for ListenOn as known to emqx_listeners, falling back to
%% the protocol name when there is none.
listener_identifier(Protocol, ListenOn) ->
    case emqx_listeners:find_id_by_listen_on(ListenOn) of
        false -> atom_to_list(Protocol);
        ID -> ID
    end.

%% Stop and start again the Scheme (http | https) listener configured in
%% the `listeners' environment of application AppName.
restart_http_listener(Scheme, AppName) ->
    Listeners = application:get_env(AppName, listeners, []),
    case lists:keyfind(Scheme, 1, Listeners) of
        false ->
            emqx_ctl:print("Listener ~s not exists!~n", [AppName]);
        {Scheme, Port, Options} ->
            ModName = http_mod_name(AppName),
            ModName:stop_listener({Scheme, Port, Options}),
            ModName:start_listener({Scheme, Port, Options})
    end.

%% Module implementing start_listener/1 and stop_listener/1 for an
%% application: emqx_mgmt_http for emqx_management, otherwise the
%% application name itself.
http_mod_name(emqx_management) -> emqx_mgmt_http;
http_mod_name(Name) -> Name.

%% Print one REST API application tuple on a single line.
print_app_info({AppId, AppSecret, Name, Desc, Status, Expired}) ->
    emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n",
                   [AppId, AppSecret, Name, Desc, Status, Expired]).