Fix the 'subscriptions' CLI
This commit is contained in:
parent
c751e958db
commit
405d5d9d29
|
@ -243,25 +243,32 @@ subscriptions(["list"]) ->
|
|||
end, ets:tab2list(mqtt_subscription));
|
||||
|
||||
subscriptions(["show", ClientId]) ->
|
||||
case ets:lookup(mqtt_subscription, bin(ClientId)) of
|
||||
[] -> ?PRINT_MSG("Not Found.~n");
|
||||
Records -> [print(subscription, Subscription) || Subscription <- Records]
|
||||
case emqttd:subscriptions(bin(ClientId)) of
|
||||
[] ->
|
||||
?PRINT_MSG("Not Found.~n");
|
||||
Subscriptions ->
|
||||
[print(subscription, Sub) || Sub<- Subscriptions]
|
||||
end;
|
||||
|
||||
subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||
Add = fun(IntQos) ->
|
||||
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
|
||||
ok ->
|
||||
?PRINT_MSG("ok~n");
|
||||
{error, Reason} ->
|
||||
?PRINT("Error: ~p~n", [Reason])
|
||||
if_valid_qos(QoS, fun(IntQos) ->
|
||||
case emqttd_sm:lookup_session(bin(ClientId)) of
|
||||
undefined ->
|
||||
?PRINT_MSG("Error: Session not found!");
|
||||
#mqtt_session{sess_pid = SessPid} ->
|
||||
emqttd_session:subscribe(SessPid, [{bin(Topic), IntQos}]),
|
||||
?PRINT_MSG("ok~n")
|
||||
end
|
||||
end,
|
||||
if_valid_qos(QoS, Add);
|
||||
end);
|
||||
|
||||
subscriptions(["del", ClientId, Topic]) ->
|
||||
Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)),
|
||||
?PRINT("~p~n", [Ok]);
|
||||
case emqttd_sm:lookup_session(bin(ClientId)) of
|
||||
undefined ->
|
||||
?PRINT_MSG("Error: Session not found!");
|
||||
#mqtt_session{sess_pid = SessPid} ->
|
||||
emqttd_session:unsubscribe(SessPid, [bin(Topic)]),
|
||||
?PRINT_MSG("ok~n")
|
||||
end;
|
||||
|
||||
subscriptions(_) ->
|
||||
?USAGE([{"subscriptions list", "List all subscriptions"},
|
||||
|
@ -570,14 +577,16 @@ print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
|
|||
"deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
|
||||
[ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]).
|
||||
|
||||
print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) ->
|
||||
print(subscription, {Sub, {share, _Share, Topic}}) when is_pid(Sub) ->
|
||||
?PRINT("~p -> ~s~n", [Sub, Topic]);
|
||||
print(subscription, {Sub, Topic}) when is_pid(Sub) ->
|
||||
?PRINT("~p -> ~s~n", [Sub, Topic]);
|
||||
print(subscription, {Sub, {_Share, Topic}}) ->
|
||||
?PRINT("~s -> ~s~n", [Sub, Topic]);
|
||||
print(subscription, {Sub, Topic}) ->
|
||||
?PRINT("~s -> ~s~n", [Sub, Topic]).
|
||||
print(subscription, {{SubId, SubPid}, {share, _Share, Topic}})
|
||||
when is_binary(SubId), is_pid(SubPid) ->
|
||||
?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]);
|
||||
print(subscription, {{SubId, SubPid}, Topic})
|
||||
when is_binary(SubId), is_pid(SubPid) ->
|
||||
?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]).
|
||||
|
||||
format(created_at, Val) ->
|
||||
emqttd_time:now_secs(Val);
|
||||
|
|
Loading…
Reference in New Issue