From 7d65ad42add1ba667b10b5aa6dc7bfe22fdb5081 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Feb 2017 16:56:16 +0800 Subject: [PATCH] Add '[{fullsweep_after, 10}]' opts and 'force_gc_count' to tune the memory usage --- src/emqttd_client.erl | 32 ++++++++++++++++++-------------- src/emqttd_session.erl | 20 +++++++++++++++----- src/emqttd_ws_client.erl | 28 +++++++++++++++++----------- 3 files changed, 50 insertions(+), 30 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index af36418d9..3cc7b341d 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -55,7 +55,7 @@ %% Unused fields: connname, peerhost, peerport -record(client_state, {connection, peername, conn_state, await_recv, rate_limit, packet_size, parser, proto_state, - keepalive, enable_stats}). + keepalive, enable_stats, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). @@ -66,7 +66,7 @@ [esockd_net:format(State#client_state.peername) | Args])). start_link(Conn, Env) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. + {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]], [{fullsweep_after, 10}])}. info(CPid) -> gen_server2:call(CPid, info). @@ -114,15 +114,17 @@ do_init(Conn, Env, Peername) -> Parser = emqttd_parser:initial_state(PacketSize), ProtoState = emqttd_protocol:init(Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), - State = run_socket(#client_state{connection = Conn, - peername = Peername, - await_recv = false, - conn_state = running, - rate_limit = RateLimit, - packet_size = PacketSize, - parser = Parser, - proto_state = ProtoState, - enable_stats = EnableStats}), + ForceGcCount = emqttd_gc:conn_max_gc_count(), + State = run_socket(#client_state{connection = Conn, + peername = Peername, + await_recv = false, + conn_state = running, + rate_limit = RateLimit, + packet_size = PacketSize, + parser = Parser, + proto_state = ProtoState, + enable_stats = EnableStats, + force_gc_count = ForceGcCount}), IdleTimout = get_value(client_idle_timeout, Env, 30000), gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, {backoff, 1000, 1000, 10000}). @@ -147,7 +149,7 @@ prioritise_info(Msg, _Len, _State) -> case Msg of {redeliver, _} -> 5; _ -> 0 end. handle_pre_hibernate(State) -> - {hibernate, emit_stats(State)}. + {hibernate, emit_stats(emqttd_gc:reset_conn_gc_count(State))}. handle_call(info, From, State = #client_state{proto_state = ProtoState}) -> ProtoInfo = emqttd_protocol:info(ProtoState), @@ -237,7 +239,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); handle_info({inet_reply, _Sock, ok}, State) -> - {noreply, State, hibernate}; + {noreply, gc(State), hibernate}; %% Tune GC handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); @@ -291,7 +293,7 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse tcp data received(<<>>, State) -> - {noreply, State, hibernate}; + {noreply, gc(State), hibernate}; received(Bytes, State = #client_state{parser = Parser, packet_size = PacketSize, @@ -370,3 +372,5 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. +gc(State) -> + emqttd_gc:maybe_force_gc(#client_state.force_gc_count, State). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 06f988bb7..1c575b3ed 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -147,6 +147,9 @@ %% Enable Stats enable_stats :: boolean(), + %% Force GC Count + force_gc_count :: undefined | integer(), + created_at :: erlang:timestamp() }). @@ -157,7 +160,8 @@ -define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid, next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, - await_rel_timeout, expiry_interval, enable_stats, created_at]). + await_rel_timeout, expiry_interval, enable_stats, force_gc_count, + created_at]). -define(LOG(Level, Format, Args, State), lager:Level([{client, State#state.client_id}], @@ -166,7 +170,8 @@ %% @doc Start a Session -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}). start_link(CleanSess, {ClientId, Username}, ClientPid) -> - gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). + gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], + [{fullsweep_after, 10}]). %% Tune GC. %%-------------------------------------------------------------------- %% PubSub API @@ -280,6 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> {ok, QEnv} = emqttd:env(queue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), + ForceGcCount = emqttd_gc:conn_max_gc_count(), MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, binding = binding(ClientPid), @@ -298,6 +304,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> max_awaiting_rel = get_value(max_awaiting_rel, Env), expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, + force_gc_count = ForceGcCount, created_at = os:timestamp()}, emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_hooks:run('session.created', [ClientId, Username]), @@ -334,7 +341,7 @@ prioritise_info(Msg, _Len, _State) -> end. handle_pre_hibernate(State) -> - {hibernate, emit_stats(State)}. + {hibernate, emit_stats(emqttd_gc:reset_conn_gc_count(State))}. handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PacketId}}, _From, State = #state{awaiting_rel = AwaitingRel, @@ -443,7 +450,7 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> case maps:take(PacketId, AwaitingRel) of {Msg, AwaitingRel1} -> spawn(emqttd_server, publish, [Msg]), %%:) - State#state{awaiting_rel = AwaitingRel1}; + gc(State#state{awaiting_rel = AwaitingRel1}); error -> ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), emqttd_metrics:inc('packets/pubrel/missed'), @@ -521,7 +528,7 @@ handle_cast(Msg, State) -> %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - {noreply, dispatch(tune_qos(Topic, Msg, State), State), hibernate}; + {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}; %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> @@ -808,3 +815,6 @@ hibernate(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. +gc(State) -> + emqttd_gc:maybe_force_gc(#state.force_gc_count, State). + diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index b4c9d4fe9..4faa44624 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -49,7 +49,7 @@ %% WebSocket Client State -record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive, - enable_stats}). + enable_stats, force_gc_count}). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -59,7 +59,8 @@ %% @doc Start WebSocket Client. start_link(Env, WsPid, Req, ReplyChannel) -> - gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []). + gen_server2:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], + [{fullsweep_after, 10}]). %% Tune GC. info(CPid) -> gen_server2:call(CPid, info). @@ -93,11 +94,13 @@ init([Env, WsPid, Req, ReplyChannel]) -> [{ws_initial_headers, Headers} | Env]), IdleTimeout = get_value(client_idle_timeout, Env, 30000), EnableStats = get_value(client_enable_stats, Env, false), - {ok, #wsclient_state{ws_pid = WsPid, - peername = Peername, - connection = Req:get(connection), - proto_state = ProtoState, - enable_stats = EnableStats}, + ForceGcCount = emqttd_gc:conn_max_gc_count(), + {ok, #wsclient_state{ws_pid = WsPid, + peername = Peername, + connection = Req:get(connection), + proto_state = ProtoState, + enable_stats = EnableStats, + force_gc_count = ForceGcCount}, IdleTimeout, {backoff, 1000, 1000, 10000}, ?MODULE}. prioritise_call(Msg, _From, _Len, _State) -> @@ -108,7 +111,7 @@ prioritise_info(Msg, _Len, _State) -> handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) -> erlang:garbage_collect(WsPid),%%TODO: [{async, RequestId}]?? - {hibernate, emit_stats(State)}. + {hibernate, emqttd_gc:reset_conn_gc_count(emit_stats(State))}. handle_call(info, From, State = #wsclient_state{peername = Peername, proto_state = ProtoState}) -> @@ -135,7 +138,7 @@ handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}; + {noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate}; {error, Error} -> ?WSLOG(error, "Protocol error - ~p", [Error], State), shutdown(Error, State); @@ -172,7 +175,7 @@ handle_info({deliver, Message}, State) -> with_proto( fun(ProtoState) -> emqttd_protocol:send(Message, ProtoState) - end, State); + end, gc(State)); handle_info({redeliver, {?PUBREL, PacketId}}, State) -> with_proto( @@ -277,6 +280,9 @@ reply(Reply, State) -> shutdown(Reason, State) -> stop({shutdown, Reason}, State). -stop(Reason, State ) -> +stop(Reason, State) -> {stop, Reason, State}. +gc(State) -> + emqttd_gc:maybe_force_gc(#wsclient_state.force_gc_count, State). +