From 64a34b216c5088e93ea102a5c729c538edf7c573 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 7 Nov 2015 20:46:31 +0800 Subject: [PATCH 1/3] format --- src/emqttd.erl | 1 - src/emqttd_acl_internal.erl | 1 - src/emqttd_alarm.erl | 1 - src/emqttd_bridge.erl | 1 - src/emqttd_bridge_sup.erl | 1 - src/emqttd_cli.erl | 1 - src/emqttd_ctl.erl | 1 - src/emqttd_dist.erl | 1 - src/emqttd_guid.erl | 1 - src/emqttd_http.erl | 1 - src/emqttd_keepalive.erl | 1 - src/emqttd_metrics.erl | 1 - src/emqttd_mnesia.erl | 1 - src/emqttd_mod_autosub.erl | 1 - src/emqttd_mod_presence.erl | 1 - src/emqttd_mod_rewrite.erl | 1 - src/emqttd_mod_sup.erl | 1 - src/emqttd_mqueue.erl | 1 - src/emqttd_opts.erl | 1 - src/emqttd_packet.erl | 1 - src/emqttd_plugins.erl | 1 - src/emqttd_pooler_sup.erl | 1 - src/emqttd_pubsub_sup.erl | 1 - src/emqttd_retained.erl | 4 ++-- src/emqttd_serialiser.erl | 1 - src/emqttd_sm_helper.erl | 1 - src/emqttd_sm_sup.erl | 1 - src/emqttd_stats.erl | 1 - src/emqttd_sup.erl | 1 - src/emqttd_sysmon.erl | 1 - src/emqttd_topic.erl | 1 - src/emqttd_trace.erl | 1 - src/emqttd_vm.erl | 41 ++++++++++++++++++------------------- 33 files changed, 22 insertions(+), 54 deletions(-) diff --git a/src/emqttd.erl b/src/emqttd.erl index eeb5bab4d..540a11fd1 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd). -author("Feng Lee "). diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index c2ce3184c..b6dc0b81b 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_acl_internal). -author("Feng Lee "). diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index fc3ce729e..4f176af8f 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_alarm). -author("Feng Lee "). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index cb6349803..12126dcd6 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_bridge). -author("Feng Lee "). diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index c780a6f6b..35003f152 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_bridge_sup). -author("Feng Lee "). diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 59441d516..340125c09 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_cli). -author("Feng Lee "). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 5d34bd449..bde8344b0 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_ctl). -author("Feng Lee "). diff --git a/src/emqttd_dist.erl b/src/emqttd_dist.erl index 25e977660..fc440bac9 100644 --- a/src/emqttd_dist.erl +++ b/src/emqttd_dist.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_dist). -import(lists, [concat/1]). diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index bf668a384..a97ba1f15 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -35,7 +35,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_guid). -author("Feng Lee "). diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 48724e1e0..5164f8adf 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_http). -author("Feng Lee "). diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index 6b9042d13..bce8b6b6f 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -23,7 +23,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_keepalive). -author("Feng Lee "). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 29dd15a3d..5f36aa042 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_metrics). -author("Feng Lee "). diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 3f94071ed..82fece19b 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mnesia). -author("Feng Lee "). diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index a1c8d1649..00dc3dc0c 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mod_autosub). -author("Feng Lee "). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index da3a05c00..81f7348d4 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mod_presence). -author("Feng Lee "). diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 09357211c..5ddf2b2ee 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mod_rewrite). -author("Feng Lee "). diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index a2b1a717f..8fa3ced59 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mod_sup). -author("Feng Lee "). diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 398db256a..59bb2883b 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -47,7 +47,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_mqueue). -author("Feng Lee "). diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index 938d002cc..dfd5b74b6 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_opts). -author("Feng Lee "). diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index 0bc5190a8..286289af3 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_packet). -author("Feng Lee "). diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index b02b1d427..4bd86592e 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_plugins). -author("Feng Lee "). diff --git a/src/emqttd_pooler_sup.erl b/src/emqttd_pooler_sup.erl index e503f7c1a..d5f3a0ee5 100644 --- a/src/emqttd_pooler_sup.erl +++ b/src/emqttd_pooler_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_pooler_sup). -author("Feng Lee "). diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index e1c5095cf..a0293c226 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_pubsub_sup). -author("Feng Lee "). diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index 685006200..bf5ad51b0 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -21,11 +21,11 @@ %%%----------------------------------------------------------------------------- %%% @doc %%% MQTT retained message storage. +%%% +%%% TODO: should match topic tree %%% %%% @end %%%----------------------------------------------------------------------------- - -%% TODO: should match topic tree -module(emqttd_retained). -author("Feng Lee "). diff --git a/src/emqttd_serialiser.erl b/src/emqttd_serialiser.erl index 85d3239b1..471904bce 100644 --- a/src/emqttd_serialiser.erl +++ b/src/emqttd_serialiser.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_serialiser). -author("Feng Lee "). diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 509419335..43a2da90b 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_sm_helper). -author("Feng Lee "). diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 2c49ef01e..d75d0f3bf 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_sm_sup). -author("Feng Lee "). diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index c7c7eaa5c..6f382259b 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_stats). -author("Feng Lee "). diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index ceeb1fc12..d6bd648fe 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_sup). -author("Feng Lee "). diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 8ba4ff569..31c7a0d3e 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_sysmon). -author("Feng Lee "). diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 8e2b22973..07ed4482e 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_topic). -author("Feng Lee "). diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 8edcebad0..2bd430b05 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_trace). -author("Feng Lee "). diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index eda247247..5d0619631 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -1,30 +1,29 @@ -%%------------------------------------------------------------------------------ +%%%----------------------------------------------------------------------------- %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- %%% @doc %%% emqttd erlang vm. %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_vm). -author('huangdan'). From 21a5f3ee333f05e43784d9ae6c6d188fb6e9da1f Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 7 Nov 2015 22:45:57 +0800 Subject: [PATCH 2/3] hibernate --- src/emqttd_client.erl | 25 +++++++----- src/emqttd_session.erl | 91 ++++++++++++++++++++++-------------------- 2 files changed, 61 insertions(+), 55 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index a684737a5..99e09422a 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -76,8 +76,8 @@ subscribe(CPid, TopicTable) -> unsubscribe(CPid, Topics) -> gen_server:cast(CPid, {unsubscribe, Topics}). -init([Connection0, MqttEnv]) -> - {ok, Connection} = Connection0:wait(), +init([OriginConn, MqttEnv]) -> + {ok, Connection} = OriginConn:wait(), {PeerHost, PeerPort, PeerName} = case Connection:peername() of {ok, Peer = {Host, Port}} -> @@ -125,7 +125,7 @@ handle_call(info, _From, State = #client_state{connection = Connection, ProtoInfo = emqttd_protocol:info(ProtoState), {ok, SockStats} = Connection:getstat(?SOCK_STATS), {reply, lists:append([ClientInfo, [{proto_info, ProtoInfo}, - {sock_stats, SockStats}]]), State}; + {sock_stats, SockStats}]]), State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; @@ -173,7 +173,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info(activate_sock, State) -> - noreply(run_socket(State#client_state{conn_state = running})); + hibernate(run_socket(State#client_state{conn_state = running})); handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = size(Data), @@ -185,7 +185,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); handle_info({inet_reply, _Sock, ok}, State) -> - noreply(State); + hibernate(State); handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); @@ -199,12 +199,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con end end, KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), - noreply(State#client_state{keepalive = KeepAlive}); + hibernate(State#client_state{keepalive = KeepAlive}); handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - noreply(State#client_state{keepalive = KeepAlive1}); + hibernate(State#client_state{keepalive = KeepAlive1}); {error, timeout} -> ?LOG(debug, "Keepalive timeout", [], State), shutdown(keepalive_timeout, State); @@ -240,21 +240,21 @@ code_change(_OldVsn, State, _Extra) -> with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - noreply(State#client_state{proto_state = ProtoState1}). + hibernate(State#client_state{proto_state = ProtoState1}). with_session(Fun, State = #client_state{proto_state = ProtoState}) -> Fun(emqttd_protocol:session(ProtoState)), - noreply(State). + hibernate(State). %% receive and parse tcp data received(<<>>, State) -> - noreply(State); + hibernate(State); received(Bytes, State = #client_state{parser_fun = ParserFun, packet_opts = PacketOpts, proto_state = ProtoState}) -> case catch ParserFun(Bytes) of - {more, NewParser} -> + {more, NewParser} -> noreply(run_socket(State#client_state{parser_fun = NewParser})); {ok, Packet, Rest} -> emqttd_metrics:received(Packet), @@ -300,6 +300,9 @@ run_socket(State = #client_state{connection = Connection}) -> State#client_state{await_recv = true}. noreply(State) -> + {noreply, State}. + +hibernate(State) -> {noreply, State, hibernate}. shutdown(Reason, State) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 98c6df3b8..7f1df1baa 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -289,26 +289,24 @@ prioritise_info(Msg, _Len, _State) -> end. handle_call(info, _From, State) -> - {reply, sess_info(State), State}; + {reply, sess_info(State), State, hibernate}; -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, - Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, + _From, Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of true -> TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " - "for too many awaiting_rel: ~p", [ClientId, Msg]), - {reply, {error, dropped}, Session} + ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session), + {reply, {error, dropped}, Session, hibernate} end; handle_call(Req, _From, State) -> ?LOG(critical, "Unexpected Request: ~p", [Req], State), - {reply, {error, unsupported_req}, State}. + {reply, {error, unsupported_req}, State, hibernate}. handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> @@ -318,7 +316,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli case TopicTable -- Subscriptions of [] -> AckFun([Qos || {_, Qos} <- TopicTable]), - noreply(Session); + hibernate(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), @@ -347,10 +345,10 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), - noreply(Session#session{subscriptions = Subscriptions1}) + hibernate(Session#session{subscriptions = Subscriptions1}) end; -handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, +handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), @@ -358,7 +356,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, %% unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), - lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]), + ?LOG(info, "unsubscribe ~p", [Topics], Session), Subscriptions1 = lists:foldl(fun(Topic, Acc) -> @@ -370,11 +368,11 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, end end, Subscriptions, Topics), - noreply(Session#session{subscriptions = Subscriptions1}); + hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> ?LOG(warning, "destroyed", [], Session), - {stop, {shutdown, destroy}, Session}; + shutdown(destroy, Session); handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, client_pid = OldClientPid, @@ -428,17 +426,17 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C end, Session1, lists:reverse(InflightQ)), %% Dequeue pending messages - noreply(dequeue(Session2)); + hibernate(dequeue(Session2)); %% PUBACK handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, TRef} -> cancel_timer(TRef), - noreply(dequeue(acked(PktId, Session))); + hibernate(dequeue(acked(PktId, Session))); error -> ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBREC @@ -451,10 +449,10 @@ handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck, TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), - noreply(dequeue(Session1)); + hibernate(dequeue(Session1)); error -> ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBREL @@ -463,10 +461,10 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), - noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); + hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; %% PUBCOMP @@ -474,27 +472,27 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), - noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); + hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); error -> ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_cast(Msg, State) -> ?LOG(critical, "Unexpected Msg: ~p", [Msg], State), - noreply(State). + hibernate(State). %% Queue messages when client is offline handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, message_queue = Q}) when is_record(Msg, mqtt_message) -> - noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); %% Dispatch qos0 message directly to client handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, - noreply(Session); + hibernate(Session); handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{message_queue = MsgQ}) @@ -504,13 +502,13 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, true -> noreply(deliver(Msg, Session)); false -> - noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end; handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> %% just remove awaiting - noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); + hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> @@ -518,39 +516,39 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = {ok, _TRef} -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> - noreply(redeliver(Msg, Session)); + hibernate(redeliver(Msg, Session)); false -> ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session), - noreply(dequeue(Session)) + hibernate(dequeue(Session)) end; error -> ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of {ok, {_Msg, _TRef}} -> - ?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), - noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); + ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), + hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) -> case maps:find(PktId, Awaiting) of {ok, _TRef} -> - ?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), - noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); + ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), + hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); error -> ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session), - noreply(Session) + hibernate(Session) end; handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), - noreply(start_collector(Session)); + hibernate(start_collector(Session)); handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> @@ -561,22 +559,21 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals expired_after = Expires}) -> ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session), TRef = timer(Expires, expired), - erlang:garbage_collect(), %%TODO: ??? - noreply(Session#session{client_pid = undefined, expired_timer = TRef}); + hibernate(Session#session{client_pid = undefined, expired_timer = TRef}); handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) -> ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", [ClientPid, Pid, Reason], Session), - noreply(Session); + hibernate(Session); handle_info(expired, Session) -> ?LOG(info, "expired, shutdown now.", [], Session), - {stop, {shutdown, expired}, Session}; + shutdown(expired, Session); handle_info(Info, Session) -> ?LOG(critical, "Unexpected info: ~p", [Info], Session), - {noreply, Session}. + hibernate(Session). terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> emqttd_sm:unregister_session(CleanSess, ClientId). @@ -693,8 +690,14 @@ cancel_timer(Ref) -> catch erlang:cancel_timer(Ref). noreply(State) -> + {noreply, State}. + +hibernate(State) -> {noreply, State, hibernate}. +shutdown(Reason, State) -> + {stop, {shutdown, Reason}, State}. + start_collector(Session = #session{collect_interval = 0}) -> Session; From e25856e9a1682fffbd6a4c6cd2cf33f8580fc4a6 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 7 Nov 2015 23:09:31 +0800 Subject: [PATCH 3/3] fix issue #374 --- src/emqttd_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 7f1df1baa..193b6b3fe 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -174,7 +174,7 @@ destroy(SessPid, ClientId) -> %%------------------------------------------------------------------------------ -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, TopicTable) -> - subscribe(SessPid, TopicTable, fun(_) -> ok end). + gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}). -spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, PacketId, TopicTable) ->