From d3ee464789e3c3baa532388a09e470c6f0a47c1b Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 16 Dec 2015 10:22:37 +0800 Subject: [PATCH] topics, subscriptions --- src/emqttd_cli.erl | 138 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 108 insertions(+), 30 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index ff8e9806a..a7656f9a3 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -29,6 +29,8 @@ -include("emqttd_cli.hrl"). +-include("emqttd_protocol.hrl"). + -import(lists, [foreach/2]). -import(proplists, [get_value/2]). @@ -36,10 +38,8 @@ -export([load/0]). -export([status/1, broker/1, cluster/1, bridges/1, - clients/1, sessions/1, plugins/1, listeners/1, - vm/1, mnesia/1, trace/1]). - -%% TODO: topics, subscriptions... + clients/1, sessions/1, topics/1, subscriptions/1, + plugins/1, listeners/1, vm/1, mnesia/1, trace/1]). -define(PROC_INFOKEYS, [status, memory, @@ -49,6 +49,8 @@ stack_size, reductions]). +-define(MAX_LINES, 20000). + -define(APP, emqttd). load() -> @@ -70,10 +72,10 @@ status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), case lists:keysearch(?APP, 1, application:which_applications()) of - false -> - ?PRINT_MSG("emqttd is not running~n"); - {value, {?APP, _Desc, Vsn}} -> - ?PRINT("emqttd ~s is running~n", [Vsn]) + false -> + ?PRINT_MSG("emqttd is not running~n"); + {value, {?APP, _Desc, Vsn}} -> + ?PRINT("emqttd ~s is running~n", [Vsn]) end; status(_) -> ?PRINT_CMD("status", "query broker status"). @@ -101,12 +103,12 @@ broker(["metrics"]) -> broker(["pubsub"]) -> Pubsubs = supervisor:which_children(emqttd_pubsub_sup), foreach(fun({{_, Id}, Pid, _, _}) -> - ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), - ?PRINT("pubsub: ~w~n", [Id]), - foreach(fun({Key, Val}) -> - ?PRINT(" ~-18s: ~w~n", [Key, Val]) - end, ProcInfo) - end, lists:reverse(Pubsubs)); + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + ?PRINT("pubsub: ~w~n", [Id]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-18s: ~w~n", [Key, Val]) + end, ProcInfo) + end, lists:reverse(Pubsubs)); broker(_) -> ?USAGE([{"broker", "query broker version, uptime and description"}, @@ -123,7 +125,7 @@ cluster([]) -> ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster(usage) -> - ?PRINT_CMD("cluster []", "cluster with node, query cluster info "); + ?PRINT_CMD("cluster []", "cluster with node, query cluster info"); cluster([SNode]) -> Node = emqttd_dist:parse_node(SNode), @@ -171,24 +173,22 @@ clients(["list"]) -> emqttd_mnesia:dump(ets, mqtt_client, fun print/1); clients(["show", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> ?PRINT_MSG("Not Found.~n"); - Client -> print(Client) - end; + if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - #mqtt_client{client_pid = Pid} -> - emqttd_client:kick(Pid) - end; + if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end); clients(_) -> ?USAGE([{"clients list", "list all clients"}, {"clients show ", "show a client"}, {"clients kick ", "kick a client"}]). +if_client(ClientId, Fun) -> + case emqttd_cm:lookup(bin(ClientId)) of + undefined -> ?PRINT_MSG("Not Found.~n"); + Client -> Fun(Client) + end. + %%------------------------------------------------------------------------------ %% @doc Sessions Command %% @end @@ -203,7 +203,7 @@ sessions(["list", "transient"]) -> emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); sessions(["show", ClientId]) -> - MP = {{list_to_binary(ClientId), '_'}, '_'}, + MP = {{bin(ClientId), '_'}, '_'}, case {ets:match_object(mqtt_transient_session, MP), ets:match_object(mqtt_persistent_session, MP)} of {[], []} -> @@ -220,6 +220,70 @@ sessions(_) -> {"sessions list transient", "list all transient sessions"}, {"sessions show ", "show a session"}]). +%%------------------------------------------------------------------------------ +%% @doc Topics Command +%% @end +%%------------------------------------------------------------------------------ +topics(["list"]) -> + Print = fun(Topic, Records) -> print(topic, Topic, Records) end, + if_could_print(topic, Print); + +topics(["show", Topic]) -> + print(topic, Topic, ets:lookup(topic, bin(Topic))); + +topics(_) -> + ?USAGE([{"topics list", "list all topics"}, + {"topics show ", "show a topic"}]). + +subscriptions(["list"]) -> + Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, + if_subscription(fun() -> if_could_print(subscription, Print) end); + +subscriptions(["show", ClientId]) -> + if_subscription(fun() -> + case emqttd_pubsub:lookup(subscription, ClientId) of + [] -> ?PRINT_MSG("Not Found.~n"); + Records -> print(subscription, ClientId, Records) + end + end); + +subscriptions(["add", ClientId, Topic, QoS]) -> + Create = fun(IntQos) -> emqttd_pubsub:create(subscription, {ClientId, Topic, IntQos}) end, + if_subscription(fun() -> if_valid_qos(QoS, Create) end); + +subscriptions(["del", ClientId, Topic]) -> + if_subscription(fun() -> + emqttd_pubsub:delete(subscription, {ClientId, Topic}) + end); + +subscriptions(_) -> + ?USAGE([{"subscriptions list", "list all subscriptions"}, + {"subscriptions show ", "show subscriptions of a client"}, + {"subscriptions add ", "add subscription"}, + {"subscriptions del ", "delete subscription"}]). + +if_subscription(Fun) -> + case ets:info(subscription, name) of + undefined -> ?PRINT_MSG("Error: subscription table not found!~n"); + _ -> Fun() + end. + +if_could_print(Tab, Fun) -> + case mnesia:table_info(Tab, size) of + Size when Size >= ?MAX_LINES -> + ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]); + _Size -> + Keys = mnesia:dirty_all_keys(Tab), + foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys) + end. + +if_valid_qos(QoS, Fun) -> + try list_to_integer(QoS) of + Int when ?IS_QOS(Int) -> Fun(Int) + catch _:_ -> + ?PRINT_MSG("QoS should be 0, 1, 2~n") + end. + plugins(["list"]) -> foreach(fun print/1, emqttd_plugins:list()); @@ -296,9 +360,9 @@ parse_opts(Cmd, OptStr) -> parse_opt(bridge, qos, Qos) -> {qos, list_to_integer(Qos)}; parse_opt(bridge, suffix, Suffix) -> - {topic_suffix, list_to_binary(Suffix)}; + {topic_suffix, bin(Suffix)}; parse_opt(bridge, prefix, Prefix) -> - {topic_prefix, list_to_binary(Prefix)}; + {topic_prefix, bin(Prefix)}; parse_opt(bridge, queue, Len) -> {max_queue_len, list_to_integer(Len)}; parse_opt(_Cmd, Opt, _Val) -> @@ -377,7 +441,7 @@ trace(_) -> {"trace topic off", "stop to trace Topic"}]). trace_on(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of + case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of ok -> ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -385,7 +449,7 @@ trace_on(Who, Name, LogFile) -> end. trace_off(Who, Name) -> - case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of + case emqttd_trace:stop_trace({Who, bin(Name)}) of ok -> ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -423,6 +487,9 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, emqttd_net:format(Peername), emqttd_util:now_to_secs(ConnectedAt)]); +print(#mqtt_topic{topic = Topic, node = Node}) -> + ?PRINT("~s on ~s~n", [Topic, Node]); + print({{ClientId, _ClientPid}, SessInfo}) -> InfoKeys = [clean_sess, max_inflight, @@ -440,6 +507,14 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "created_at=~w, subscriptions=~s)~n", [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). +print(topic, Topic, Records) -> + Nodes = [Node || #mqtt_topic{node = Node} <- Records], + ?PRINT("~s: on ~p~n", [Topic, Nodes]); + +print(subscription, ClientId, Subscriptions) -> + TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], + ?PRINT("~s: ~p~n", [ClientId, TopicTable]). + format(created_at, Val) -> emqttd_util:now_to_secs(Val); @@ -449,3 +524,6 @@ format(subscriptions, List) -> format(_, Val) -> Val. +bin(S) when is_list(S) -> list_to_binary(S); +bin(B) when is_binary(B) -> B. +