From 405d5d9d2983194932d344fea6d9c81e1eca77f4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 18 Nov 2017 11:17:19 +0800 Subject: [PATCH] Fix the 'subscriptions' CLI --- src/emqttd_cli.erl | 47 +++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index c8836230f..f795ed1be 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -243,25 +243,32 @@ subscriptions(["list"]) -> end, ets:tab2list(mqtt_subscription)); subscriptions(["show", ClientId]) -> - case ets:lookup(mqtt_subscription, bin(ClientId)) of - [] -> ?PRINT_MSG("Not Found.~n"); - Records -> [print(subscription, Subscription) || Subscription <- Records] + case emqttd:subscriptions(bin(ClientId)) of + [] -> + ?PRINT_MSG("Not Found.~n"); + Subscriptions -> + [print(subscription, Sub) || Sub<- Subscriptions] end; subscriptions(["add", ClientId, Topic, QoS]) -> - Add = fun(IntQos) -> - case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of - ok -> - ?PRINT_MSG("ok~n"); - {error, Reason} -> - ?PRINT("Error: ~p~n", [Reason]) - end - end, - if_valid_qos(QoS, Add); + if_valid_qos(QoS, fun(IntQos) -> + case emqttd_sm:lookup_session(bin(ClientId)) of + undefined -> + ?PRINT_MSG("Error: Session not found!"); + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:subscribe(SessPid, [{bin(Topic), IntQos}]), + ?PRINT_MSG("ok~n") + end + end); subscriptions(["del", ClientId, Topic]) -> - Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)), - ?PRINT("~p~n", [Ok]); + case emqttd_sm:lookup_session(bin(ClientId)) of + undefined -> + ?PRINT_MSG("Error: Session not found!"); + #mqtt_session{sess_pid = SessPid} -> + emqttd_session:unsubscribe(SessPid, [bin(Topic)]), + ?PRINT_MSG("ok~n") + end; subscriptions(_) -> ?USAGE([{"subscriptions list", "List all subscriptions"}, @@ -570,14 +577,16 @@ print({ClientId, _ClientPid, _Persistent, SessInfo}) -> "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n", [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]). -print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) -> +print(subscription, {Sub, {share, _Share, Topic}}) when is_pid(Sub) -> ?PRINT("~p -> ~s~n", [Sub, Topic]); print(subscription, {Sub, Topic}) when is_pid(Sub) -> ?PRINT("~p -> ~s~n", [Sub, Topic]); -print(subscription, {Sub, {_Share, Topic}}) -> - ?PRINT("~s -> ~s~n", [Sub, Topic]); -print(subscription, {Sub, Topic}) -> - ?PRINT("~s -> ~s~n", [Sub, Topic]). +print(subscription, {{SubId, SubPid}, {share, _Share, Topic}}) + when is_binary(SubId), is_pid(SubPid) -> + ?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]); +print(subscription, {{SubId, SubPid}, Topic}) + when is_binary(SubId), is_pid(SubPid) -> + ?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]). format(created_at, Val) -> emqttd_time:now_secs(Val);