From 0f1347a495b91004f983aa184b302a74f7a7eb9a Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 8 Mar 2016 13:28:11 +0800 Subject: [PATCH] add 'routes' command, improve usage --- src/emqttd_cli.erl | 165 ++++++++++++++++++++++++++------------------- 1 file changed, 95 insertions(+), 70 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 11a0b23fd..6bfaf104b 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -28,9 +28,9 @@ -export([load/0]). --export([status/1, broker/1, cluster/1, users/1, bridges/1, - clients/1, sessions/1, topics/1, subscriptions/1, - plugins/1, listeners/1, vm/1, mnesia/1, trace/1]). +-export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1, + routes/1, topics/1, subscriptions/1, plugins/1, bridges/1, + listeners/1, vm/1, mnesia/1, trace/1]). -define(PROC_INFOKEYS, [status, memory, @@ -67,7 +67,7 @@ status([]) -> ?PRINT("emqttd ~s is running~n", [Vsn]) end; status(_) -> - ?PRINT_CMD("status", "query broker status"). + ?PRINT_CMD("status", "Show broker status"). %%-------------------------------------------------------------------- %% @doc Query broker @@ -98,10 +98,10 @@ broker(["pubsub"]) -> end, lists:reverse(Pubsubs)); broker(_) -> - ?USAGE([{"broker", "query broker version, uptime and description"}, - {"broker pubsub", "query process_info of pubsub"}, - {"broker stats", "query broker statistics of clients, topics, subscribers"}, - {"broker metrics", "query broker metrics"}]). + ?USAGE([{"broker", "Show broker version, uptime and description"}, + {"broker pubsub", "Show process_info of pubsub"}, + {"broker stats", "Show broker statistics of clients, topics, subscribers"}, + {"broker metrics", "Show broker metrics"}]). %%-------------------------------------------------------------------- %% @doc Cluster with other nodes @@ -157,9 +157,9 @@ clients(["kick", ClientId]) -> if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end); clients(_) -> - ?USAGE([{"clients list", "list all clients"}, - {"clients show ", "show a client"}, - {"clients kick ", "kick a client"}]). + ?USAGE([{"clients list", "List all clients"}, + {"clients show ", "Show a client"}, + {"clients kick ", "Kick out a client"}]). if_client(ClientId, Fun) -> case emqttd_cm:lookup(bin(ClientId)) of @@ -191,10 +191,23 @@ sessions(["show", ClientId]) -> end; sessions(_) -> - ?USAGE([{"sessions list", "list all sessions"}, - {"sessions list persistent", "list all persistent sessions"}, - {"sessions list transient", "list all transient sessions"}, - {"sessions show ", "show a session"}]). + ?USAGE([{"sessions list", "List all sessions"}, + {"sessions list persistent", "List all persistent sessions"}, + {"sessions list transient", "List all transient sessions"}, + {"sessions show ", "Show a session"}]). + +%%-------------------------------------------------------------------- +%% @doc Routes Command +routes(["list"]) -> + Print = fun(Topic, Records) -> print(route, Topic, Records) end, + if_could_print(route, Print); + +routes(["show", Topic]) -> + print(route, Topic, mnesia:dirty_read(route, bin(Topic))); + +routes(_) -> + ?USAGE([{"routes list", "List all routes"}, + {"routes show ", "Show a route"}]). %%-------------------------------------------------------------------- %% @doc Topics Command @@ -206,48 +219,54 @@ topics(["show", Topic]) -> print(topic, Topic, ets:lookup(topic, bin(Topic))); topics(_) -> - ?USAGE([{"topics list", "list all topics"}, - {"topics show ", "show a topic"}]). + ?USAGE([{"topics list", "List all topics"}, + {"topics show ", "Show a topic"}]). subscriptions(["list"]) -> Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, - if_subscription(fun() -> if_could_print(subscription, Print) end); + if_could_print(subscription, Print); + +subscriptions(["list", "static"]) -> + Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, + if_could_print(static_subscription, Print); subscriptions(["show", ClientId]) -> - if_subscription(fun() -> - case emqttd_pubsub:lookup(subscription, bin(ClientId)) of - [] -> ?PRINT_MSG("Not Found.~n"); - Records -> print(subscription, ClientId, Records) - end - end); + case mnesia:dirty_read(subscription, bin(ClientId)) of + [] -> ?PRINT_MSG("Not Found.~n"); + Records -> print(subscription, ClientId, Records) + end; subscriptions(["add", ClientId, Topic, QoS]) -> - Create = fun(IntQos) -> - Subscription = {bin(ClientId), bin(Topic), IntQos}, - case emqttd_pubsub:create(subscription, Subscription) of - ok -> ?PRINT_MSG("ok~n"); - {error, Error} -> ?PRINT("Error: ~p~n", [Error]) - end - end, - if_subscription(fun() -> if_valid_qos(QoS, Create) end); + Add = fun(IntQos) -> + Subscription = #mqtt_subscription{subid = bin(ClientId), + topic = bin(Topic), + qos = IntQos}, + case emqttd_backend:add_static_subscription(Subscription) of + {atomic, ok} -> + ?PRINT_MSG("ok~n"); + {aborted, {error, existed}} -> + ?PRINT_MSG("Error: already existed~n"); + {aborted, Reason} -> + ?PRINT("Error: ~p~n", [Reason]) + end + end, + if_valid_qos(QoS, Add); + +subscriptions(["del", ClientId]) -> + Ok = emqttd_backend:del_static_subscriptions(bin(ClientId)), + ?PRINT("~p~n", [Ok]); subscriptions(["del", ClientId, Topic]) -> - if_subscription(fun() -> - Ok = emqttd_pubsub:delete(subscription, {bin(ClientId), bin(Topic)}), - ?PRINT("~p~n", [Ok]) - end); + Ok = emqttd_backend:del_static_subscription(bin(ClientId), bin(Topic)), + ?PRINT("~p~n", [Ok]); subscriptions(_) -> - ?USAGE([{"subscriptions list", "list all subscriptions"}, - {"subscriptions show ", "show subscriptions of a client"}, - {"subscriptions add ", "add subscription"}, - {"subscriptions del ", "delete subscription"}]). - -if_subscription(Fun) -> - case ets:info(subscription, name) of - undefined -> ?PRINT_MSG("Error: subscription table not found!~n"); - _ -> Fun() - end. + ?USAGE([{"subscriptions list", "List all subscriptions"}, + {"subscriptions list static", "List all static subscriptions"}, + {"subscriptions show ", "Show subscriptions of a client"}, + {"subscriptions add ", "Add a static subscription manually"}, + {"subscriptions del ", "Delete static subscriptions manually"}, + {"subscriptions del ", "Delete a static subscription manually"}]). if_could_print(Tab, Fun) -> case mnesia:table_info(Tab, size) of @@ -286,9 +305,9 @@ plugins(["unload", Name]) -> end; plugins(_) -> - ?USAGE([{"plugins list", "show loaded plugins"}, - {"plugins load ", "load plugin"}, - {"plugins unload ", "unload plugin"}]). + ?USAGE([{"plugins list", "Show loaded plugins"}, + {"plugins load ", "Load plugin"}, + {"plugins unload ", "Unload plugin"}]). %%-------------------------------------------------------------------- %% @doc Bridges command @@ -326,11 +345,11 @@ bridges(["stop", SNode, Topic]) -> end; bridges(_) -> - ?USAGE([{"bridges list", "query bridges"}, - {"bridges options", "bridge options"}, - {"bridges start ", "start bridge"}, - {"bridges start ", "start bridge with options"}, - {"bridges stop ", "stop bridge"}]). + ?USAGE([{"bridges list", "List bridges"}, + {"bridges options", "Bridge options"}, + {"bridges start ", "Start a bridge"}, + {"bridges start ", "Start a bridge with options"}, + {"bridges stop ", "Stop a bridge"}]). parse_opts(Cmd, OptStr) -> Tokens = string:tokens(OptStr, ","), @@ -373,11 +392,11 @@ vm(["io"]) -> end, [max_fds, active_fds]); vm(_) -> - ?USAGE([{"vm all", "query info of erlang vm"}, - {"vm load", "query load of erlang vm"}, - {"vm memory", "query memory of erlang vm"}, - {"vm process", "query process of erlang vm"}, - {"vm io", "queue io of erlang vm"}]). + ?USAGE([{"vm all", "Show info of erlang vm"}, + {"vm load", "Show load of erlang vm"}, + {"vm memory", "Show memory of erlang vm"}, + {"vm process", "Show process of erlang vm"}, + {"vm io", "Show IO of erlang vm"}]). %%-------------------------------------------------------------------- %% @doc mnesia Command @@ -385,7 +404,7 @@ mnesia([]) -> mnesia:system_info(); mnesia(_) -> - ?PRINT_CMD("mnesia", "mnesia system info"). + ?PRINT_CMD("mnesia", "Mnesia system info"). %%-------------------------------------------------------------------- %% @doc Trace Command @@ -407,11 +426,11 @@ trace(["topic", Topic, LogFile]) -> trace_on(topic, Topic, LogFile); trace(_) -> - ?USAGE([{"trace list", "query all traces"}, - {"trace client ","trace client with ClientId"}, - {"trace client off", "stop tracing client"}, - {"trace topic ", "trace topic with Topic"}, - {"trace topic off", "stop tracing Topic"}]). + ?USAGE([{"trace list", "List all traces"}, + {"trace client ","Trace a client"}, + {"trace client off", "Stop tracing a client"}, + {"trace topic ", "Trace a topic"}, + {"trace topic off", "Stop tracing a Topic"}]). trace_on(Who, Name, LogFile) -> case emqttd_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of @@ -444,7 +463,7 @@ listeners([]) -> end, esockd:listeners()); listeners(_) -> - ?PRINT_CMD("listeners", "query broker listeners"). + ?PRINT_CMD("listeners", "List listeners"). print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", @@ -458,8 +477,11 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, emqttd_net:format(Peername), emqttd_time:now_to_secs(ConnectedAt)]); -print(#mqtt_topic{topic = Topic, node = Node}) -> - ?PRINT("~s on ~s~n", [Topic, Node]); +print(#mqtt_topic{topic = Topic, flags = Flags}) -> + ?PRINT("~s: ~p~n", [Topic, Flags]); + +print(#mqtt_route{topic = Topic, node = Node}) -> + ?PRINT("~s: ~s~n", [Topic, Node]); print({{ClientId, _ClientPid}, SessInfo}) -> InfoKeys = [clean_sess, @@ -477,10 +499,13 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "created_at=~w)~n", [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). -print(topic, Topic, Records) -> - Nodes = [Node || #mqtt_topic{node = Node} <- Records], +print(route, Topic, Routes) -> + Nodes = [Node || #mqtt_route{node = Node} <- Routes], ?PRINT("~s: ~p~n", [Topic, Nodes]); +print(topic, _Topic, Records) -> + [print(R) || R <- Records]; + print(subscription, ClientId, Subscriptions) -> TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], ?PRINT("~s: ~p~n", [ClientId, TopicTable]).