add 'routes' command, improve usage

This commit is contained in:
Feng 2016-03-08 13:28:11 +08:00
parent fb273f0eca
commit 0f1347a495
1 changed files with 95 additions and 70 deletions

View File

@ -28,9 +28,9 @@
-export([load/0]). -export([load/0]).
-export([status/1, broker/1, cluster/1, users/1, bridges/1, -export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
clients/1, sessions/1, topics/1, subscriptions/1, routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
plugins/1, listeners/1, vm/1, mnesia/1, trace/1]). listeners/1, vm/1, mnesia/1, trace/1]).
-define(PROC_INFOKEYS, [status, -define(PROC_INFOKEYS, [status,
memory, memory,
@ -67,7 +67,7 @@ status([]) ->
?PRINT("emqttd ~s is running~n", [Vsn]) ?PRINT("emqttd ~s is running~n", [Vsn])
end; end;
status(_) -> status(_) ->
?PRINT_CMD("status", "query broker status"). ?PRINT_CMD("status", "Show broker status").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Query broker %% @doc Query broker
@ -98,10 +98,10 @@ broker(["pubsub"]) ->
end, lists:reverse(Pubsubs)); end, lists:reverse(Pubsubs));
broker(_) -> broker(_) ->
?USAGE([{"broker", "query broker version, uptime and description"}, ?USAGE([{"broker", "Show broker version, uptime and description"},
{"broker pubsub", "query process_info of pubsub"}, {"broker pubsub", "Show process_info of pubsub"},
{"broker stats", "query broker statistics of clients, topics, subscribers"}, {"broker stats", "Show broker statistics of clients, topics, subscribers"},
{"broker metrics", "query broker metrics"}]). {"broker metrics", "Show broker metrics"}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Cluster with other nodes %% @doc Cluster with other nodes
@ -157,9 +157,9 @@ clients(["kick", ClientId]) ->
if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end); if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end);
clients(_) -> clients(_) ->
?USAGE([{"clients list", "list all clients"}, ?USAGE([{"clients list", "List all clients"},
{"clients show <ClientId>", "show a client"}, {"clients show <ClientId>", "Show a client"},
{"clients kick <ClientId>", "kick a client"}]). {"clients kick <ClientId>", "Kick out a client"}]).
if_client(ClientId, Fun) -> if_client(ClientId, Fun) ->
case emqttd_cm:lookup(bin(ClientId)) of case emqttd_cm:lookup(bin(ClientId)) of
@ -191,10 +191,23 @@ sessions(["show", ClientId]) ->
end; end;
sessions(_) -> sessions(_) ->
?USAGE([{"sessions list", "list all sessions"}, ?USAGE([{"sessions list", "List all sessions"},
{"sessions list persistent", "list all persistent sessions"}, {"sessions list persistent", "List all persistent sessions"},
{"sessions list transient", "list all transient sessions"}, {"sessions list transient", "List all transient sessions"},
{"sessions show <ClientId>", "show a session"}]). {"sessions show <ClientId>", "Show a session"}]).
%%--------------------------------------------------------------------
%% @doc Routes Command
routes(["list"]) ->
Print = fun(Topic, Records) -> print(route, Topic, Records) end,
if_could_print(route, Print);
routes(["show", Topic]) ->
print(route, Topic, mnesia:dirty_read(route, bin(Topic)));
routes(_) ->
?USAGE([{"routes list", "List all routes"},
{"routes show <Topic>", "Show a route"}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Topics Command %% @doc Topics Command
@ -206,48 +219,54 @@ topics(["show", Topic]) ->
print(topic, Topic, ets:lookup(topic, bin(Topic))); print(topic, Topic, ets:lookup(topic, bin(Topic)));
topics(_) -> topics(_) ->
?USAGE([{"topics list", "list all topics"}, ?USAGE([{"topics list", "List all topics"},
{"topics show <Topic>", "show a topic"}]). {"topics show <Topic>", "Show a topic"}]).
subscriptions(["list"]) -> subscriptions(["list"]) ->
Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
if_subscription(fun() -> if_could_print(subscription, Print) end); if_could_print(subscription, Print);
subscriptions(["list", "static"]) ->
Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
if_could_print(static_subscription, Print);
subscriptions(["show", ClientId]) -> subscriptions(["show", ClientId]) ->
if_subscription(fun() -> case mnesia:dirty_read(subscription, bin(ClientId)) of
case emqttd_pubsub:lookup(subscription, bin(ClientId)) of [] -> ?PRINT_MSG("Not Found.~n");
[] -> ?PRINT_MSG("Not Found.~n"); Records -> print(subscription, ClientId, Records)
Records -> print(subscription, ClientId, Records) end;
end
end);
subscriptions(["add", ClientId, Topic, QoS]) -> subscriptions(["add", ClientId, Topic, QoS]) ->
Create = fun(IntQos) -> Add = fun(IntQos) ->
Subscription = {bin(ClientId), bin(Topic), IntQos}, Subscription = #mqtt_subscription{subid = bin(ClientId),
case emqttd_pubsub:create(subscription, Subscription) of topic = bin(Topic),
ok -> ?PRINT_MSG("ok~n"); qos = IntQos},
{error, Error} -> ?PRINT("Error: ~p~n", [Error]) case emqttd_backend:add_static_subscription(Subscription) of
end {atomic, ok} ->
end, ?PRINT_MSG("ok~n");
if_subscription(fun() -> if_valid_qos(QoS, Create) end); {aborted, {error, existed}} ->
?PRINT_MSG("Error: already existed~n");
{aborted, Reason} ->
?PRINT("Error: ~p~n", [Reason])
end
end,
if_valid_qos(QoS, Add);
subscriptions(["del", ClientId]) ->
Ok = emqttd_backend:del_static_subscriptions(bin(ClientId)),
?PRINT("~p~n", [Ok]);
subscriptions(["del", ClientId, Topic]) -> subscriptions(["del", ClientId, Topic]) ->
if_subscription(fun() -> Ok = emqttd_backend:del_static_subscription(bin(ClientId), bin(Topic)),
Ok = emqttd_pubsub:delete(subscription, {bin(ClientId), bin(Topic)}), ?PRINT("~p~n", [Ok]);
?PRINT("~p~n", [Ok])
end);
subscriptions(_) -> subscriptions(_) ->
?USAGE([{"subscriptions list", "list all subscriptions"}, ?USAGE([{"subscriptions list", "List all subscriptions"},
{"subscriptions show <ClientId>", "show subscriptions of a client"}, {"subscriptions list static", "List all static subscriptions"},
{"subscriptions add <ClientId> <Topic> <QoS>", "add subscription"}, {"subscriptions show <ClientId>", "Show subscriptions of a client"},
{"subscriptions del <ClientId> <Topic>", "delete subscription"}]). {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
{"subscriptions del <ClientId>", "Delete static subscriptions manually"},
if_subscription(Fun) -> {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
case ets:info(subscription, name) of
undefined -> ?PRINT_MSG("Error: subscription table not found!~n");
_ -> Fun()
end.
if_could_print(Tab, Fun) -> if_could_print(Tab, Fun) ->
case mnesia:table_info(Tab, size) of case mnesia:table_info(Tab, size) of
@ -286,9 +305,9 @@ plugins(["unload", Name]) ->
end; end;
plugins(_) -> plugins(_) ->
?USAGE([{"plugins list", "show loaded plugins"}, ?USAGE([{"plugins list", "Show loaded plugins"},
{"plugins load <Plugin>", "load plugin"}, {"plugins load <Plugin>", "Load plugin"},
{"plugins unload <Plugin>", "unload plugin"}]). {"plugins unload <Plugin>", "Unload plugin"}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Bridges command %% @doc Bridges command
@ -326,11 +345,11 @@ bridges(["stop", SNode, Topic]) ->
end; end;
bridges(_) -> bridges(_) ->
?USAGE([{"bridges list", "query bridges"}, ?USAGE([{"bridges list", "List bridges"},
{"bridges options", "bridge options"}, {"bridges options", "Bridge options"},
{"bridges start <Node> <Topic>", "start bridge"}, {"bridges start <Node> <Topic>", "Start a bridge"},
{"bridges start <Node> <Topic> <Options>", "start bridge with options"}, {"bridges start <Node> <Topic> <Options>", "Start a bridge with options"},
{"bridges stop <Node> <Topic>", "stop bridge"}]). {"bridges stop <Node> <Topic>", "Stop a bridge"}]).
parse_opts(Cmd, OptStr) -> parse_opts(Cmd, OptStr) ->
Tokens = string:tokens(OptStr, ","), Tokens = string:tokens(OptStr, ","),
@ -373,11 +392,11 @@ vm(["io"]) ->
end, [max_fds, active_fds]); end, [max_fds, active_fds]);
vm(_) -> vm(_) ->
?USAGE([{"vm all", "query info of erlang vm"}, ?USAGE([{"vm all", "Show info of erlang vm"},
{"vm load", "query load of erlang vm"}, {"vm load", "Show load of erlang vm"},
{"vm memory", "query memory of erlang vm"}, {"vm memory", "Show memory of erlang vm"},
{"vm process", "query process of erlang vm"}, {"vm process", "Show process of erlang vm"},
{"vm io", "queue io of erlang vm"}]). {"vm io", "Show IO of erlang vm"}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc mnesia Command %% @doc mnesia Command
@ -385,7 +404,7 @@ mnesia([]) ->
mnesia:system_info(); mnesia:system_info();
mnesia(_) -> mnesia(_) ->
?PRINT_CMD("mnesia", "mnesia system info"). ?PRINT_CMD("mnesia", "Mnesia system info").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Trace Command %% @doc Trace Command
@ -407,11 +426,11 @@ trace(["topic", Topic, LogFile]) ->
trace_on(topic, Topic, LogFile); trace_on(topic, Topic, LogFile);
trace(_) -> trace(_) ->
?USAGE([{"trace list", "query all traces"}, ?USAGE([{"trace list", "List all traces"},
{"trace client <ClientId> <LogFile>","trace client with ClientId"}, {"trace client <ClientId> <LogFile>","Trace a client"},
{"trace client <ClientId> off", "stop tracing client"}, {"trace client <ClientId> off", "Stop tracing a client"},
{"trace topic <Topic> <LogFile>", "trace topic with Topic"}, {"trace topic <Topic> <LogFile>", "Trace a topic"},
{"trace topic <Topic> off", "stop tracing Topic"}]). {"trace topic <Topic> off", "Stop tracing a Topic"}]).
trace_on(Who, Name, LogFile) -> trace_on(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of case emqttd_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of
@ -444,7 +463,7 @@ listeners([]) ->
end, esockd:listeners()); end, esockd:listeners());
listeners(_) -> listeners(_) ->
?PRINT_CMD("listeners", "query broker listeners"). ?PRINT_CMD("listeners", "List listeners").
print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
@ -458,8 +477,11 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
emqttd_net:format(Peername), emqttd_net:format(Peername),
emqttd_time:now_to_secs(ConnectedAt)]); emqttd_time:now_to_secs(ConnectedAt)]);
print(#mqtt_topic{topic = Topic, node = Node}) -> print(#mqtt_topic{topic = Topic, flags = Flags}) ->
?PRINT("~s on ~s~n", [Topic, Node]); ?PRINT("~s: ~p~n", [Topic, Flags]);
print(#mqtt_route{topic = Topic, node = Node}) ->
?PRINT("~s: ~s~n", [Topic, Node]);
print({{ClientId, _ClientPid}, SessInfo}) -> print({{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess, InfoKeys = [clean_sess,
@ -477,10 +499,13 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
"created_at=~w)~n", "created_at=~w)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
print(topic, Topic, Records) -> print(route, Topic, Routes) ->
Nodes = [Node || #mqtt_topic{node = Node} <- Records], Nodes = [Node || #mqtt_route{node = Node} <- Routes],
?PRINT("~s: ~p~n", [Topic, Nodes]); ?PRINT("~s: ~p~n", [Topic, Nodes]);
print(topic, _Topic, Records) ->
[print(R) || R <- Records];
print(subscription, ClientId, Subscriptions) -> print(subscription, ClientId, Subscriptions) ->
TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
?PRINT("~s: ~p~n", [ClientId, TopicTable]). ?PRINT("~s: ~p~n", [ClientId, TopicTable]).