add emqttd_client:subscribe/1 api. noreply/1 to hibernate

This commit is contained in:
Feng Lee 2015-08-20 19:00:01 +08:00
parent 0894d4f8e4
commit ad1f3ca44a
2 changed files with 21 additions and 15 deletions

View File

@ -34,7 +34,7 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/2, info/1, kick/1]). -export([start_link/2, info/1, kick/1, subscribe/2]).
-behaviour(gen_server). -behaviour(gen_server).
@ -58,11 +58,14 @@
start_link(SockArgs, PktOpts) -> start_link(SockArgs, PktOpts) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}.
info(Pid) -> info(CPid) ->
gen_server:call(Pid, info, infinity). gen_server:call(CPid, info, infinity).
kick(Pid) -> kick(CPid) ->
gen_server:call(Pid, kick). gen_server:call(CPid, kick).
subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}).
init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
% Transform if ssl. % Transform if ssl.
@ -95,6 +98,10 @@ handle_call(Req, _From, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
{reply, {error, unsupported_request}, State}. {reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
noreply(State#state{proto_state = ProtoState1});
handle_cast(Msg, State = #state{peername = Peername}) -> handle_cast(Msg, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
{noreply, State}. {noreply, State}.
@ -110,18 +117,14 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
{noreply, State#state{proto_state = ProtoState1}, hibernate}; noreply(State#state{proto_state = ProtoState1});
handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}, hibernate}; noreply(State#state{proto_state = ProtoState1});
handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate}; noreply(State);
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
@ -138,7 +141,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
{noreply, State#state{ keepalive = KeepAlive }}; noreply(State#state{keepalive = KeepAlive});
handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
case emqttd_keepalive:resume(KeepAlive) of case emqttd_keepalive:resume(KeepAlive) of
@ -147,7 +150,7 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
{resumed, KeepAlive1} -> {resumed, KeepAlive1} ->
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
{noreply, State#state{keepalive = KeepAlive1}} noreply(State#state{keepalive = KeepAlive1})
end; end;
handle_info(Info, State = #state{peername = Peername}) -> handle_info(Info, State = #state{peername = Peername}) ->
@ -168,6 +171,9 @@ terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
noreply(State) ->
{noreply, State, hibernate}.
%------------------------------------------------------- %-------------------------------------------------------
% receive and parse tcp data % receive and parse tcp data
%------------------------------------------------------- %-------------------------------------------------------

View File

@ -47,7 +47,7 @@ load(Opts) ->
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) ->
F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]);
client_connected(_ConnAck, _Client, _Topics) -> client_connected(_ConnAck, _Client, _Topics) ->
ignore. ignore.