Merge pull request #1142 from emqtt/emq22

Version 2.2.0
This commit is contained in:
huangdan 2017-07-08 14:54:39 +08:00 committed by GitHub
commit f0fc53b9fe
7 changed files with 54 additions and 15 deletions

View File

@ -39,6 +39,11 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
COVER = true COVER = true
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia
DIALYZER_DIRS := ebin/
DIALYZER_OPTS := --verbose --statistics -Werror_handling \
-Wrace_conditions #-Wunmatched_returns
include erlang.mk include erlang.mk
app:: rebar.config app:: rebar.config

View File

@ -122,7 +122,7 @@ topics() -> emqttd_router:topics().
subscribers(Topic) -> subscribers(Topic) ->
emqttd_server:subscribers(iolist_to_binary(Topic)). emqttd_server:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). -spec(subscriptions(subscriber()) -> [{binary(), binary(), list(suboption())}]).
subscriptions(Subscriber) -> subscriptions(Subscriber) ->
emqttd_server:subscriptions(Subscriber). emqttd_server:subscriptions(Subscriber).

View File

@ -249,8 +249,6 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
ok -> ok ->
?PRINT_MSG("ok~n"); ?PRINT_MSG("ok~n");
{error, already_existed} ->
?PRINT_MSG("Error: already existed~n");
{error, Reason} -> {error, Reason} ->
?PRINT("Error: ~p~n", [Reason]) ?PRINT("Error: ~p~n", [Reason])
end end
@ -477,8 +475,34 @@ listeners([]) ->
end, Info) end, Info)
end, esockd:listeners()); end, esockd:listeners());
listeners(["restart", Proto, ListenOn]) ->
ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)}
end,
case emqttd_app:restart_listener({list_to_atom(Proto), ListenOn1, []}) of
{ok, _Pid} ->
io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]);
{error, Error} ->
io:format("Failed to restart ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
end;
listeners(["stop", Proto, ListenOn]) ->
ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)}
end,
case emqttd_app:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
ok ->
io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
{error, Error} ->
io:format("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
end;
listeners(_) -> listeners(_) ->
?PRINT_CMD("listeners", "List listeners"). ?USAGE([{"listeners", "List listeners"},
{"listeners restart <Proto> <Port>", "Restart a listener"},
{"listeners stop <Proto> <Port>", "Stop a listener"}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Dump ETS %% Dump ETS

View File

@ -64,14 +64,24 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg).
%% @doc Run a command %% @doc Run a command
-spec(run([string()]) -> any()). -spec(run([string()]) -> any()).
run([]) -> usage(); run([]) -> usage(), ok;
run(["help"]) -> usage(); run(["help"]) -> usage(), ok;
run([CmdS|Args]) -> run([CmdS|Args]) ->
case lookup(list_to_atom(CmdS)) of case lookup(list_to_atom(CmdS)) of
[{Mod, Fun}] -> Mod:Fun(Args); [{Mod, Fun}] ->
[] -> usage() try Mod:Fun(Args) of
_ -> ok
catch
_:Reason ->
io:format("Reason:~p, get_stacktrace:~p~n",
[Reason, erlang:get_stacktrace()]),
{error, Reason}
end;
[] ->
usage(),
{error, cmd_not_found}
end. end.
%% @doc Lookup a command %% @doc Lookup a command

View File

@ -102,7 +102,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Count packets received. %% @doc Count packets received.
-spec(received(mqtt_packet()) -> ok). -spec(received(mqtt_packet()) -> ignore | non_neg_integer()).
received(Packet) -> received(Packet) ->
inc('packets/received'), inc('packets/received'),
received1(Packet). received1(Packet).
@ -140,7 +140,7 @@ qos_received(?QOS_2) ->
inc('messages/qos2/received'). inc('messages/qos2/received').
%% @doc Count packets received. Will not count $SYS PUBLISH. %% @doc Count packets received. Will not count $SYS PUBLISH.
-spec(sent(mqtt_packet()) -> ok). -spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) -> sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
ignore; ignore;
sent(Packet) -> sent(Packet) ->
@ -169,7 +169,7 @@ sent2(?UNSUBACK) ->
sent2(?PINGRESP) -> sent2(?PINGRESP) ->
inc('packets/pingresp'); inc('packets/pingresp');
sent2(_Type) -> sent2(_Type) ->
ingore. ignore.
qos_sent(?QOS_0) -> qos_sent(?QOS_0) ->
inc('messages/qos0/sent'); inc('messages/qos0/sent');
qos_sent(?QOS_1) -> qos_sent(?QOS_1) ->

View File

@ -130,7 +130,7 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
-spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]). -spec(subscriptions(emqttd:subscriber()) -> [{binary(), binary(), list(emqttd:suboption())}]).
subscriptions(Subscriber) -> subscriptions(Subscriber) ->
lists:map(fun({_, {_Share, Topic}}) -> lists:map(fun({_, {_Share, Topic}}) ->
subscription(Topic, Subscriber); subscription(Topic, Subscriber);

View File

@ -120,7 +120,7 @@
retry_interval = 20000 :: timeout(), retry_interval = 20000 :: timeout(),
%% Retry Timer %% Retry Timer
retry_timer :: reference(), retry_timer :: reference() | undefined,
%% All QoS1, QoS2 messages published to when client is disconnected. %% All QoS1, QoS2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client. %% QoS 1 and QoS 2 messages pending transmission to the Client.
@ -138,13 +138,13 @@
await_rel_timeout = 20000 :: timeout(), await_rel_timeout = 20000 :: timeout(),
%% Awaiting PUBREL timer %% Awaiting PUBREL timer
await_rel_timer :: reference(), await_rel_timer :: reference() | undefined,
%% Session Expiry Interval %% Session Expiry Interval
expiry_interval = 7200000 :: timeout(), expiry_interval = 7200000 :: timeout(),
%% Expired Timer %% Expired Timer
expiry_timer :: reference(), expiry_timer :: reference() | undefined,
%% Enable Stats %% Enable Stats
enable_stats :: boolean(), enable_stats :: boolean(),