diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 406ba9598..2d5ccf55a 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -74,8 +74,9 @@ init(SockArgs = {Transport, Sock, _SockFun}) -> proto_state = emqtt_protocol:init(Transport, NewSock, Peername)}), gen_server:enter_loop(?MODULE, [], State, 10000). -handle_call(info, _From, State = #state{ - conn_name=ConnName, proto_state = ProtoState}) -> +%%TODO: Not enough... +handle_call(info, _From, State = #state{conn_name=ConnName, + proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; handle_call(Req, _From, State) -> @@ -87,7 +88,8 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); -handle_info({stop, duplicate_id, _NewPid}, State=#state{ proto_state = ProtoState, conn_name=ConnName}) -> +handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, + conn_name=ConnName}) -> %% TODO: to... %% need transfer data??? %% emqtt_client:transfer(NewPid, Data), @@ -107,7 +109,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{ peer_name = PeerName, socket = Sock }) -> +handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [PeerName, Data]), process_received_bytes( Data, control_throttle(State #state{ await_recv = false })); @@ -124,28 +126,29 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), {noreply, State#state{ keepalive = KeepAlive }}; -handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) -> +handle_info({keepalive, timeout}, State = #state{keepalive = KeepAlive}) -> case emqtt_keepalive:resume(KeepAlive) of timeout -> lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {resumed, KeepAlive1} -> lager:info("Client ~s: Keepalive Resumed", [State#state.peer_name]), - {noreply, State#state{ keepalive = KeepAlive1 }} + {noreply, State#state{keepalive = KeepAlive1}} end; handle_info(Info, State = #state{peer_name = PeerName}) -> lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]), {stop, {badinfo, Info}, State}. -terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState }) -> +terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) -> lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), emqtt_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> emqtt_protocol:shutdown(Error, ProtoState); - {_, _} -> ok %TODO: + {_, _} -> + ok end, ok. @@ -158,22 +161,19 @@ code_change(_OldVsn, State, _Extra) -> process_received_bytes(<<>>, State) -> {noreply, State, hibernate}; -process_received_bytes(Bytes, - State = #state{ parse_state = ParseState, - proto_state = ProtoState, - conn_name = ConnStr }) -> +process_received_bytes(Bytes, State = #state{parse_state = ParseState, + proto_state = ProtoState, + conn_name = ConnStr}) -> case emqtt_parser:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, - control_throttle( State #state{ parse_state = ParseState1 }), + control_throttle( State #state{parse_state = ParseState1}), hibernate}; {ok, Packet, Rest} -> case emqtt_protocol:handle_packet(Packet, ProtoState) of {ok, ProtoState1} -> - process_received_bytes( - Rest, - State#state{ parse_state = emqtt_parser:init(), - proto_state = ProtoState1 }); + process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(), + proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), stop({shutdown, Error}, State); @@ -188,7 +188,7 @@ process_received_bytes(Bytes, end. %%---------------------------------------------------------------------------- -network_error(Reason, State = #state{ peer_name = PeerName }) -> +network_error(Reason, State = #state{peer_name = PeerName}) -> lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), stop({shutdown, conn_closed}, State). @@ -200,12 +200,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), State#state{ await_recv = true }. -control_throttle(State = #state{ conn_state = Flow, - conserve = Conserve }) -> +control_throttle(State = #state{conn_state = Flow, + conserve = Conserve}) -> case {Flow, Conserve} of - {running, true} -> State #state{ conn_state = blocked }; - {blocked, false} -> run_socket(State #state{ - conn_state = running }); + {running, true} -> State #state{conn_state = blocked}; + {blocked, false} -> run_socket(State #state{conn_state = running}); {_, _} -> run_socket(State) end. diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index aa7c72341..adb65f433 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -19,7 +19,6 @@ %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. %%------------------------------------------------------------------------------ - -module(emqtt_protocol). -include("emqtt.hrl"). @@ -29,14 +28,12 @@ %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ - -export([init/3, client_id/1]). -export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]). -export([info/1]). - %% ------------------------------------------------------------------ %% Protocol State %% ------------------------------------------------------------------ @@ -54,22 +51,14 @@ will_msg }). -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - -type(proto_state() :: #proto_state{}). -spec(send_message({pid() | tuple(), mqtt_message()}, proto_state()) -> {ok, proto_state()}). -spec(handle_packet(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). --endif. - -%%---------------------------------------------------------------------------- - -define(PACKET_TYPE(Packet, Type), - Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}). + Packet = #mqtt_packet{header = #mqtt_packet_header { type = Type }}). -define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).