will message

This commit is contained in:
Ery Lee 2015-01-14 19:38:35 +08:00
parent 890b429fad
commit 525a104976
2 changed files with 33 additions and 37 deletions

View File

@ -113,12 +113,12 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ peer_name = PeerName,
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
lager:critical("unexpected inet_reply '~p'", [Reason]),
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = PeerName}) ->
lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]),
{noreply, State};
handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) ->
lager:info("Client: ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]),
lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]),
KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}),
{noreply, State#state{ keepalive = KeepAlive }};
@ -126,27 +126,25 @@ handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) ->
case emqtt_keepalive:resume(KeepAlive) of
timeout ->
lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]),
stop({shutdown, keepalive_timeout}, State);
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
{resumed, KeepAlive1} ->
lager:info("Client ~s: Keepalive Resumed", [State#state.peer_name]),
{noreply, State#state{ keepalive = KeepAlive1 }}
end;
handle_info(Info, State) ->
lager:error("badinfo :~p",[Info]),
handle_info(Info, State = #state{peer_name = PeerName}) ->
lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]),
{stop, {badinfo, Info}, State}.
terminate(Reason, #state{proto_state = unefined}) ->
io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]),
%%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive),
%emqtt_protocol:connection_lost(ProtoState),
ok;
terminate(_Reason, #state { keepalive = KeepAlive, proto_state = ProtoState }) ->
%%TODO: fix keep_alive...
terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState }) ->
lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]),
emqtt_keepalive:cancel(KeepAlive),
emqtt_protocol:connection_lost(ProtoState),
case {ProtoState, Reason} of
{undefined, _} -> ok;
{_, {shutdown, Error}} ->
emqtt_protocol:shutdown(Error, ProtoState);
{_, _} -> ok %TODO:
end,
ok.
code_change(_OldVsn, State, _Extra) ->
@ -194,12 +192,8 @@ process_received_bytes(Bytes,
end.
%%----------------------------------------------------------------------------
network_error(Reason,
State = #state{ conn_name = ConnStr}) ->
lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
%%TODO: where to SEND WILL MSG??
%%send_will_msg(State),
% todo: flush channel after publish
network_error(Reason, State = #state{ peer_name = PeerName, conn_name = ConnStr }) ->
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
stop({shutdown, conn_closed}, State).
run_socket(State = #state{ conn_state = blocked }) ->

View File

@ -32,7 +32,7 @@
-export([initial_state/2]).
-export([handle_packet/2, send_message/2, send_packet/2, connection_lost/1]).
-export([handle_packet/2, send_message/2, send_packet/2, shutdown/2]).
-export([info/1]).
@ -125,7 +125,7 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
emqtt_cm:register(ClientId1, self()),
{?CONNACK_ACCEPT, State#proto_state{ will_msg = make_will_msg(Var),
{?CONNACK_ACCEPT, State#proto_state{ will_msg = make_willmsg(Var),
clean_sess = CleanSess,
client_id = ClientId1 }};
false ->
@ -207,8 +207,9 @@ handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
send_packet(make_packet(?PINGRESP), State);
handle_packet(?DISCONNECT, #mqtt_packet{}, State) ->
%%how to handle session?
{stop, normal, State}.
%%TODO: how to handle session?
% clean willmsg
{stop, normal, State#proto_state{will_msg = undefined}}.
make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
#mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
@ -266,10 +267,11 @@ send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, cl
erlang:port_command(Sock, Data),
{ok, State}.
%%TODO: fix me later...
connection_lost(#proto_state{client_id = ClientId} = State) ->
shutdown(Error, State = #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) ->
send_willmsg(WillMsg),
try_unregister(ClientId, self()),
lager:info("Protocol ~s@~s Shutdown: ~p", [ClientId, PeerName, Error]),
ok.
%emqtt_cm:unregister(ClientId, self()).
make_message(#mqtt_packet {
header = #mqtt_packet_header{
@ -288,10 +290,10 @@ make_message(#mqtt_packet {
msgid = PacketId,
payload = Payload}.
make_will_msg(#mqtt_packet_connect{ will_flag = false }) ->
make_willmsg(#mqtt_packet_connect{ will_flag = false }) ->
undefined;
make_will_msg(#mqtt_packet_connect{ will_retain = Retain,
make_willmsg(#mqtt_packet_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
@ -307,8 +309,6 @@ next_packet_id(State = #proto_state{ packet_id = PacketId }) ->
State #proto_state{ packet_id = PacketId + 1 }.
clientid(<<>>, #proto_state{peer_name = PeerName}) ->
<<"eMQTT/", (base64:encode(PeerName))/binary>>;
@ -320,10 +320,9 @@ maybe_clean_sess(false, _Conn, _ClientId) ->
%%----------------------------------------------------------------------------
send_will_msg(#proto_state{will_msg = undefined}) ->
ignore;
send_will_msg(#proto_state{will_msg = WillMsg }) ->
emqtt_router:route(WillMsg).
send_willmsg(undefined) -> ignore;
%%TODO:should call session...
send_willmsg(WillMsg) -> emqtt_router:route(WillMsg).
start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 ->
@ -394,3 +393,6 @@ validate_qos(undefined) -> true;
validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false.
try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqtt_cm:unregister(ClientId, self()).