hibernate

This commit is contained in:
Feng 2015-11-07 22:45:57 +08:00
parent 64a34b216c
commit 21a5f3ee33
2 changed files with 61 additions and 55 deletions

View File

@ -76,8 +76,8 @@ subscribe(CPid, TopicTable) ->
unsubscribe(CPid, Topics) -> unsubscribe(CPid, Topics) ->
gen_server:cast(CPid, {unsubscribe, Topics}). gen_server:cast(CPid, {unsubscribe, Topics}).
init([Connection0, MqttEnv]) -> init([OriginConn, MqttEnv]) ->
{ok, Connection} = Connection0:wait(), {ok, Connection} = OriginConn:wait(),
{PeerHost, PeerPort, PeerName} = {PeerHost, PeerPort, PeerName} =
case Connection:peername() of case Connection:peername() of
{ok, Peer = {Host, Port}} -> {ok, Peer = {Host, Port}} ->
@ -125,7 +125,7 @@ handle_call(info, _From, State = #client_state{connection = Connection,
ProtoInfo = emqttd_protocol:info(ProtoState), ProtoInfo = emqttd_protocol:info(ProtoState),
{ok, SockStats} = Connection:getstat(?SOCK_STATS), {ok, SockStats} = Connection:getstat(?SOCK_STATS),
{reply, lists:append([ClientInfo, [{proto_info, ProtoInfo}, {reply, lists:append([ClientInfo, [{proto_info, ProtoInfo},
{sock_stats, SockStats}]]), State}; {sock_stats, SockStats}]]), State};
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
@ -173,7 +173,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
shutdown(conflict, State); shutdown(conflict, State);
handle_info(activate_sock, State) -> handle_info(activate_sock, State) ->
noreply(run_socket(State#client_state{conn_state = running})); hibernate(run_socket(State#client_state{conn_state = running}));
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
Size = size(Data), Size = size(Data),
@ -185,7 +185,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, ok}, State) ->
noreply(State); hibernate(State);
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
@ -199,12 +199,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
end end
end, end,
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
noreply(State#client_state{keepalive = KeepAlive}); hibernate(State#client_state{keepalive = KeepAlive});
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
noreply(State#client_state{keepalive = KeepAlive1}); hibernate(State#client_state{keepalive = KeepAlive1});
{error, timeout} -> {error, timeout} ->
?LOG(debug, "Keepalive timeout", [], State), ?LOG(debug, "Keepalive timeout", [], State),
shutdown(keepalive_timeout, State); shutdown(keepalive_timeout, State);
@ -240,21 +240,21 @@ code_change(_OldVsn, State, _Extra) ->
with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState), {ok, ProtoState1} = Fun(ProtoState),
noreply(State#client_state{proto_state = ProtoState1}). hibernate(State#client_state{proto_state = ProtoState1}).
with_session(Fun, State = #client_state{proto_state = ProtoState}) -> with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), Fun(emqttd_protocol:session(ProtoState)),
noreply(State). hibernate(State).
%% receive and parse tcp data %% receive and parse tcp data
received(<<>>, State) -> received(<<>>, State) ->
noreply(State); hibernate(State);
received(Bytes, State = #client_state{parser_fun = ParserFun, received(Bytes, State = #client_state{parser_fun = ParserFun,
packet_opts = PacketOpts, packet_opts = PacketOpts,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
case catch ParserFun(Bytes) of case catch ParserFun(Bytes) of
{more, NewParser} -> {more, NewParser} ->
noreply(run_socket(State#client_state{parser_fun = NewParser})); noreply(run_socket(State#client_state{parser_fun = NewParser}));
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
emqttd_metrics:received(Packet), emqttd_metrics:received(Packet),
@ -300,6 +300,9 @@ run_socket(State = #client_state{connection = Connection}) ->
State#client_state{await_recv = true}. State#client_state{await_recv = true}.
noreply(State) -> noreply(State) ->
{noreply, State}.
hibernate(State) ->
{noreply, State, hibernate}. {noreply, State, hibernate}.
shutdown(Reason, State) -> shutdown(Reason, State) ->

View File

@ -289,26 +289,24 @@ prioritise_info(Msg, _Len, _State) ->
end. end.
handle_call(info, _From, State) -> handle_call(info, _From, State) ->
{reply, sess_info(State), State}; {reply, sess_info(State), State, hibernate};
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
Session = #session{client_id = ClientId, _From, Session = #session{awaiting_rel = AwaitingRel,
awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of case check_awaiting_rel(Session) of
true -> true ->
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; {reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false -> false ->
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session),
"for too many awaiting_rel: ~p", [ClientId, Msg]), {reply, {error, dropped}, Session, hibernate}
{reply, {error, dropped}, Session}
end; end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(critical, "Unexpected Request: ~p", [Req], State), ?LOG(critical, "Unexpected Request: ~p", [Req], State),
{reply, {error, unsupported_req}, State}. {reply, {error, unsupported_req}, State, hibernate}.
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
@ -318,7 +316,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
case TopicTable -- Subscriptions of case TopicTable -- Subscriptions of
[] -> [] ->
AckFun([Qos || {_, Qos} <- TopicTable]), AckFun([Qos || {_, Qos} <- TopicTable]),
noreply(Session); hibernate(Session);
_ -> _ ->
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
@ -347,10 +345,10 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
[{Topic, Qos} | Acc] [{Topic, Qos} | Acc]
end end
end, Subscriptions, TopicTable), end, Subscriptions, TopicTable),
noreply(Session#session{subscriptions = Subscriptions1}) hibernate(Session#session{subscriptions = Subscriptions1})
end; end;
handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
@ -358,7 +356,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
%% unsubscribe from topic tree %% unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),
lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]), ?LOG(info, "unsubscribe ~p", [Topics], Session),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun(Topic, Acc) -> lists:foldl(fun(Topic, Acc) ->
@ -370,11 +368,11 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
noreply(Session#session{subscriptions = Subscriptions1}); hibernate(Session#session{subscriptions = Subscriptions1});
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
?LOG(warning, "destroyed", [], Session), ?LOG(warning, "destroyed", [], Session),
{stop, {shutdown, destroy}, Session}; shutdown(destroy, Session);
handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
@ -428,17 +426,17 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C
end, Session1, lists:reverse(InflightQ)), end, Session1, lists:reverse(InflightQ)),
%% Dequeue pending messages %% Dequeue pending messages
noreply(dequeue(Session2)); hibernate(dequeue(Session2));
%% PUBACK %% PUBACK
handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) -> handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) ->
case maps:find(PktId, AwaitingAck) of case maps:find(PktId, AwaitingAck) of
{ok, TRef} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
noreply(dequeue(acked(PktId, Session))); hibernate(dequeue(acked(PktId, Session)));
error -> error ->
?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session), ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
%% PUBREC %% PUBREC
@ -451,10 +449,10 @@ handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck,
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
noreply(dequeue(Session1)); hibernate(dequeue(Session1));
error -> error ->
?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session), ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
%% PUBREL %% PUBREL
@ -463,10 +461,10 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
cancel_timer(TRef), cancel_timer(TRef),
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(Msg),
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
%% PUBCOMP %% PUBCOMP
@ -474,27 +472,27 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp})
case maps:find(PktId, AwaitingComp) of case maps:find(PktId, AwaitingComp) of
{ok, TRef} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
error -> error ->
?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session), ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(critical, "Unexpected Msg: ~p", [Msg], State), ?LOG(critical, "Unexpected Msg: ~p", [Msg], State),
noreply(State). hibernate(State).
%% Queue messages when client is offline %% Queue messages when client is offline
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
message_queue = Q}) message_queue = Q})
when is_record(Msg, mqtt_message) -> when is_record(Msg, mqtt_message) ->
noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
%% Dispatch qos0 message directly to client %% Dispatch qos0 message directly to client
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
Session = #session{client_pid = ClientPid}) -> Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg}, ClientPid ! {deliver, Msg},
noreply(Session); hibernate(Session);
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
Session = #session{message_queue = MsgQ}) Session = #session{message_queue = MsgQ})
@ -504,13 +502,13 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
true -> true ->
noreply(deliver(Msg, Session)); noreply(deliver(Msg, Session));
false -> false ->
noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end; end;
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
%% just remove awaiting %% just remove awaiting
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
@ -518,39 +516,39 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue =
{ok, _TRef} -> {ok, _TRef} ->
case lists:keyfind(PktId, 1, InflightQ) of case lists:keyfind(PktId, 1, InflightQ) of
{_, Msg} -> {_, Msg} ->
noreply(redeliver(Msg, Session)); hibernate(redeliver(Msg, Session));
false -> false ->
?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session), ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session),
noreply(dequeue(Session)) hibernate(dequeue(Session))
end; end;
error -> error ->
?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session), ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
case maps:find(PktId, AwaitingRel) of case maps:find(PktId, AwaitingRel) of
{ok, {_Msg, _TRef}} -> {ok, {_Msg, _TRef}} ->
?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session), ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) -> handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) ->
case maps:find(PktId, Awaiting) of case maps:find(PktId, Awaiting) of
{ok, _TRef} -> {ok, _TRef} ->
?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
error -> error ->
?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session), ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session),
noreply(Session) hibernate(Session)
end; end;
handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
noreply(start_collector(Session)); hibernate(start_collector(Session));
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
@ -561,22 +559,21 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals
expired_after = Expires}) -> expired_after = Expires}) ->
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session), ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session),
TRef = timer(Expires, expired), TRef = timer(Expires, expired),
erlang:garbage_collect(), %%TODO: ??? hibernate(Session#session{client_pid = undefined, expired_timer = TRef});
noreply(Session#session{client_pid = undefined, expired_timer = TRef});
handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) -> handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) ->
?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
[ClientPid, Pid, Reason], Session), [ClientPid, Pid, Reason], Session),
noreply(Session); hibernate(Session);
handle_info(expired, Session) -> handle_info(expired, Session) ->
?LOG(info, "expired, shutdown now.", [], Session), ?LOG(info, "expired, shutdown now.", [], Session),
{stop, {shutdown, expired}, Session}; shutdown(expired, Session);
handle_info(Info, Session) -> handle_info(Info, Session) ->
?LOG(critical, "Unexpected info: ~p", [Info], Session), ?LOG(critical, "Unexpected info: ~p", [Info], Session),
{noreply, Session}. hibernate(Session).
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
emqttd_sm:unregister_session(CleanSess, ClientId). emqttd_sm:unregister_session(CleanSess, ClientId).
@ -693,8 +690,14 @@ cancel_timer(Ref) ->
catch erlang:cancel_timer(Ref). catch erlang:cancel_timer(Ref).
noreply(State) -> noreply(State) ->
{noreply, State}.
hibernate(State) ->
{noreply, State, hibernate}. {noreply, State, hibernate}.
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
start_collector(Session = #session{collect_interval = 0}) -> start_collector(Session = #session{collect_interval = 0}) ->
Session; Session;