From 3cdf2377c8d69374770c42870a1a466d5f158a81 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 21 Feb 2017 18:45:40 +0800 Subject: [PATCH] Use gen_server2 to reduce the CPU/memory usage --- src/emqttd_client.erl | 95 +++++++++++++------------ src/emqttd_misc.erl | 4 +- src/emqttd_protocol.erl | 2 + src/emqttd_session.erl | 150 ++++++++++++++++++--------------------- src/emqttd_ws_client.erl | 86 ++++++++++++---------- 5 files changed, 174 insertions(+), 163 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 10c6f6edc..b48e12f39 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -18,7 +18,7 @@ -module(emqttd_client). --behaviour(gen_server). +-behaviour(gen_server2). -author("Feng Lee "). @@ -48,11 +48,13 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). +%% gen_server2 Callbacks +-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). + %% Client State -record(client_state, {connection, connname, peername, peerhost, peerport, await_recv, conn_state, rate_limit, parser_fun, - proto_state, packet_opts, keepalive, enable_stats, - stats_timer}). + proto_state, packet_opts, keepalive, enable_stats}). -define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]). @@ -65,19 +67,19 @@ start_link(Conn, Env) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. info(CPid) -> - gen_server:call(CPid, info). + gen_server2:call(CPid, info). stats(CPid) -> - gen_server:call(CPid, stats). + gen_server2:call(CPid, stats). kick(CPid) -> - gen_server:call(CPid, kick). + gen_server2:call(CPid, kick). set_rate_limit(Cpid, Rl) -> - gen_server:call(Cpid, {set_rate_limit, Rl}). + gen_server2:call(Cpid, {set_rate_limit, Rl}). get_rate_limit(Cpid) -> - gen_server:call(Cpid, get_rate_limit). + gen_server2:call(Cpid, get_rate_limit). subscribe(CPid, TopicTable) -> CPid ! {subscribe, TopicTable}. @@ -135,30 +137,41 @@ init([Conn0, Env]) -> packet_opts = Env, enable_stats = EnableStats}), IdleTimout = get_value(client_idle_timeout, Env, 30000), - gen_server:enter_loop(?MODULE, [], maybe_enable_stats(State), IdleTimout). + gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, + {backoff, 1000, 1000, 5000}). + +prioritise_call(Msg, _From, _Len, _State) -> + case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. + +prioritise_info(Msg, _Len, _State) -> + case Msg of {redeliver, _} -> 5; _ -> 0 end. + +handle_pre_hibernate(State = #client_state{connname = Connname}) -> + io:format("Client(~s) will hibernate!~n", [Connname]), + {hibernate, emit_stats(State)}. handle_call(info, From, State = #client_state{proto_state = ProtoState}) -> ProtoInfo = emqttd_protocol:info(ProtoState), ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS), - {reply, Stats, _} = handle_call(stats, From, State), - {reply, lists:append([ClientInfo, ProtoInfo, Stats]), State}; + {reply, Stats, _, _} = handle_call(stats, From, State), + reply(lists:append([ClientInfo, ProtoInfo, Stats]), State); handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) -> - {reply, lists:append([emqttd_misc:proc_stats(), - emqttd_protocol:stats(ProtoState), - sock_stats(State)]), State}; + reply(lists:append([emqttd_misc:proc_stats(), + emqttd_protocol:stats(ProtoState), + sock_stats(State)]), State); handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call({set_rate_limit, Rl}, _From, State) -> - {reply, ok, State#client_state{rate_limit = Rl}}; + reply(ok, State#client_state{rate_limit = Rl}); handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) -> - {reply, Rl, State}; + reply(Rl, State); handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> - {reply, emqttd_protocol:session(ProtoState), State}; + reply(emqttd_protocol:session(ProtoState), State); handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -198,12 +211,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) -> emqttd_protocol:pubrel(PacketId, ProtoState) end, State); +handle_info(emit_stats, State) -> + {noreply, emit_stats(State), hibernate}; + handle_info(timeout, State) -> shutdown(idle_timeout, State); -handle_info({timeout, _Timer, emit_stats}, State) -> - hibernate(maybe_enable_stats(emit_stats(State))); - %% Fix issue #535 handle_info({shutdown, Error}, State) -> shutdown(Error, State); @@ -213,7 +226,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info(activate_sock, State) -> - hibernate(run_socket(State#client_state{conn_state = running})); + {noreply, run_socket(State#client_state{conn_state = running}), hibernate}; handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), @@ -239,12 +252,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con end end, KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), - {noreply, stats_by_keepalive(State#client_state{keepalive = KeepAlive})}; + {noreply, State#client_state{keepalive = KeepAlive}, hibernate}; handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - hibernate(emit_stats(State#client_state{keepalive = KeepAlive1})); + {noreply, State#client_state{keepalive = KeepAlive1}, hibernate}; {error, timeout} -> ?LOG(debug, "Keepalive timeout", [], State), shutdown(keepalive_timeout, State); @@ -279,7 +292,7 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse tcp data received(<<>>, State) -> - hibernate(State); + {noreply, State, hibernate}; received(Bytes, State = #client_state{parser_fun = ParserFun, packet_opts = PacketOpts, @@ -332,33 +345,25 @@ run_socket(State = #client_state{connection = Conn}) -> with_proto(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}. + {noreply, State#client_state{proto_state = ProtoState1}, hibernate}. -maybe_enable_stats(State = #client_state{enable_stats = false}) -> - State; -maybe_enable_stats(State = #client_state{enable_stats = keepalive}) -> - State; -maybe_enable_stats(State = #client_state{enable_stats = Interval}) -> - State#client_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}. - -stats_by_keepalive(State) -> - State#client_state{enable_stats = keepalive}. - -emit_stats(State = #client_state{enable_stats = false}) -> - State; emit_stats(State = #client_state{proto_state = ProtoState}) -> - {reply, Stats, _} = handle_call(stats, undefined, State), - emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats), + emit_stats(emqttd_protocol:clientid(ProtoState), State). + +emit_stats(_ClientId, State = #client_state{enable_stats = false}) -> + State; +emit_stats(undefined, State) -> + State; +emit_stats(ClientId, State) -> + {reply, Stats, _, _} = handle_call(stats, undefined, State), + emqttd_stats:set_client_stats(ClientId, Stats), State. sock_stats(#client_state{connection = Conn}) -> - case Conn:getstat(?SOCK_STATS) of - {ok, Ss} -> Ss; - {error, _} -> [] - end. + case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end. -hibernate(State) -> - {noreply, State, hibernate}. +reply(Reply, State) -> + {reply, Reply, State, hibernate}. shutdown(Reason, State) -> stop({shutdown, Reason}, State). diff --git a/src/emqttd_misc.erl b/src/emqttd_misc.erl index 8111264cc..69cba9d8f 100644 --- a/src/emqttd_misc.erl +++ b/src/emqttd_misc.erl @@ -19,7 +19,7 @@ -author("Feng Lee "). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_stats/0, proc_stats/1]). + proc_stats/0, proc_stats/1, inc_stats/1]). %% @doc Merge Options merge_opts(Defaults, Options) -> @@ -61,3 +61,5 @@ proc_stats(Pid) -> {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), [{mailbox_len, V} | Stats1]. +inc_stats(Key) -> put(Key, get(Key) + 1). + diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index a9cc37778..da235a44e 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -187,6 +187,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> emqttd_cm:reg(client(State2)), %% Start keepalive start_keepalive(KeepAlive), + %% Emit Stats + self() ! emit_stats, %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index eb3756460..26d7c7a83 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -74,7 +74,8 @@ terminate/2, code_change/3]). %% gen_server2 Message Priorities --export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, + handle_pre_hibernate/1]). -record(state, { @@ -114,7 +115,7 @@ max_inflight = 32 :: non_neg_integer(), %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: pos_integer(), + retry_interval = 20000 :: timeout(), %% Retry Timer retry_timer :: reference(), @@ -129,7 +130,7 @@ awaiting_rel :: map(), %% Awaiting PUBREL timeout - await_rel_timeout = 20000 :: pos_integer(), + await_rel_timeout = 20000 :: timeout(), %% Max Packets that Awaiting PUBREL max_awaiting_rel = 100 :: non_neg_integer(), @@ -138,16 +139,13 @@ await_rel_timer :: reference(), %% Session Expiry Interval - expiry_interval = 7200000 :: pos_integer(), + expiry_interval = 7200000 :: timeout(), %% Expired Timer expiry_timer :: reference(), %% Enable Stats - enable_stats :: false | pos_integer(), - - %% Stats Timer - stats_timer :: reference(), + enable_stats :: boolean(), created_at :: erlang:timestamp() }). @@ -301,10 +299,9 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, created_at = os:timestamp()}, - emqttd_stats:set_session_stats(ClientId, stats(State)), emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_hooks:run('session.created', [ClientId, Username]), - {ok, State, hibernate, {backoff, 1000, 1000, 5000}, ?MODULE}. + {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}, ?MODULE}. init_stats(Keys) -> lists:foreach(fun(K) -> put(K, 0) end, Keys). @@ -336,10 +333,13 @@ prioritise_info(Msg, _Len, _State) -> _ -> 0 end. -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, - _From, State = #state{awaiting_rel = AwaitingRel, - await_rel_timer = Timer, - await_rel_timeout = Timeout}) -> +handle_pre_hibernate(State) -> + {hibernate, emit_stats(State)}. + +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From, + State = #state{awaiting_rel = AwaitingRel, + await_rel_timer = Timer, + await_rel_timeout = Timeout}) -> case is_awaiting_full(State) of false -> State1 = case Timer == undefined of @@ -413,51 +413,55 @@ handle_cast({unsubscribe, _From, TopicTable}, %% PUBACK: handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> - case Inflight:contain(PacketId) of - true -> - noreply(dequeue(acked(puback, PacketId, State))); - false -> - ?LOG(warning, "The PUBACK ~p is not inflight: ~p", - [PacketId, Inflight:window()], State), - emqttd_metrics:inc('packets/puback/missed'), - noreply(State) - end; + {noreply, + case Inflight:contain(PacketId) of + true -> + dequeue(acked(puback, PacketId, State)); + false -> + ?LOG(warning, "PUBACK ~p missed inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/puback/missed'), + State + end, hibernate}; %% PUBREC: handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) -> - case Inflight:contain(PacketId) of - true -> - noreply(acked(pubrec, PacketId, State)); - false -> - ?LOG(warning, "The PUBREC ~p is not inflight: ~p", - [PacketId, Inflight:window()], State), - emqttd_metrics:inc('packets/pubrec/missed'), - noreply(State) - end; + {noreply, + case Inflight:contain(PacketId) of + true -> + acked(pubrec, PacketId, State); + false -> + ?LOG(warning, "PUBREC ~p missed inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/pubrec/missed'), + State + end, hibernate}; %% PUBREL: handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> - case maps:take(PacketId, AwaitingRel) of - {Msg, AwaitingRel1} -> - spawn(emqttd_server, publish, [Msg]),%%:) - noreply(State#state{awaiting_rel = AwaitingRel1}); - error -> - ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), - emqttd_metrics:inc('packets/pubrel/missed'), - noreply(State) - end; + {noreply, + case maps:take(PacketId, AwaitingRel) of + {Msg, AwaitingRel1} -> + spawn(emqttd_server, publish, [Msg]), %%:) + State#state{awaiting_rel = AwaitingRel1}; + error -> + ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), + emqttd_metrics:inc('packets/pubrel/missed'), + State + end, hibernate}; %% PUBCOMP: handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> - case Inflight:contain(PacketId) of - true -> - noreply(dequeue(acked(pubcomp, PacketId, State))); - false -> - ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", - [PacketId, Inflight:window()], State), - emqttd_metrics:inc('packets/pubcomp/missed'), - noreply(State) - end; + {noreply, + case Inflight:contain(PacketId) of + true -> + dequeue(acked(pubcomp, PacketId, State)); + false -> + ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", + [PacketId, Inflight:window()], State), + emqttd_metrics:inc('packets/pubcomp/missed'), + State + end, hibernate}; %% RESUME: handle_cast({resume, ClientId, ClientPid}, @@ -466,14 +470,13 @@ handle_cast({resume, ClientId, ClientPid}, clean_sess = CleanSess, retry_timer = RetryTimer, await_rel_timer = AwaitTimer, - stats_timer = StatsTimer, expiry_timer = ExpireTimer}) -> ?LOG(info, "Resumed by ~p", [ClientPid], State), %% Cancel Timers lists:foreach(fun emqttd_misc:cancel_timer/1, - [RetryTimer, AwaitTimer, StatsTimer, ExpireTimer]), + [RetryTimer, AwaitTimer, ExpireTimer]), case kick(ClientId, OldClientPid, ClientPid) of ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], State); @@ -503,13 +506,13 @@ handle_cast({resume, ClientId, ClientPid}, %% Replay delivery and Dequeue pending messages hibernate(emit_stats(dequeue(retry_delivery(true, State1)))); -handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, - client_pid = undefined}) -> +handle_cast({destroy, ClientId}, + State = #state{client_id = ClientId, client_pid = undefined}) -> ?LOG(warning, "Destroyed", [], State), shutdown(destroy, State); -handle_cast({destroy, ClientId}, State = #state{client_id = ClientId, - client_pid = OldClientPid}) -> +handle_cast({destroy, ClientId}, + State = #state{client_id = ClientId, client_pid = OldClientPid}) -> ?LOG(warning, "kickout ~p", [OldClientPid], State), shutdown(conflict, State); @@ -518,27 +521,25 @@ handle_cast(Msg, State) -> %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - noreply(dispatch(tune_qos(Topic, Msg, State), State)); + {noreply, dispatch(tune_qos(Topic, Msg, State), State), hibernate}; %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> hibernate(emit_stats(State#state{retry_timer = undefined})); handle_info({timeout, _Timer, retry_delivery}, State) -> - noreply(emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))); + hibernate(emit_stats(retry_delivery(false, State#state{retry_timer = undefined}))); handle_info({timeout, _Timer, check_awaiting_rel}, State) -> - noreply(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); - -handle_info({timeout, _Timer, emit_stats}, State) -> - hibernate(maybe_enable_stats(emit_stats(State))); + hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); handle_info({timeout, _Timer, expired}, State) -> ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, - State = #state{clean_sess = true, client_pid = ClientPid}) -> + State = #state{clean_sess = true, + client_pid = ClientPid}) -> {stop, normal, State}; handle_info({'EXIT', ClientPid, Reason}, @@ -548,7 +549,7 @@ handle_info({'EXIT', ClientPid, Reason}, ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, - hibernate(maybe_enable_stats(emit_stats(State1))); + hibernate(emit_stats(State1)); handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> %%ignore @@ -690,7 +691,8 @@ dispatch(Msg = #mqtt_message{qos = QoS}, end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc(enqueue_msg), State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. + emqttd_misc:inc_stats(enqueue_msg), + State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. %%-------------------------------------------------------------------- %% Deliver @@ -700,7 +702,8 @@ redeliver(Msg = #mqtt_message{qos = QoS}, State) -> deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State). deliver(Msg, #state{client_pid = Pid}) -> - inc(deliver_msg), Pid ! {deliver, Msg}. + emqttd_misc:inc_stats(deliver_msg), + Pid ! {deliver, Msg}. %%-------------------------------------------------------------------- %% Awaiting ACK for QoS1/QoS2 Messages @@ -785,14 +788,6 @@ next_msg_id(State = #state{next_msg_id = Id}) -> %% Emit session stats %%-------------------------------------------------------------------- -maybe_enable_stats(State = #state{enable_stats = false}) -> - State; -maybe_enable_stats(State = #state{client_pid = Pid}) when is_pid(Pid) -> - State; -maybe_enable_stats(State = #state{enable_stats = Interval}) -> - StatsTimer = start_timer(Interval, emit_stats), - State#state{stats_timer = StatsTimer}. - emit_stats(State = #state{enable_stats = false}) -> State; emit_stats(State = #state{client_id = ClientId}) -> @@ -803,13 +798,8 @@ emit_stats(State = #state{client_id = ClientId}) -> %% Helper functions %%-------------------------------------------------------------------- -inc(Key) -> put(Key, get(Key) + 1). - reply(Reply, State) -> - {reply, Reply, State}. - -noreply(State) -> - {noreply, State}. + {reply, Reply, State, hibernate}. hibernate(State) -> {noreply, State, hibernate}. diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 3641a98a8..e2d605bf2 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -16,7 +16,7 @@ -module(emqttd_ws_client). --behaviour(gen_server). +-behaviour(gen_server2). -author("Feng Lee "). @@ -40,9 +40,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 Callbacks +-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]). + %% WebSocket Client State -record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive, - enable_stats, stats_timer}). + enable_stats}). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -54,13 +57,13 @@ start_link(Env, WsPid, Req, ReplyChannel) -> gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []). info(CPid) -> - gen_server:call(CPid, info). + gen_server2:call(CPid, info). stats(CPid) -> - gen_server:call(CPid, stats). + gen_server2:call(CPid, stats). kick(CPid) -> - gen_server:call(CPid, kick). + gen_server2:call(CPid, kick). subscribe(CPid, TopicTable) -> CPid ! {subscribe, TopicTable}. @@ -69,7 +72,7 @@ unsubscribe(CPid, Topics) -> CPid ! {unsubscribe, Topics}. session(CPid) -> - gen_server:call(CPid, session). + gen_server2:call(CPid, session). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -90,28 +93,39 @@ init([Env, WsPid, Req, ReplyChannel]) -> EnableStats = proplists:get_value(client_enable_stats, Env, false), ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, Headers} | Env]), - {ok, maybe_enable_stats(#wsclient_state{ws_pid = WsPid, - peer = Req:get(peer), - connection = Req:get(connection), - proto_state = ProtoState, - enable_stats = EnableStats}), - proplists:get_value(client_idle_timeout, Env, 30000)}. + IdleTimeout = proplists:get_value(client_idle_timeout, Env, 30000), + {ok, #wsclient_state{ws_pid = WsPid, + peer = Req:get(peer), + connection = Req:get(connection), + proto_state = ProtoState, + enable_stats = EnableStats}, + IdleTimeout, {backoff, 1000, 1000, 5000}, ?MODULE}. + +prioritise_call(Msg, _From, _Len, _State) -> + case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. + +prioritise_info(Msg, _Len, _State) -> + case Msg of {redeliver, _} -> 5; _ -> 0 end. + +handle_pre_hibernate(State = #wsclient_state{peer = Peer}) -> + io:format("WsClient(~s) will hibernate!~n", [Peer]), + {hibernate, emit_stats(State)}. handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)], - {reply, Stats, _} = handle_call(stats, From, State), - {reply, lists:append(Info, Stats), State}; + {reply, Stats, _, _} = handle_call(stats, From, State), + reply(lists:append(Info, Stats), State); handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) -> - {reply, lists:append([emqttd_misc:proc_stats(), - wsock_stats(State), - emqttd_protocol:stats(ProtoState)]), State}; + reply(lists:append([emqttd_misc:proc_stats(), + wsock_stats(State), + emqttd_protocol:stats(ProtoState)]), State); handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> - {reply, emqttd_protocol:session(ProtoState), State}; + reply(emqttd_protocol:session(ProtoState), State); handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> ?WSLOG(error, Peer, "Unexpected request: ~p", [Req]), @@ -166,8 +180,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) -> emqttd_protocol:pubrel(PacketId, ProtoState) end, State); -handle_info({timeout, _Timer, emit_stats}, State) -> - {noreply, maybe_enable_stats(emit_stats(State)), hibernate}; +handle_info(emit_stats, State) -> + {noreply, emit_stats(State), hibernate}; handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -185,7 +199,7 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, c end end, KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), - {noreply, stats_by_keepalive(State#wsclient_state{keepalive = KeepAlive})}; + {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, keepalive = KeepAlive}) -> @@ -209,7 +223,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid handle_info(Info, State = #wsclient_state{peer = Peer}) -> ?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]), - {noreply, State}. + {noreply, State, hibernate}. terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> emqttd_keepalive:cancel(KeepAlive), @@ -227,21 +241,16 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -maybe_enable_stats(State = #wsclient_state{enable_stats = false}) -> - State; -maybe_enable_stats(State = #wsclient_state{enable_stats = keepalive}) -> - State; -maybe_enable_stats(State = #wsclient_state{enable_stats = Interval}) -> - State#wsclient_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}. - -stats_by_keepalive(State) -> - State#wsclient_state{enable_stats = keepalive}. - -emit_stats(State = #wsclient_state{enable_stats = false}) -> - State; emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> - {reply, Stats, _} = handle_call(stats, undefined, State), - emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats), + emit_stats(emqttd_protocol:clientid(ProtoState), State). + +emit_stats(_ClientId, State = #wsclient_state{enable_stats = false}) -> + State; +emit_stats(undefined, State) -> + State; +emit_stats(ClientId, State) -> + {reply, Stats, _, _} = handle_call(stats, undefined, State), + emqttd_stats:set_client_stats(ClientId, Stats), State. wsock_stats(#wsclient_state{connection = Conn}) -> @@ -252,7 +261,10 @@ wsock_stats(#wsclient_state{connection = Conn}) -> with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - {noreply, State#wsclient_state{proto_state = ProtoState1}}. + {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. shutdown(Reason, State) -> stop({shutdown, Reason}, State).