diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 01a728b8d..f438e16ef 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -113,12 +113,12 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ peer_name = PeerName, handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); -handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - lager:critical("unexpected inet_reply '~p'", [Reason]), +handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = PeerName}) -> + lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]), {noreply, State}; handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) -> - lager:info("Client: ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), + lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}), {noreply, State#state{ keepalive = KeepAlive }}; @@ -126,27 +126,25 @@ handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) -> case emqtt_keepalive:resume(KeepAlive) of timeout -> lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]), - stop({shutdown, keepalive_timeout}, State); + stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {resumed, KeepAlive1} -> lager:info("Client ~s: Keepalive Resumed", [State#state.peer_name]), {noreply, State#state{ keepalive = KeepAlive1 }} end; -handle_info(Info, State) -> - lager:error("badinfo :~p",[Info]), +handle_info(Info, State = #state{peer_name = PeerName}) -> + lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]), {stop, {badinfo, Info}, State}. -terminate(Reason, #state{proto_state = unefined}) -> - io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]), - %%TODO: fix keep_alive... - %%emqtt_keep_alive:cancel(KeepAlive), - %emqtt_protocol:connection_lost(ProtoState), - ok; - -terminate(_Reason, #state { keepalive = KeepAlive, proto_state = ProtoState }) -> - %%TODO: fix keep_alive... +terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState }) -> + lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), emqtt_keepalive:cancel(KeepAlive), - emqtt_protocol:connection_lost(ProtoState), + case {ProtoState, Reason} of + {undefined, _} -> ok; + {_, {shutdown, Error}} -> + emqtt_protocol:shutdown(Error, ProtoState); + {_, _} -> ok %TODO: + end, ok. code_change(_OldVsn, State, _Extra) -> @@ -194,12 +192,8 @@ process_received_bytes(Bytes, end. %%---------------------------------------------------------------------------- -network_error(Reason, - State = #state{ conn_name = ConnStr}) -> - lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), - %%TODO: where to SEND WILL MSG?? - %%send_will_msg(State), - % todo: flush channel after publish +network_error(Reason, State = #state{ peer_name = PeerName, conn_name = ConnStr }) -> + lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), stop({shutdown, conn_closed}, State). run_socket(State = #state{ conn_state = blocked }) -> diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 889ca5ee6..a99f062fe 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -32,7 +32,7 @@ -export([initial_state/2]). --export([handle_packet/2, send_message/2, send_packet/2, connection_lost/1]). +-export([handle_packet/2, send_message/2, send_packet/2, shutdown/2]). -export([info/1]). @@ -125,7 +125,7 @@ handle_packet(?CONNECT, Packet = #mqtt_packet { ClientId1 = clientid(ClientId, State), start_keepalive(KeepAlive), emqtt_cm:register(ClientId1, self()), - {?CONNACK_ACCEPT, State#proto_state{ will_msg = make_will_msg(Var), + {?CONNACK_ACCEPT, State#proto_state{ will_msg = make_willmsg(Var), clean_sess = CleanSess, client_id = ClientId1 }}; false -> @@ -207,8 +207,9 @@ handle_packet(?PINGREQ, #mqtt_packet{}, State) -> send_packet(make_packet(?PINGRESP), State); handle_packet(?DISCONNECT, #mqtt_packet{}, State) -> - %%how to handle session? - {stop, normal, State}. + %%TODO: how to handle session? + % clean willmsg + {stop, normal, State#proto_state{will_msg = undefined}}. make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT -> #mqtt_packet{ header = #mqtt_packet_header { type = Type } }. @@ -266,10 +267,11 @@ send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, cl erlang:port_command(Sock, Data), {ok, State}. -%%TODO: fix me later... -connection_lost(#proto_state{client_id = ClientId} = State) -> +shutdown(Error, State = #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) -> + send_willmsg(WillMsg), + try_unregister(ClientId, self()), + lager:info("Protocol ~s@~s Shutdown: ~p", [ClientId, PeerName, Error]), ok. - %emqtt_cm:unregister(ClientId, self()). make_message(#mqtt_packet { header = #mqtt_packet_header{ @@ -288,10 +290,10 @@ make_message(#mqtt_packet { msgid = PacketId, payload = Payload}. -make_will_msg(#mqtt_packet_connect{ will_flag = false }) -> +make_willmsg(#mqtt_packet_connect{ will_flag = false }) -> undefined; -make_will_msg(#mqtt_packet_connect{ will_retain = Retain, +make_willmsg(#mqtt_packet_connect{ will_retain = Retain, will_qos = Qos, will_topic = Topic, will_msg = Msg }) -> @@ -307,8 +309,6 @@ next_packet_id(State = #proto_state{ packet_id = PacketId }) -> State #proto_state{ packet_id = PacketId + 1 }. - - clientid(<<>>, #proto_state{peer_name = PeerName}) -> <<"eMQTT/", (base64:encode(PeerName))/binary>>; @@ -320,10 +320,9 @@ maybe_clean_sess(false, _Conn, _ClientId) -> %%---------------------------------------------------------------------------- -send_will_msg(#proto_state{will_msg = undefined}) -> - ignore; -send_will_msg(#proto_state{will_msg = WillMsg }) -> - emqtt_router:route(WillMsg). +send_willmsg(undefined) -> ignore; +%%TODO:should call session... +send_willmsg(WillMsg) -> emqtt_router:route(WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> @@ -394,3 +393,6 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. +try_unregister(undefined, _) -> ok; +try_unregister(ClientId, _) -> emqtt_cm:unregister(ClientId, self()). +