From ad1f3ca44aeb035621de2a095dd9875c98621d2d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 20 Aug 2015 19:00:01 +0800 Subject: [PATCH 1/3] add emqttd_client:subscribe/1 api. noreply/1 to hibernate --- src/emqttd_client.erl | 34 ++++++++++++++++++++-------------- src/emqttd_mod_autosub.erl | 2 +- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c013c3055..bffb9232f 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,7 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, info/1, kick/1]). +-export([start_link/2, info/1, kick/1, subscribe/2]). -behaviour(gen_server). @@ -58,11 +58,14 @@ start_link(SockArgs, PktOpts) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. -info(Pid) -> - gen_server:call(Pid, info, infinity). +info(CPid) -> + gen_server:call(CPid, info, infinity). -kick(Pid) -> - gen_server:call(Pid, kick). +kick(CPid) -> + gen_server:call(CPid, kick). + +subscribe(CPid, TopicTable) -> + gen_server:cast(CPid, {subscribe, TopicTable}). init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> % Transform if ssl. @@ -95,6 +98,10 @@ handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. +handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), + noreply(State#state{proto_state = ProtoState1}); + handle_cast(Msg, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), {noreply, State}. @@ -110,18 +117,14 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#state{proto_state = ProtoState1}, hibernate}; + noreply(State#state{proto_state = ProtoState1}); handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}, hibernate}; - -handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + noreply(State#state{proto_state = ProtoState1}); handle_info({inet_reply, _Ref, ok}, State) -> - {noreply, State, hibernate}; + noreply(State); handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), @@ -138,7 +141,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), - {noreply, State#state{ keepalive = KeepAlive }}; + noreply(State#state{keepalive = KeepAlive}); handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> case emqttd_keepalive:resume(KeepAlive) of @@ -147,7 +150,7 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {resumed, KeepAlive1} -> lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - {noreply, State#state{keepalive = KeepAlive1}} + noreply(State#state{keepalive = KeepAlive1}) end; handle_info(Info, State = #state{peername = Peername}) -> @@ -167,6 +170,9 @@ terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state code_change(_OldVsn, State, _Extra) -> {ok, State}. + +noreply(State) -> + {noreply, State, hibernate}. %------------------------------------------------------- % receive and parse tcp data diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index 86d0b3e2e..7285920e6 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -47,7 +47,7 @@ load(Opts) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, - ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; + emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]); client_connected(_ConnAck, _Client, _Topics) -> ignore. From 741ebf2ae1ef8288c176dc0b944c3e227d2a09b2 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 24 Aug 2015 22:04:14 +0800 Subject: [PATCH 2/3] cannot stop emqttd --- src/emqttd.app.src | 2 +- src/emqttd_app.erl | 5 +---- src/emqttd_ctl.erl | 1 + 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index bbde5e17f..2a08549a0 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.10.0"}, + {vsn, "0.10.1"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 159062b16..460f2ce49 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -134,10 +134,7 @@ worker_spec(Name, Opts) -> %% close all listeners first... prep_stop(State) -> - stop_listeners(), - timer:sleep(2), - emqttd_plugins:unload(), - timer:sleep(2), + stop_listeners(), State. stop_listeners() -> diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 7f1a339d0..aaca6be26 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -79,6 +79,7 @@ cluster([SNode]) -> pong -> case emqttd:is_running(Node) of true -> + emqttd_plugins:unload(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), From b08c39db5220ec84138bf7b8ecb4f666dc181749 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 24 Aug 2015 22:20:32 +0800 Subject: [PATCH 3/3] stop_listeners --- src/emqttd_app.erl | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 460f2ce49..446bff8a9 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -31,7 +31,7 @@ -behaviour(application). %% Application callbacks --export([start/2, prep_stop/1, stop/1]). +-export([start/2, stop/1]). -define(PRINT_MSG(Msg), io:format(Msg)). @@ -132,10 +132,9 @@ worker_spec(Name, Opts) -> {Name, start_link, [Opts]}, permanent, 10000, worker, [Name]}. -%% close all listeners first... -prep_stop(State) -> - stop_listeners(), - State. +-spec stop(State :: term()) -> term(). +stop(_State) -> + stop_listeners(). stop_listeners() -> %% ensure that esockd applications is started? @@ -147,7 +146,3 @@ stop_listeners() -> emqttd:close_listeners(Listeners) end. --spec stop(State :: term()) -> term(). -stop(_State) -> - ok. -