KeepAlive
This commit is contained in:
parent
24d9b46836
commit
52c3bc9628
|
@ -29,120 +29,126 @@
|
|||
-export([start_link/1, info/1, go/2]).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
code_change/3,
|
||||
terminate/2]).
|
||||
terminate/2]).
|
||||
|
||||
-include("emqtt.hrl").
|
||||
|
||||
%%Client State...
|
||||
-record(state, {
|
||||
socket,
|
||||
conn_name,
|
||||
await_recv,
|
||||
conn_state,
|
||||
conserve,
|
||||
parse_state,
|
||||
proto_state,
|
||||
keep_alive
|
||||
socket,
|
||||
peer_name,
|
||||
conn_name,
|
||||
await_recv,
|
||||
conn_state,
|
||||
conserve,
|
||||
parse_state,
|
||||
proto_state,
|
||||
keepalive
|
||||
}).
|
||||
|
||||
start_link(Sock) ->
|
||||
gen_server:start_link(?MODULE, [Sock], []).
|
||||
|
||||
info(Pid) ->
|
||||
gen_server:call(Pid, info).
|
||||
gen_server:call(Pid, info).
|
||||
|
||||
go(Pid, Sock) ->
|
||||
gen_server:call(Pid, {go, Sock}).
|
||||
gen_server:call(Pid, {go, Sock}).
|
||||
|
||||
init([Sock]) ->
|
||||
io:format("client is created: ~p~n", [self()]),
|
||||
{ok, #state{socket = Sock}, hibernate}.
|
||||
{ok, #state{socket = Sock}, 1000}.
|
||||
|
||||
handle_call({go, Sock}, _From, #state{socket = Sock}) ->
|
||||
{ok, Peername} = emqtt_net:peer_string(Sock),
|
||||
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
||||
io:format("conn from ~s~n", [ConnStr]),
|
||||
lager:debug("connection: ~s~n", [ConnStr]),
|
||||
{reply, ok,
|
||||
control_throttle(
|
||||
#state{ socket = Sock,
|
||||
peer_name = Peername,
|
||||
conn_name = ConnStr,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
conserve = false,
|
||||
parse_state = emqtt_packet:initial_state(),
|
||||
proto_state = emqtt_protocol:initial_state(Sock)})};
|
||||
proto_state = emqtt_protocol:initial_state(Sock)}), 10000};
|
||||
|
||||
handle_call(info, _From, State = #state{
|
||||
conn_name=ConnName,
|
||||
proto_state = ProtoState}) ->
|
||||
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
|
||||
conn_name=ConnName, proto_state = ProtoState}) ->
|
||||
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
{stop, {badreq, Req}, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
stop({shutdown, timeout}, State);
|
||||
stop({shutdown, timeout}, State);
|
||||
|
||||
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) ->
|
||||
%%TODO:
|
||||
%lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
||||
stop({shutdown, duplicate_id}, State);
|
||||
%%TODO:
|
||||
%lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
||||
stop({shutdown, duplicate_id}, State);
|
||||
|
||||
%%TODO: ok??
|
||||
handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) ->
|
||||
{ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState),
|
||||
{noreply, State#state{proto_state = ProtoState1}};
|
||||
handle_info({dispatch, Message}, #state{proto_state = ProtoState} = State) ->
|
||||
{ok, ProtoState1} = emqtt_protocol:send_message(Message, ProtoState),
|
||||
{noreply, State#state{proto_state = ProtoState1}};
|
||||
|
||||
handle_info({inet_reply, _Ref, ok}, State) ->
|
||||
{noreply, State, hibernate};
|
||||
|
||||
handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) ->
|
||||
process_received_bytes(
|
||||
Data, control_throttle(State #state{ await_recv = false }));
|
||||
Data, control_throttle(State #state{ await_recv = false }));
|
||||
|
||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
||||
network_error(Reason, State);
|
||||
|
||||
%%TODO: HOW TO HANDLE THIS??
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
{noreply, State};
|
||||
lager:critical("unexpected inet_reply '~p'", [Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
||||
case emqtt_keep_alive:state(KeepAlive) of
|
||||
idle ->
|
||||
lager:info("keep_alive timeout: ~p", [State#state.conn_name]),
|
||||
{stop, normal, State};
|
||||
active ->
|
||||
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
||||
{noreply, State#state{keep_alive=KeepAlive1}}
|
||||
end;
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) ->
|
||||
lager:info("~s keepalive started: ~p", [State#state.peer_name, TimeoutSec]),
|
||||
KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}),
|
||||
{noreply, State#state{ keepalive = KeepAlive }};
|
||||
|
||||
handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) ->
|
||||
case emqtt_keepalive:resume(KeepAlive) of
|
||||
timeout ->
|
||||
lager:info("~s keepalive timeout!", [State#state.peer_name]),
|
||||
{stop, normal, State};
|
||||
{resumed, KeepAlive1} ->
|
||||
lager:info("~s keepalive resumed.", [State#state.peer_name]),
|
||||
{noreply, State#state{ keepalive = KeepAlive1 }}
|
||||
end;
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:error("badinfo :~p",[Info]),
|
||||
{stop, {badinfo, Info}, State}.
|
||||
lager:error("badinfo :~p",[Info]),
|
||||
{stop, {badinfo, Info}, State}.
|
||||
|
||||
terminate(Reason, #state{proto_state = unefined}) ->
|
||||
io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]),
|
||||
%%TODO: fix keep_alive...
|
||||
%%emqtt_keep_alive:cancel(KeepAlive),
|
||||
%emqtt_protocol:client_terminated(ProtoState),
|
||||
ok;
|
||||
%%TODO: fix keep_alive...
|
||||
%%emqtt_keep_alive:cancel(KeepAlive),
|
||||
%emqtt_protocol:client_terminated(ProtoState),
|
||||
ok;
|
||||
|
||||
terminate(_Reason, #state{proto_state = ProtoState}) ->
|
||||
%%TODO: fix keep_alive...
|
||||
%%emqtt_keep_alive:cancel(KeepAlive),
|
||||
emqtt_protocol:client_terminated(ProtoState),
|
||||
ok.
|
||||
terminate(_Reason, #state { keepalive = KeepAlive, proto_state = ProtoState }) ->
|
||||
%%TODO: fix keep_alive...
|
||||
emqtt_keepalive:cancel(KeepAlive),
|
||||
emqtt_protocol:client_terminated(ProtoState),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
async_recv(Sock, Length, infinity) when is_port(Sock) ->
|
||||
prim_inet:async_recv(Sock, Length, -1);
|
||||
|
||||
|
@ -157,38 +163,38 @@ process_received_bytes(<<>>, State) ->
|
|||
|
||||
process_received_bytes(Bytes,
|
||||
State = #state{ parse_state = ParseState,
|
||||
proto_state = ProtoState,
|
||||
proto_state = ProtoState,
|
||||
conn_name = ConnStr }) ->
|
||||
case emqtt_packet:parse(Bytes, ParseState) of
|
||||
{more, ParseState1} ->
|
||||
{noreply,
|
||||
control_throttle( State #state{ parse_state = ParseState1 }),
|
||||
hibernate};
|
||||
{ok, Packet, Rest} ->
|
||||
case emqtt_protocol:handle_packet(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
process_received_bytes(
|
||||
Rest,
|
||||
State#state{ parse_state = emqtt_packet:initial_state(),
|
||||
proto_state = ProtoState1 });
|
||||
{error, Error} ->
|
||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||
stop({shutdown, Error}, State);
|
||||
{error, Error, ProtoState1} ->
|
||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||
{stop, ProtoState1} ->
|
||||
stop(normal, State#state{proto_state = ProtoState1})
|
||||
end;
|
||||
{error, Error} ->
|
||||
lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
|
||||
stop({shutdown, Error}, State)
|
||||
{more, ParseState1} ->
|
||||
{noreply,
|
||||
control_throttle( State #state{ parse_state = ParseState1 }),
|
||||
hibernate};
|
||||
{ok, Packet, Rest} ->
|
||||
case emqtt_protocol:handle_packet(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
process_received_bytes(
|
||||
Rest,
|
||||
State#state{ parse_state = emqtt_packet:initial_state(),
|
||||
proto_state = ProtoState1 });
|
||||
{error, Error} ->
|
||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||
stop({shutdown, Error}, State);
|
||||
{error, Error, ProtoState1} ->
|
||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||
{stop, ProtoState1} ->
|
||||
stop(normal, State#state{proto_state = ProtoState1})
|
||||
end;
|
||||
{error, Error} ->
|
||||
lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
|
||||
stop({shutdown, Error}, State)
|
||||
end.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
network_error(Reason,
|
||||
State = #state{ conn_name = ConnStr}) ->
|
||||
lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
|
||||
%%TODO: where to SEND WILL MSG??
|
||||
%%TODO: where to SEND WILL MSG??
|
||||
%%send_will_msg(State),
|
||||
% todo: flush channel after publish
|
||||
stop({shutdown, conn_closed}, State).
|
||||
|
|
|
@ -24,43 +24,48 @@
|
|||
|
||||
-author('feng@emqtt.io').
|
||||
|
||||
-export([new/2,
|
||||
state/1,
|
||||
activate/1,
|
||||
reset/1,
|
||||
cancel/1]).
|
||||
-export([new/3, resume/1, cancel/1]).
|
||||
|
||||
-record(keep_alive, {state, period, timer, msg}).
|
||||
-record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
|
||||
|
||||
new(undefined, _) ->
|
||||
undefined;
|
||||
new(0, _) ->
|
||||
undefined;
|
||||
new(Period, TimeoutMsg) when is_integer(Period) ->
|
||||
Ref = erlang:send_after(Period, self(), TimeoutMsg),
|
||||
#keep_alive{state=idle, period=Period, timer=Ref, msg=TimeoutMsg}.
|
||||
%%
|
||||
%% @doc create a keepalive.
|
||||
%%
|
||||
new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
|
||||
{ok, [{recv_oct, RecvOct}]} = inet:getstate(Socket, [recv_oct]),
|
||||
Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
|
||||
#keepalive { socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }.
|
||||
|
||||
state(undefined) ->
|
||||
undefined;
|
||||
state(#keep_alive{state=State}) ->
|
||||
State.
|
||||
|
||||
activate(undefined) ->
|
||||
undefined;
|
||||
activate(KeepAlive) when is_record(KeepAlive, keep_alive) ->
|
||||
KeepAlive#keep_alive{state=active}.
|
||||
|
||||
reset(undefined) ->
|
||||
undefined;
|
||||
reset(KeepAlive=#keep_alive{period=Period, timer=Timer, msg=Msg}) ->
|
||||
catch erlang:cancel_timer(Timer),
|
||||
Ref = erlang:send_after(Period, self(), Msg),
|
||||
KeepAlive#keep_alive{state=idle, timer = Ref}.
|
||||
%%
|
||||
%% @doc try to resume keepalive, called when timeout.
|
||||
%%
|
||||
resume(KeepAlive = #keepalive { socket = Socket,
|
||||
recv_oct = RecvOct,
|
||||
timeout_sec = TimeoutSec,
|
||||
timeout_msg = TimeoutMsg,
|
||||
timer_ref = Ref }) ->
|
||||
{ok, [{recv_oct, NewRecvOct}]} = inet:getstate(Socket, [recv_oct]),
|
||||
if
|
||||
NewRecvOct =:= RecvOct ->
|
||||
timeout;
|
||||
true ->
|
||||
%need?
|
||||
cancel(Ref),
|
||||
NewRef = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
|
||||
{resumed, KeepAlive#keepalive { recv_oct = NewRecvOct, timer_ref = NewRef }}
|
||||
end.
|
||||
|
||||
%%
|
||||
%% @doc cancel keepalive
|
||||
%%
|
||||
cancel(#keepalive { timer_ref = Ref }) ->
|
||||
cancel(Ref);
|
||||
cancel(undefined) ->
|
||||
undefined;
|
||||
cancel(KeepAlive=#keep_alive{timer=Timer}) ->
|
||||
catch erlang:cancel_timer(Timer),
|
||||
KeepAlive#keep_alive{timer=undefined}.
|
||||
|
||||
cancel(Ref) ->
|
||||
catch erlang:cancel_timer(Ref).
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
|
||||
|
||||
-export([connection_string/2]).
|
||||
-export([peername/1, sockname/1, peer_string/1, connection_string/2]).
|
||||
|
||||
-include_lib("kernel/include/inet.hrl").
|
||||
|
||||
|
@ -196,6 +196,14 @@ setopts(Sock, Options) when is_port(Sock) ->
|
|||
|
||||
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
|
||||
|
||||
peer_string(Sock) ->
|
||||
case peername(Sock) of
|
||||
{ok, {Addr, Port}} ->
|
||||
{ok, lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port]))};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
peername(Sock) when is_port(Sock) -> inet:peername(Sock).
|
||||
|
||||
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
|
||||
|
|
|
@ -217,7 +217,7 @@ serialise_variable(#mqtt_packet_header { type = PubAck },
|
|||
<<>> = _Payload)
|
||||
when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC;
|
||||
PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP ->
|
||||
{PacketIdBin = <<PacketId:16/big>>, <<>>};
|
||||
{<<PacketId:16/big>>, <<>>};
|
||||
|
||||
serialise_variable(#mqtt_packet_header { },
|
||||
undefined,
|
||||
|
|
|
@ -103,7 +103,7 @@ handle_packet(?CONNECT, #mqtt_packet {
|
|||
password = Password,
|
||||
proto_ver = ProtoVersion,
|
||||
clean_sess = CleanSess,
|
||||
keep_alive = AlivePeriod,
|
||||
keep_alive = KeepAlive,
|
||||
client_id = ClientId } = Var },
|
||||
State0 = #proto_state{socket = Sock}) ->
|
||||
|
||||
|
@ -118,19 +118,18 @@ handle_packet(?CONNECT, #mqtt_packet {
|
|||
_ ->
|
||||
case emqtt_auth:check(Username, Password) of
|
||||
false ->
|
||||
lager:error_MSG("MQTT login failed - no credentials"),
|
||||
lager:error("MQTT login failed - no credentials"),
|
||||
{?CONNACK_CREDENTIALS, State};
|
||||
true ->
|
||||
lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
||||
%%TODO:
|
||||
%%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
||||
lager:info("connect from clientid: ~p, keepalive: ", [ClientId, KeepAlive]),
|
||||
start_keepalive(KeepAlive),
|
||||
emqtt_cm:register(ClientId, self()),
|
||||
{?CONNACK_ACCEPT,
|
||||
State #proto_state{ will_msg = make_will_msg(Var),
|
||||
client_id = ClientId }}
|
||||
end
|
||||
end,
|
||||
lager:info("recv conn...:~p", [ReturnCode]),
|
||||
lager:info("[SENT] MQTT CONNACK: ~p", [ReturnCode]),
|
||||
send_packet(Sock, #mqtt_packet {
|
||||
header = #mqtt_packet_header { type = ?CONNACK },
|
||||
variable = #mqtt_packet_connack{ return_code = ReturnCode }}),
|
||||
|
@ -358,3 +357,7 @@ send_will_msg(#proto_state{will_msg = undefined}) ->
|
|||
send_will_msg(#proto_state{will_msg = WillMsg }) ->
|
||||
emqtt_router:route(WillMsg).
|
||||
|
||||
start_keepalive(0) -> ignore;
|
||||
start_keepalive(Sec) when Sec > 0 ->
|
||||
self() ! {keepalive, start, Sec * 1.5}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue