From 30ff3b3f4e15a4e0fd91ad2a80413fd14e4b3ea8 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 12 Jun 2015 22:47:16 +0800 Subject: [PATCH 1/4] SESSION_TAB --- apps/emqttd/src/emqttd_sm.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 05d9e62b0..d5ecf0a8e 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -53,7 +53,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {id, tabid, statsfun}). +-record(state, {id, statsfun}). -define(SM_POOL, sm_pool). @@ -91,7 +91,7 @@ table() -> ?SESSION_TAB. %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(emqttd_sm_sup:table(), ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -156,8 +156,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Tab}) -> - ets:match_delete(Tab, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> From 7bfc673c28abc56f18839ce6861925cf43c0d706 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 14 Jun 2015 23:51:07 +0800 Subject: [PATCH 2/4] publish willmsg when normal exit --- apps/emqttd/src/emqttd_protocol.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index d519a182e..2c07abbee 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -302,10 +302,16 @@ shutdown(duplicate_id, _State) -> shutdown(_, #proto_state{clientid = undefined}) -> ignore; -shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) -> +shutdown(normal, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown", [ClientId, emqttd_net:format(Peername)]), try_unregister(ClientId), + if + WillMsg =/= undefined -> + send_willmsg(ClientId, WillMsg); + true -> + ok + end, emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]); shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> @@ -331,6 +337,7 @@ send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... send_willmsg(ClientId, WillMsg) -> + lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]), emqttd_pubsub:publish(ClientId, WillMsg). start_keepalive(0) -> ignore; From afa0c1819b9f66c51f712bfbdf497619d8d98474 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 14 Jun 2015 23:56:19 +0800 Subject: [PATCH 3/4] fix issue #175 --- apps/emqttd/src/emqttd_ws_client.erl | 58 ++++++++++++++++------------ 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl index 52a7ba7d9..4c2da9f8b 100644 --- a/apps/emqttd/src/emqttd_ws_client.erl +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -49,7 +49,10 @@ parser_state}). %% Client state --record(state, {ws_pid, request, proto_state, keepalive}). +-record(client_state, {ws_pid, + request, + proto_state, + keepalive}). %%------------------------------------------------------------------------------ %% @doc Start WebSocket client. @@ -109,80 +112,85 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> {ok, Peername} = emqttd_net:peername(Socket), SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), - {ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. + {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. handle_call(_Req, _From, State) -> {reply, error, State}. -handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) -> +handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; {error, Error} -> lager:error("MQTT protocol error ~p", [Error]), stop({shutdown, Error}, State); {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + stop(Reason, State#client_state{proto_state = ProtoState1}) end; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) -> +handle_info({dispatch, {From, Messages}}, #client_state{proto_state = ProtoState} = State) + when is_list(Messages) -> ProtoState1 = lists:foldl(fun(Message, PState) -> {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 end, ProtoState, Messages), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; -handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) -> +handle_info({dispatch, {From, Message}}, #client_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; -handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) -> +handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; -handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> +handle_info({subscribe, Topic, Qos}, #client_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; -handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) -> +handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) -> lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), stop({shutdown, duplicate_id}, State); -handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) -> +handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), %%TODO: fix esockd_transport... KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, TimeoutSec, {keepalive, timeout}), - {noreply, State#state{keepalive = KeepAlive}}; + {noreply, State#client_state{keepalive = KeepAlive}}; -handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) -> +handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) -> case emqttd_keepalive:resume(KeepAlive) of timeout -> lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), - stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); + stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); {resumed, KeepAlive1} -> lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), - {noreply, State#state{keepalive = KeepAlive1}} + {noreply, State#client_state{keepalive = KeepAlive1}} end; -handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) -> - stop(Reason, State); +handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) -> + ClientId = emqttd_protocol:clientid(ProtoState), + lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]), + stop({shutdown, websocket_closed}, State); -handle_info(Info, State = #state{request = Req}) -> +handle_info(Info, State = #client_state{request = Req}) -> lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), {noreply, State}. -terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) -> +terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> + lager:info("WebSocket client terminated: ~p", [Reason]), emqttd_keepalive:cancel(KeepAlive), case Reason of {shutdown, Error} -> emqttd_protocol:shutdown(Error, ProtoState); - _ -> ok + _ -> + emqttd_protocol:shutdown(Reason, ProtoState) end. code_change(_OldVsn, State, _Extra) -> From af3faf05db9fd43a9d64fd0840418e0deba2848d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 15 Jun 2015 00:04:39 +0800 Subject: [PATCH 4/4] 0.8.6 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b61e4a7..e969d6bf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ emqttd ChangeLog ================== +0.8.6-beta (2015-06-15) +------------------------- + +Bugfix: issue #175 - should publish Will message when websocket is closed without 'DISCONNECT' packet + + 0.8.5-beta (2015-06-10) -------------------------