topics, subscriptions

This commit is contained in:
Feng 2015-12-16 10:22:37 +08:00
parent b172a78fcd
commit d3ee464789
1 changed files with 108 additions and 30 deletions

View File

@ -29,6 +29,8 @@
-include("emqttd_cli.hrl"). -include("emqttd_cli.hrl").
-include("emqttd_protocol.hrl").
-import(lists, [foreach/2]). -import(lists, [foreach/2]).
-import(proplists, [get_value/2]). -import(proplists, [get_value/2]).
@ -36,10 +38,8 @@
-export([load/0]). -export([load/0]).
-export([status/1, broker/1, cluster/1, bridges/1, -export([status/1, broker/1, cluster/1, bridges/1,
clients/1, sessions/1, plugins/1, listeners/1, clients/1, sessions/1, topics/1, subscriptions/1,
vm/1, mnesia/1, trace/1]). plugins/1, listeners/1, vm/1, mnesia/1, trace/1]).
%% TODO: topics, subscriptions...
-define(PROC_INFOKEYS, [status, -define(PROC_INFOKEYS, [status,
memory, memory,
@ -49,6 +49,8 @@
stack_size, stack_size,
reductions]). reductions]).
-define(MAX_LINES, 20000).
-define(APP, emqttd). -define(APP, emqttd).
load() -> load() ->
@ -70,10 +72,10 @@ status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(), {InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(?APP, 1, application:which_applications()) of case lists:keysearch(?APP, 1, application:which_applications()) of
false -> false ->
?PRINT_MSG("emqttd is not running~n"); ?PRINT_MSG("emqttd is not running~n");
{value, {?APP, _Desc, Vsn}} -> {value, {?APP, _Desc, Vsn}} ->
?PRINT("emqttd ~s is running~n", [Vsn]) ?PRINT("emqttd ~s is running~n", [Vsn])
end; end;
status(_) -> status(_) ->
?PRINT_CMD("status", "query broker status"). ?PRINT_CMD("status", "query broker status").
@ -101,12 +103,12 @@ broker(["metrics"]) ->
broker(["pubsub"]) -> broker(["pubsub"]) ->
Pubsubs = supervisor:which_children(emqttd_pubsub_sup), Pubsubs = supervisor:which_children(emqttd_pubsub_sup),
foreach(fun({{_, Id}, Pid, _, _}) -> foreach(fun({{_, Id}, Pid, _, _}) ->
ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
?PRINT("pubsub: ~w~n", [Id]), ?PRINT("pubsub: ~w~n", [Id]),
foreach(fun({Key, Val}) -> foreach(fun({Key, Val}) ->
?PRINT(" ~-18s: ~w~n", [Key, Val]) ?PRINT(" ~-18s: ~w~n", [Key, Val])
end, ProcInfo) end, ProcInfo)
end, lists:reverse(Pubsubs)); end, lists:reverse(Pubsubs));
broker(_) -> broker(_) ->
?USAGE([{"broker", "query broker version, uptime and description"}, ?USAGE([{"broker", "query broker version, uptime and description"},
@ -123,7 +125,7 @@ cluster([]) ->
?PRINT("cluster nodes: ~p~n", [Nodes]); ?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster(usage) -> cluster(usage) ->
?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info "); ?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info");
cluster([SNode]) -> cluster([SNode]) ->
Node = emqttd_dist:parse_node(SNode), Node = emqttd_dist:parse_node(SNode),
@ -171,24 +173,22 @@ clients(["list"]) ->
emqttd_mnesia:dump(ets, mqtt_client, fun print/1); emqttd_mnesia:dump(ets, mqtt_client, fun print/1);
clients(["show", ClientId]) -> clients(["show", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of if_client(ClientId, fun print/1);
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> print(Client)
end;
clients(["kick", ClientId]) -> clients(["kick", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end);
undefined ->
?PRINT_MSG("Not Found.~n");
#mqtt_client{client_pid = Pid} ->
emqttd_client:kick(Pid)
end;
clients(_) -> clients(_) ->
?USAGE([{"clients list", "list all clients"}, ?USAGE([{"clients list", "list all clients"},
{"clients show <ClientId>", "show a client"}, {"clients show <ClientId>", "show a client"},
{"clients kick <ClientId>", "kick a client"}]). {"clients kick <ClientId>", "kick a client"}]).
if_client(ClientId, Fun) ->
case emqttd_cm:lookup(bin(ClientId)) of
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> Fun(Client)
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Sessions Command %% @doc Sessions Command
%% @end %% @end
@ -203,7 +203,7 @@ sessions(["list", "transient"]) ->
emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1);
sessions(["show", ClientId]) -> sessions(["show", ClientId]) ->
MP = {{list_to_binary(ClientId), '_'}, '_'}, MP = {{bin(ClientId), '_'}, '_'},
case {ets:match_object(mqtt_transient_session, MP), case {ets:match_object(mqtt_transient_session, MP),
ets:match_object(mqtt_persistent_session, MP)} of ets:match_object(mqtt_persistent_session, MP)} of
{[], []} -> {[], []} ->
@ -220,6 +220,70 @@ sessions(_) ->
{"sessions list transient", "list all transient sessions"}, {"sessions list transient", "list all transient sessions"},
{"sessions show <ClientId>", "show a session"}]). {"sessions show <ClientId>", "show a session"}]).
%%------------------------------------------------------------------------------
%% @doc Topics Command
%% @end
%%------------------------------------------------------------------------------
topics(["list"]) ->
Print = fun(Topic, Records) -> print(topic, Topic, Records) end,
if_could_print(topic, Print);
topics(["show", Topic]) ->
print(topic, Topic, ets:lookup(topic, bin(Topic)));
topics(_) ->
?USAGE([{"topics list", "list all topics"},
{"topics show <Topic>", "show a topic"}]).
subscriptions(["list"]) ->
Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
if_subscription(fun() -> if_could_print(subscription, Print) end);
subscriptions(["show", ClientId]) ->
if_subscription(fun() ->
case emqttd_pubsub:lookup(subscription, ClientId) of
[] -> ?PRINT_MSG("Not Found.~n");
Records -> print(subscription, ClientId, Records)
end
end);
subscriptions(["add", ClientId, Topic, QoS]) ->
Create = fun(IntQos) -> emqttd_pubsub:create(subscription, {ClientId, Topic, IntQos}) end,
if_subscription(fun() -> if_valid_qos(QoS, Create) end);
subscriptions(["del", ClientId, Topic]) ->
if_subscription(fun() ->
emqttd_pubsub:delete(subscription, {ClientId, Topic})
end);
subscriptions(_) ->
?USAGE([{"subscriptions list", "list all subscriptions"},
{"subscriptions show <ClientId>", "show subscriptions of a client"},
{"subscriptions add <ClientId> <Topic> <QoS>", "add subscription"},
{"subscriptions del <ClientId> <Topic>", "delete subscription"}]).
if_subscription(Fun) ->
case ets:info(subscription, name) of
undefined -> ?PRINT_MSG("Error: subscription table not found!~n");
_ -> Fun()
end.
if_could_print(Tab, Fun) ->
case mnesia:table_info(Tab, size) of
Size when Size >= ?MAX_LINES ->
?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
_Size ->
Keys = mnesia:dirty_all_keys(Tab),
foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys)
end.
if_valid_qos(QoS, Fun) ->
try list_to_integer(QoS) of
Int when ?IS_QOS(Int) -> Fun(Int)
catch _:_ ->
?PRINT_MSG("QoS should be 0, 1, 2~n")
end.
plugins(["list"]) -> plugins(["list"]) ->
foreach(fun print/1, emqttd_plugins:list()); foreach(fun print/1, emqttd_plugins:list());
@ -296,9 +360,9 @@ parse_opts(Cmd, OptStr) ->
parse_opt(bridge, qos, Qos) -> parse_opt(bridge, qos, Qos) ->
{qos, list_to_integer(Qos)}; {qos, list_to_integer(Qos)};
parse_opt(bridge, suffix, Suffix) -> parse_opt(bridge, suffix, Suffix) ->
{topic_suffix, list_to_binary(Suffix)}; {topic_suffix, bin(Suffix)};
parse_opt(bridge, prefix, Prefix) -> parse_opt(bridge, prefix, Prefix) ->
{topic_prefix, list_to_binary(Prefix)}; {topic_prefix, bin(Prefix)};
parse_opt(bridge, queue, Len) -> parse_opt(bridge, queue, Len) ->
{max_queue_len, list_to_integer(Len)}; {max_queue_len, list_to_integer(Len)};
parse_opt(_Cmd, Opt, _Val) -> parse_opt(_Cmd, Opt, _Val) ->
@ -377,7 +441,7 @@ trace(_) ->
{"trace topic <Topic> off", "stop to trace Topic"}]). {"trace topic <Topic> off", "stop to trace Topic"}]).
trace_on(Who, Name, LogFile) -> trace_on(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of
ok -> ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]); ?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} -> {error, Error} ->
@ -385,7 +449,7 @@ trace_on(Who, Name, LogFile) ->
end. end.
trace_off(Who, Name) -> trace_off(Who, Name) ->
case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of case emqttd_trace:stop_trace({Who, bin(Name)}) of
ok -> ok ->
?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} -> {error, Error} ->
@ -423,6 +487,9 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
emqttd_net:format(Peername), emqttd_net:format(Peername),
emqttd_util:now_to_secs(ConnectedAt)]); emqttd_util:now_to_secs(ConnectedAt)]);
print(#mqtt_topic{topic = Topic, node = Node}) ->
?PRINT("~s on ~s~n", [Topic, Node]);
print({{ClientId, _ClientPid}, SessInfo}) -> print({{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess, InfoKeys = [clean_sess,
max_inflight, max_inflight,
@ -440,6 +507,14 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
"created_at=~w, subscriptions=~s)~n", "created_at=~w, subscriptions=~s)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
print(topic, Topic, Records) ->
Nodes = [Node || #mqtt_topic{node = Node} <- Records],
?PRINT("~s: on ~p~n", [Topic, Nodes]);
print(subscription, ClientId, Subscriptions) ->
TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
?PRINT("~s: ~p~n", [ClientId, TopicTable]).
format(created_at, Val) -> format(created_at, Val) ->
emqttd_util:now_to_secs(Val); emqttd_util:now_to_secs(Val);
@ -449,3 +524,6 @@ format(subscriptions, List) ->
format(_, Val) -> format(_, Val) ->
Val. Val.
bin(S) when is_list(S) -> list_to_binary(S);
bin(B) when is_binary(B) -> B.