merge master, send willmsg when normal terminate
This commit is contained in:
commit
c487348c2a
|
@ -14,6 +14,12 @@ Alarm
|
||||||
Protocol Compliant
|
Protocol Compliant
|
||||||
|
|
||||||
|
|
||||||
|
0.8.6-beta (2015-06-15)
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
Bugfix: issue #175 - should publish Will message when websocket is closed without 'DISCONNECT' packet
|
||||||
|
|
||||||
|
|
||||||
0.8.5-beta (2015-06-10)
|
0.8.5-beta (2015-06-10)
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
|
|
|
@ -294,10 +294,12 @@ shutdown(duplicate_id, _State) ->
|
||||||
shutdown(_, #proto_state{clientid = undefined}) ->
|
shutdown(_, #proto_state{clientid = undefined}) ->
|
||||||
ignore;
|
ignore;
|
||||||
|
|
||||||
shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) ->
|
%%TODO:
|
||||||
|
shutdown(normal, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
|
||||||
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
|
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
|
||||||
[ClientId, emqttd_net:format(Peername)]),
|
[ClientId, emqttd_net:format(Peername)]),
|
||||||
try_unregister(ClientId),
|
try_unregister(ClientId),
|
||||||
|
send_willmsg(ClientId, WillMsg);
|
||||||
emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]);
|
emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]);
|
||||||
|
|
||||||
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
|
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
|
||||||
|
@ -321,9 +323,9 @@ clientid(ClientId, _State) -> ClientId.
|
||||||
|
|
||||||
send_willmsg(_ClientId, undefined) ->
|
send_willmsg(_ClientId, undefined) ->
|
||||||
ignore;
|
ignore;
|
||||||
%%TODO:should call session...
|
send_willmsg(ClientId, WillMsg) ->
|
||||||
send_willmsg(ClientId, WillMsg) ->
|
lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]),
|
||||||
emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}).
|
emqttd_pubsub:publish(ClientId, WillMsg).
|
||||||
|
|
||||||
start_keepalive(0) -> ignore;
|
start_keepalive(0) -> ignore;
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,10 @@
|
||||||
parser_state}).
|
parser_state}).
|
||||||
|
|
||||||
%% Client state
|
%% Client state
|
||||||
-record(state, {ws_pid, request, proto_state, keepalive}).
|
-record(client_state, {ws_pid,
|
||||||
|
request,
|
||||||
|
proto_state,
|
||||||
|
keepalive}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Start WebSocket client.
|
%% @doc Start WebSocket client.
|
||||||
|
@ -109,22 +112,22 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
||||||
{ok, Peername} = emqttd_net:peername(Socket),
|
{ok, Peername} = emqttd_net:peername(Socket),
|
||||||
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
||||||
{ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
|
{ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
|
||||||
|
|
||||||
handle_call(_Req, _From, State) ->
|
handle_call(_Req, _From, State) ->
|
||||||
{reply, error, State}.
|
{reply, error, State}.
|
||||||
|
|
||||||
handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) ->
|
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
{noreply, State#state{proto_state = ProtoState1}};
|
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p", [Error]),
|
lager:error("MQTT protocol error ~p", [Error]),
|
||||||
stop({shutdown, Error}, State);
|
stop({shutdown, Error}, State);
|
||||||
{error, Error, ProtoState1} ->
|
{error, Error, ProtoState1} ->
|
||||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
|
||||||
{stop, Reason, ProtoState1} ->
|
{stop, Reason, ProtoState1} ->
|
||||||
stop(Reason, State#state{proto_state = ProtoState1})
|
stop(Reason, State#client_state{proto_state = ProtoState1})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
|
@ -134,48 +137,52 @@ handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) ->
|
||||||
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
||||||
{noreply, State#state{proto_state = ProtoState1}};
|
{noreply, State#state{proto_state = ProtoState1}};
|
||||||
|
|
||||||
handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
|
handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) ->
|
||||||
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
||||||
{noreply, State#state{proto_state = ProtoState1}};
|
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||||
|
|
||||||
handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
|
handle_info({subscribe, Topic, Qos}, #client_state{proto_state = ProtoState} = State) ->
|
||||||
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
|
||||||
{noreply, State#state{proto_state = ProtoState1}};
|
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||||
|
|
||||||
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) ->
|
handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) ->
|
||||||
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
|
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
|
||||||
stop({shutdown, duplicate_id}, State);
|
stop({shutdown, duplicate_id}, State);
|
||||||
|
|
||||||
handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) ->
|
handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) ->
|
||||||
lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
|
lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
|
||||||
%%TODO: fix esockd_transport...
|
%%TODO: fix esockd_transport...
|
||||||
KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
|
KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
|
||||||
TimeoutSec, {keepalive, timeout}),
|
TimeoutSec, {keepalive, timeout}),
|
||||||
{noreply, State#state{keepalive = KeepAlive}};
|
{noreply, State#client_state{keepalive = KeepAlive}};
|
||||||
|
|
||||||
handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) ->
|
handle_info({keepalive, timeout}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
|
||||||
case emqttd_keepalive:resume(KeepAlive) of
|
case emqttd_keepalive:resume(KeepAlive) of
|
||||||
timeout ->
|
timeout ->
|
||||||
lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
|
lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
|
||||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
|
||||||
{resumed, KeepAlive1} ->
|
{resumed, KeepAlive1} ->
|
||||||
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
|
lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
|
||||||
{noreply, State#state{keepalive = KeepAlive1}}
|
{noreply, State#client_state{keepalive = KeepAlive1}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) ->
|
handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) ->
|
||||||
stop(Reason, State);
|
ClientId = emqttd_protocol:clientid(ProtoState),
|
||||||
|
lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]),
|
||||||
|
stop({shutdown, websocket_closed}, State);
|
||||||
|
|
||||||
handle_info(Info, State = #state{request = Req}) ->
|
handle_info(Info, State = #client_state{request = Req}) ->
|
||||||
lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
|
lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
||||||
|
lager:info("WebSocket client terminated: ~p", [Reason]),
|
||||||
emqttd_keepalive:cancel(KeepAlive),
|
emqttd_keepalive:cancel(KeepAlive),
|
||||||
case Reason of
|
case Reason of
|
||||||
{shutdown, Error} ->
|
{shutdown, Error} ->
|
||||||
emqttd_protocol:shutdown(Error, ProtoState);
|
emqttd_protocol:shutdown(Error, ProtoState);
|
||||||
_ -> ok
|
_ ->
|
||||||
|
emqttd_protocol:shutdown(Reason, ProtoState)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
Loading…
Reference in New Issue