diff --git a/Makefile b/Makefile index 687e2208d..3738111b4 100644 --- a/Makefile +++ b/Makefile @@ -40,6 +40,11 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1 COVER = true +PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia +DIALYZER_DIRS := ebin/ +DIALYZER_OPTS := --verbose --statistics -Werror_handling \ + -Wrace_conditions #-Wunmatched_returns + include erlang.mk app:: rebar.config diff --git a/src/emqttd.erl b/src/emqttd.erl index 28692d2e6..d4cdd8437 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -125,7 +125,7 @@ topics() -> emqttd_router:topics(). subscribers(Topic) -> emqttd_server:subscribers(iolist_to_binary(Topic)). --spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). +-spec(subscriptions(subscriber()) -> [{binary(), binary(), list(suboption())}]). subscriptions(Subscriber) -> emqttd_server:subscriptions(Subscriber). diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index ae7e531a6..acca547be 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -27,7 +27,7 @@ %% Application callbacks -export([start/2, stop/1]). --export([start_listener/1, stop_listener/1]). +-export([start_listener/1, stop_listener/1, restart_listener/1]). -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). @@ -214,6 +214,21 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). +%% @doc Restart Listeners +restart_listener({tcp, ListenOn, _Opts}) -> + esockd:reopen('mqtt:tcp', ListenOn); +restart_listener({ssl, ListenOn, _Opts}) -> + esockd:reopen('mqtt:ssl', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> + mochiweb:restart_http('mqtt:ws', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> + mochiweb:restart_http('mqtt:wss', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == api -> + mochiweb:restart_http('mqtt:api', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) -> + esockd:reopen(Proto, ListenOn). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). merge_sockopts_test_() -> diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index fec3b0a90..d6301d484 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -249,8 +249,6 @@ subscriptions(["add", ClientId, Topic, QoS]) -> case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of ok -> ?PRINT_MSG("ok~n"); - {error, already_existed} -> - ?PRINT_MSG("Error: already existed~n"); {error, Reason} -> ?PRINT("Error: ~p~n", [Reason]) end @@ -477,8 +475,34 @@ listeners([]) -> end, Info) end, esockd:listeners()); +listeners(["restart", Proto, ListenOn]) -> + ListenOn1 = case string:tokens(ListenOn, ":") of + [Port] -> list_to_integer(Port); + [IP, Port] -> {IP, list_to_integer(Port)} + end, + case emqttd_app:restart_listener({list_to_atom(Proto), ListenOn1, []}) of + {ok, _Pid} -> + io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]); + {error, Error} -> + io:format("Failed to restart ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) + end; + +listeners(["stop", Proto, ListenOn]) -> + ListenOn1 = case string:tokens(ListenOn, ":") of + [Port] -> list_to_integer(Port); + [IP, Port] -> {IP, list_to_integer(Port)} + end, + case emqttd_app:stop_listener({list_to_atom(Proto), ListenOn1, []}) of + ok -> + io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); + {error, Error} -> + io:format("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) + end; + listeners(_) -> - ?PRINT_CMD("listeners", "List listeners"). + ?USAGE([{"listeners", "List listeners"}, + {"listeners restart ", "Restart a listener"}, + {"listeners stop ", "Stop a listener"}]). %%-------------------------------------------------------------------- %% Dump ETS diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 0aa8ce353..195d3dea0 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -64,14 +64,24 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg). %% @doc Run a command -spec(run([string()]) -> any()). -run([]) -> usage(); +run([]) -> usage(), ok; -run(["help"]) -> usage(); +run(["help"]) -> usage(), ok; run([CmdS|Args]) -> case lookup(list_to_atom(CmdS)) of - [{Mod, Fun}] -> Mod:Fun(Args); - [] -> usage() + [{Mod, Fun}] -> + try Mod:Fun(Args) of + _ -> ok + catch + _:Reason -> + io:format("Reason:~p, get_stacktrace:~p~n", + [Reason, erlang:get_stacktrace()]), + {error, Reason} + end; + [] -> + usage(), + {error, cmd_not_found} end. %% @doc Lookup a command diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 2c0e42ea0..c21d274a0 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -102,7 +102,7 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Count packets received. --spec(received(mqtt_packet()) -> ok). +-spec(received(mqtt_packet()) -> ignore | non_neg_integer()). received(Packet) -> inc('packets/received'), received1(Packet). @@ -140,7 +140,7 @@ qos_received(?QOS_2) -> inc('messages/qos2/received'). %% @doc Count packets received. Will not count $SYS PUBLISH. --spec(sent(mqtt_packet()) -> ok). +-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()). sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) -> ignore; sent(Packet) -> @@ -169,7 +169,7 @@ sent2(?UNSUBACK) -> sent2(?PINGRESP) -> inc('packets/pingresp'); sent2(_Type) -> - ingore. + ignore. qos_sent(?QOS_0) -> inc('messages/qos0/sent'); qos_sent(?QOS_1) -> diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index cde9208df..69d18e1e4 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -130,7 +130,7 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). --spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]). +-spec(subscriptions(emqttd:subscriber()) -> [{binary(), binary(), list(emqttd:suboption())}]). subscriptions(Subscriber) -> lists:map(fun({_, {_Share, Topic}}) -> subscription(Topic, Subscriber); diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 59295c112..31934759f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -120,7 +120,7 @@ retry_interval = 20000 :: timeout(), %% Retry Timer - retry_timer :: reference(), + retry_timer :: reference() | undefined, %% All QoS1, QoS2 messages published to when client is disconnected. %% QoS 1 and QoS 2 messages pending transmission to the Client. @@ -138,13 +138,13 @@ await_rel_timeout = 20000 :: timeout(), %% Awaiting PUBREL timer - await_rel_timer :: reference(), + await_rel_timer :: reference() | undefined, %% Session Expiry Interval expiry_interval = 7200000 :: timeout(), %% Expired Timer - expiry_timer :: reference(), + expiry_timer :: reference() | undefined, %% Enable Stats enable_stats :: boolean(),