This commit is contained in:
Feng 2016-08-12 16:26:36 +08:00
parent 9a4c44913e
commit dd9217bf73
2 changed files with 18 additions and 17 deletions

View File

@ -172,11 +172,13 @@ if_client(ClientId, Fun) ->
sessions(["list"]) ->
dump(mqtt_local_session);
%% performance issue?
sessions(["list", "persistent"]) ->
dump(mqtt_persistent_session);
lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', false, '_', '_'}));
%% performance issue?
sessions(["list", "transient"]) ->
dump(mqtt_transient_session);
lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', true, '_', '_'}));
sessions(["show", ClientId]) ->
case ets:lookup(mqtt_local_session, bin(ClientId)) of
@ -193,10 +195,10 @@ sessions(_) ->
%%--------------------------------------------------------------------
%% @doc Routes Command
routes(["list"]) ->
if_could_print(route, fun print/1);
if_could_print(mqtt_route, fun print/1);
routes(["show", Topic]) ->
print(mnesia:dirty_read(route, bin(Topic)));
print(mnesia:dirty_read(mqtt_route, bin(Topic)));
routes(_) ->
?USAGE([{"routes list", "List all routes"},
@ -205,23 +207,24 @@ routes(_) ->
%%--------------------------------------------------------------------
%% @doc Topics Command
topics(["list"]) ->
if_could_print(topic, fun print/1);
lists:foreach(fun(Topic) -> ?PRINT("~s~n", [Topic]) end, emqttd:topics());
topics(["show", Topic]) ->
print(mnesia:dirty_read(topic, bin(Topic)));
print(mnesia:dirty_read(mqtt_route, bin(Topic)));
topics(_) ->
?USAGE([{"topics list", "List all topics"},
{"topics show <Topic>", "Show a topic"}]).
subscriptions(["list"]) ->
if_could_print(subscription, fun print/1);
subscriptions(["list", "static"]) ->
if_could_print(backend_subscription, fun print/1);
lists:foreach(fun({Sub, Topic, Opts}) when is_pid(Sub) ->
?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]);
({Sub, Topic, Opts}) ->
?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts])
end, emqttd:subscriptions());
subscriptions(["show", ClientId]) ->
case mnesia:dirty_read(subscription, bin(ClientId)) of
case mnesia:dirty_read(mqtt_subscription, bin(ClientId)) of
[] -> ?PRINT_MSG("Not Found.~n");
Records -> print(Records)
end;
@ -252,7 +255,6 @@ subscriptions(["del", ClientId, Topic]) ->
subscriptions(_) ->
?USAGE([{"subscriptions list", "List all subscriptions"},
{"subscriptions list static", "List all static subscriptions"},
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
{"subscriptions del <ClientId>", "Delete static subscriptions manually"},
@ -509,9 +511,8 @@ print(#mqtt_topic{topic = Topic, flags = Flags}) ->
print(#mqtt_route{topic = Topic, node = Node}) ->
?PRINT("~s -> ~s~n", [Topic, Node]);
print({{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess,
max_inflight,
print({ClientId, _ClientPid, CleanSess, SessInfo}) ->
InfoKeys = [max_inflight,
inflight_queue,
message_queue,
message_dropped,
@ -523,7 +524,7 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
"message_queue=~w, message_dropped=~w, "
"awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
"created_at=~w)~n",
[ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
[ClientId, CleanSess | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
format(created_at, Val) ->
emqttd_time:now_to_secs(Val);

View File

@ -139,7 +139,7 @@ subscriptions(Subscriber) ->
end, ets:lookup(mqtt_subscription, Subscriber)).
subscription(Topic, Subscriber) ->
{Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
{Topic, Subscriber, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
subscribers(Topic) -> emqttd_pubsub:subscribers(Topic).