From ad1f3ca44aeb035621de2a095dd9875c98621d2d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 20 Aug 2015 19:00:01 +0800 Subject: [PATCH] add emqttd_client:subscribe/1 api. noreply/1 to hibernate --- src/emqttd_client.erl | 34 ++++++++++++++++++++-------------- src/emqttd_mod_autosub.erl | 2 +- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c013c3055..bffb9232f 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,7 @@ -include("emqttd_protocol.hrl"). %% API Function Exports --export([start_link/2, info/1, kick/1]). +-export([start_link/2, info/1, kick/1, subscribe/2]). -behaviour(gen_server). @@ -58,11 +58,14 @@ start_link(SockArgs, PktOpts) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. -info(Pid) -> - gen_server:call(Pid, info, infinity). +info(CPid) -> + gen_server:call(CPid, info, infinity). -kick(Pid) -> - gen_server:call(Pid, kick). +kick(CPid) -> + gen_server:call(CPid, kick). + +subscribe(CPid, TopicTable) -> + gen_server:cast(CPid, {subscribe, TopicTable}). init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> % Transform if ssl. @@ -95,6 +98,10 @@ handle_call(Req, _From, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. +handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> + {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), + noreply(State#state{proto_state = ProtoState1}); + handle_cast(Msg, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), {noreply, State}. @@ -110,18 +117,14 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#state{proto_state = ProtoState1}, hibernate}; + noreply(State#state{proto_state = ProtoState1}); handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}, hibernate}; - -handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + noreply(State#state{proto_state = ProtoState1}); handle_info({inet_reply, _Ref, ok}, State) -> - {noreply, State, hibernate}; + noreply(State); handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), @@ -138,7 +141,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), - {noreply, State#state{ keepalive = KeepAlive }}; + noreply(State#state{keepalive = KeepAlive}); handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> case emqttd_keepalive:resume(KeepAlive) of @@ -147,7 +150,7 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {resumed, KeepAlive1} -> lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - {noreply, State#state{keepalive = KeepAlive1}} + noreply(State#state{keepalive = KeepAlive1}) end; handle_info(Info, State = #state{peername = Peername}) -> @@ -167,6 +170,9 @@ terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state code_change(_OldVsn, State, _Extra) -> {ok, State}. + +noreply(State) -> + {noreply, State, hibernate}. %------------------------------------------------------- % receive and parse tcp data diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index 86d0b3e2e..7285920e6 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -47,7 +47,7 @@ load(Opts) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, - ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; + emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]); client_connected(_ConnAck, _Client, _Topics) -> ignore.