From 21a5f3ee333f05e43784d9ae6c6d188fb6e9da1f Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 7 Nov 2015 22:45:57 +0800 Subject: [PATCH] hibernate --- src/emqttd_client.erl | 25 +++++++----- src/emqttd_session.erl | 91 ++++++++++++++++++++++-------------------- 2 files changed, 61 insertions(+), 55 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index a684737a5..99e09422a 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -76,8 +76,8 @@ subscribe(CPid, TopicTable) -> unsubscribe(CPid, Topics) -> gen_server:cast(CPid, {unsubscribe, Topics}). -init([Connection0, MqttEnv]) -> - {ok, Connection} = Connection0:wait(), +init([OriginConn, MqttEnv]) -> + {ok, Connection} = OriginConn:wait(), {PeerHost, PeerPort, PeerName} = case Connection:peername() of {ok, Peer = {Host, Port}} -> @@ -125,7 +125,7 @@ handle_call(info, _From, State = #client_state{connection = Connection, ProtoInfo = emqttd_protocol:info(ProtoState), {ok, SockStats} = Connection:getstat(?SOCK_STATS), {reply, lists:append([ClientInfo, [{proto_info, ProtoInfo}, - {sock_stats, SockStats}]]), State}; + {sock_stats, SockStats}]]), State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; @@ -173,7 +173,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info(activate_sock, State) -> - noreply(run_socket(State#client_state{conn_state = running})); + hibernate(run_socket(State#client_state{conn_state = running})); handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = size(Data), @@ -185,7 +185,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); handle_info({inet_reply, _Sock, ok}, State) -> - noreply(State); + hibernate(State); handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); @@ -199,12 +199,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con end end, KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), - noreply(State#client_state{keepalive = KeepAlive}); + hibernate(State#client_state{keepalive = KeepAlive}); handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - noreply(State#client_state{keepalive = KeepAlive1}); + hibernate(State#client_state{keepalive = KeepAlive1}); {error, timeout} -> ?LOG(debug, "Keepalive timeout", [], State), shutdown(keepalive_timeout, State); @@ -240,21 +240,21 @@ code_change(_OldVsn, State, _Extra) -> with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - noreply(State#client_state{proto_state = ProtoState1}). + hibernate(State#client_state{proto_state = ProtoState1}). with_session(Fun, State = #client_state{proto_state = ProtoState}) -> Fun(emqttd_protocol:session(ProtoState)), - noreply(State). + hibernate(State). %% receive and parse tcp data received(<<>>, State) -> - noreply(State); + hibernate(State); received(Bytes, State = #client_state{parser_fun = ParserFun, packet_opts = PacketOpts, proto_state = ProtoState}) -> case catch ParserFun(Bytes) of - {more, NewParser} -> + {more, NewParser} -> noreply(run_socket(State#client_state{parser_fun = NewParser})); {ok, Packet, Rest} -> emqttd_metrics:received(Packet), @@ -300,6 +300,9 @@ run_socket(State = #client_state{connection = Connection}) -> State#client_state{await_recv = true}. noreply(State) -> + {noreply, State}. + +hibernate(State) -> {noreply, State, hibernate}. shutdown(Reason, State) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 98c6df3b8..7f1df1baa 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -289,26 +289,24 @@ prioritise_info(Msg, _Len, _State) -> end. handle_call(info, _From, State) -> - {reply, sess_info(State), State}; + {reply, sess_info(State), State, hibernate}; -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, - Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, + _From, Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of true -> TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " - "for too many awaiting_rel: ~p", [ClientId, Msg]), - {reply, {error, dropped}, Session} + ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session), + {reply, {error, dropped}, Session, hibernate} end; handle_call(Req, _From, State) -> ?LOG(critical, "Unexpected Request: ~p", [Req], State), - {reply, {error, unsupported_req}, State}. + {reply, {error, unsupported_req}, State, hibernate}. handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> @@ -318,7 +316,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli case TopicTable -- Subscriptions of [] -> AckFun([Qos || {_, Qos} <- TopicTable]), - noreply(Session); + hibernate(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), @@ -347,10 +345,10 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), - noreply(Session#session{subscriptions = Subscriptions1}) + hibernate(Session#session{subscriptions = Subscriptions1}) end; -handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, +handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), @@ -358,7 +356,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, %% unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), - lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]), + ?LOG(info, "unsubscribe ~p", [Topics], Session), Subscriptions1 = lists:foldl(fun(Topic, Acc) -> @@ -370,11 +368,11 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, end end, Subscriptions, Topics), - noreply(Session#session{subscriptions = Subscriptions1}); + hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> ?LOG(warning, "destroyed", [], Session), - {stop, {shutdown, destroy}, Session}; + shutdown(destroy, Session); handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, client_pid = OldClientPid, @@ -428,17 +426,17 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C end, Session1, lists:reverse(InflightQ)), %% Dequeue pending messages - noreply(dequeue(Session2)); + hibernate(dequeue(Session2)); %% PUBACK handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, TRef} -> cancel_timer(TRef), - noreply(dequeue(acked(PktId, Session))); + hibernate(dequeue(acked(PktId, Session))); error -> ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBREC @@ -451,10 +449,10 @@ handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck, TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), - noreply(dequeue(Session1)); + hibernate(dequeue(Session1)); error -> ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBREL @@ -463,10 +461,10 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), - noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); + hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBCOMP @@ -474,27 +472,27 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), - noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); + hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); error -> ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_cast(Msg, State) -> ?LOG(critical, "Unexpected Msg: ~p", [Msg], State), - noreply(State). + hibernate(State). %% Queue messages when client is offline handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, message_queue = Q}) when is_record(Msg, mqtt_message) -> - noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); %% Dispatch qos0 message directly to client handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, - noreply(Session); + hibernate(Session); handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{message_queue = MsgQ}) @@ -504,13 +502,13 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, true -> noreply(deliver(Msg, Session)); false -> - noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end; handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> %% just remove awaiting - noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); + hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> @@ -518,39 +516,39 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = {ok, _TRef} -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> - noreply(redeliver(Msg, Session)); + hibernate(redeliver(Msg, Session)); false -> ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session), - noreply(dequeue(Session)) + hibernate(dequeue(Session)) end; error -> ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of {ok, {_Msg, _TRef}} -> - ?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), - noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); + ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), + hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) -> case maps:find(PktId, Awaiting) of {ok, _TRef} -> - ?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), - noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); + ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), + hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); error -> ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), - noreply(start_collector(Session)); + hibernate(start_collector(Session)); handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> @@ -561,22 +559,21 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals expired_after = Expires}) -> ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session), TRef = timer(Expires, expired), - erlang:garbage_collect(), %%TODO: ??? - noreply(Session#session{client_pid = undefined, expired_timer = TRef}); + hibernate(Session#session{client_pid = undefined, expired_timer = TRef}); handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) -> ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", [ClientPid, Pid, Reason], Session), - noreply(Session); + hibernate(Session); handle_info(expired, Session) -> ?LOG(info, "expired, shutdown now.", [], Session), - {stop, {shutdown, expired}, Session}; + shutdown(expired, Session); handle_info(Info, Session) -> ?LOG(critical, "Unexpected info: ~p", [Info], Session), - {noreply, Session}. + hibernate(Session). terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> emqttd_sm:unregister_session(CleanSess, ClientId). @@ -693,8 +690,14 @@ cancel_timer(Ref) -> catch erlang:cancel_timer(Ref). noreply(State) -> + {noreply, State}. + +hibernate(State) -> {noreply, State, hibernate}. +shutdown(Reason, State) -> + {stop, {shutdown, Reason}, State}. + start_collector(Session = #session{collect_interval = 0}) -> Session;