commit
a7434f27b9
|
@ -122,7 +122,7 @@ topics() -> emqttd_router:topics().
|
||||||
subscribers(Topic) ->
|
subscribers(Topic) ->
|
||||||
emqttd_server:subscribers(iolist_to_binary(Topic)).
|
emqttd_server:subscribers(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
|
-spec(subscriptions(subscriber()) -> [{binary(), binary(), list(suboption())}]).
|
||||||
subscriptions(Subscriber) ->
|
subscriptions(Subscriber) ->
|
||||||
emqttd_server:subscriptions(Subscriber).
|
emqttd_server:subscriptions(Subscriber).
|
||||||
|
|
||||||
|
|
|
@ -249,8 +249,6 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||||
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
|
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
|
||||||
ok ->
|
ok ->
|
||||||
?PRINT_MSG("ok~n");
|
?PRINT_MSG("ok~n");
|
||||||
{error, already_existed} ->
|
|
||||||
?PRINT_MSG("Error: already existed~n");
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?PRINT("Error: ~p~n", [Reason])
|
?PRINT("Error: ~p~n", [Reason])
|
||||||
end
|
end
|
||||||
|
|
|
@ -102,7 +102,7 @@ start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% @doc Count packets received.
|
%% @doc Count packets received.
|
||||||
-spec(received(mqtt_packet()) -> ok).
|
-spec(received(mqtt_packet()) -> ignore | non_neg_integer()).
|
||||||
received(Packet) ->
|
received(Packet) ->
|
||||||
inc('packets/received'),
|
inc('packets/received'),
|
||||||
received1(Packet).
|
received1(Packet).
|
||||||
|
@ -140,7 +140,7 @@ qos_received(?QOS_2) ->
|
||||||
inc('messages/qos2/received').
|
inc('messages/qos2/received').
|
||||||
|
|
||||||
%% @doc Count packets received. Will not count $SYS PUBLISH.
|
%% @doc Count packets received. Will not count $SYS PUBLISH.
|
||||||
-spec(sent(mqtt_packet()) -> ok).
|
-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
|
||||||
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
|
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
|
||||||
ignore;
|
ignore;
|
||||||
sent(Packet) ->
|
sent(Packet) ->
|
||||||
|
@ -169,7 +169,7 @@ sent2(?UNSUBACK) ->
|
||||||
sent2(?PINGRESP) ->
|
sent2(?PINGRESP) ->
|
||||||
inc('packets/pingresp');
|
inc('packets/pingresp');
|
||||||
sent2(_Type) ->
|
sent2(_Type) ->
|
||||||
ingore.
|
ignore.
|
||||||
qos_sent(?QOS_0) ->
|
qos_sent(?QOS_0) ->
|
||||||
inc('messages/qos0/sent');
|
inc('messages/qos0/sent');
|
||||||
qos_sent(?QOS_1) ->
|
qos_sent(?QOS_1) ->
|
||||||
|
|
|
@ -130,7 +130,7 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
||||||
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
||||||
|
|
||||||
-spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]).
|
-spec(subscriptions(emqttd:subscriber()) -> [{binary(), binary(), list(emqttd:suboption())}]).
|
||||||
subscriptions(Subscriber) ->
|
subscriptions(Subscriber) ->
|
||||||
lists:map(fun({_, {_Share, Topic}}) ->
|
lists:map(fun({_, {_Share, Topic}}) ->
|
||||||
subscription(Topic, Subscriber);
|
subscription(Topic, Subscriber);
|
||||||
|
|
|
@ -120,7 +120,7 @@
|
||||||
retry_interval = 20000 :: timeout(),
|
retry_interval = 20000 :: timeout(),
|
||||||
|
|
||||||
%% Retry Timer
|
%% Retry Timer
|
||||||
retry_timer :: reference(),
|
retry_timer :: reference() | undefined,
|
||||||
|
|
||||||
%% All QoS1, QoS2 messages published to when client is disconnected.
|
%% All QoS1, QoS2 messages published to when client is disconnected.
|
||||||
%% QoS 1 and QoS 2 messages pending transmission to the Client.
|
%% QoS 1 and QoS 2 messages pending transmission to the Client.
|
||||||
|
@ -138,13 +138,13 @@
|
||||||
await_rel_timeout = 20000 :: timeout(),
|
await_rel_timeout = 20000 :: timeout(),
|
||||||
|
|
||||||
%% Awaiting PUBREL timer
|
%% Awaiting PUBREL timer
|
||||||
await_rel_timer :: reference(),
|
await_rel_timer :: reference() | undefined,
|
||||||
|
|
||||||
%% Session Expiry Interval
|
%% Session Expiry Interval
|
||||||
expiry_interval = 7200000 :: timeout(),
|
expiry_interval = 7200000 :: timeout(),
|
||||||
|
|
||||||
%% Expired Timer
|
%% Expired Timer
|
||||||
expiry_timer :: reference(),
|
expiry_timer :: reference() | undefined,
|
||||||
|
|
||||||
%% Enable Stats
|
%% Enable Stats
|
||||||
enable_stats :: boolean(),
|
enable_stats :: boolean(),
|
||||||
|
|
Loading…
Reference in New Issue