diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2cf647beb..ddb7e6a85 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -40,10 +40,10 @@ active_n, proto_state, parser_state, + gc_state, keepalive, enable_stats, stats_timer, - incoming, rate_limit, pub_limit, limit_timer, @@ -138,6 +138,8 @@ init([Transport, RawSocket, Options]) -> peercert => Peercert, sendfun => SendFun}, Options), ParserState = emqx_protocol:parser(ProtoState), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + GcState = emqx_gc:init(GcPolicy), State = run_socket(#state{transport = Transport, socket = Socket, peername = Peername, @@ -147,11 +149,10 @@ init([Transport, RawSocket, Options]) -> pub_limit = PubLimit, proto_state = ProtoState, parser_state = ParserState, + gc_state = GcState, enable_stats = EnableStats, idle_timeout = IdleTimout }), - GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), - ok = emqx_gc:init(GcPolicy), ok = emqx_misc:init_proc_mng_policy(Zone), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], @@ -205,9 +206,8 @@ handle_cast(Msg, State) -> handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> case emqx_protocol:deliver(PubOrAck, ProtoState) of {ok, ProtoState1} -> - State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}), - ok = maybe_gc(State1, PubOrAck), - {noreply, State1}; + State1 = State#state{proto_state = ProtoState1}, + {noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))}; {error, Reason} -> shutdown(Reason, State) end; @@ -247,11 +247,10 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> handle_info({tcp, _Sock, Data}, State) -> ?LOG(debug, "RECV ~p", [Data]), - Size = iolist_size(Data), - emqx_metrics:trans(inc, 'bytes/received', Size), - emqx_pd:update_counter(incoming_bytes, Size), - Incoming = #{bytes => Size, packets => 0}, - handle_packet(Data, State#state{incoming = Incoming}); + Oct = iolist_size(Data), + emqx_pd:update_counter(incoming_bytes, Oct), + emqx_metrics:trans(inc, 'bytes/received', Oct), + handle_packet(Data, maybe_gc({1, Oct}, State)); %% Rate limit here, cool:) handle_info({tcp_passive, _Sock}, State) -> @@ -325,9 +324,7 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - NState = ensure_stats_timer(State), - ok = maybe_gc(NState, incoming), - {noreply, NState}; + {noreply, ensure_stats_timer(State)}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, @@ -407,14 +404,15 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -%% For incoming messages, bump gc-stats with packet count and totoal volume -%% For outgoing messages, only 'publish' type is taken into account. -maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) -> - ok = emqx_gc:inc(Cnt, Oct); -maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> +maybe_gc(_, State = #state{gc_state = undefined}) -> + State; +maybe_gc({publish, _PacketId, #message{payload = Payload}}, State) -> Oct = iolist_size(Payload), - ok = emqx_gc:inc(1, Oct); -maybe_gc(_, _) -> - ok. + maybe_gc({1, Oct}, State); +maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> + {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt), + State#state{gc_state = GCSt1}; +maybe_gc(_, State) -> + State. diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 7e98eb37a..d608954a0 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -21,74 +21,83 @@ -module(emqx_gc). --export([init/1, inc/2, reset/0]). +-export([init/1, run/3, info/1, reset/1]). --type st() :: #{ cnt => {integer(), integer()} - , oct => {integer(), integer()} - }. +-type(opts() :: #{count => integer(), + bytes => integer()}). + +-type(st() :: #{cnt => {integer(), integer()}, + oct => {integer(), integer()}}). + +-type(gc_state() :: {?MODULE, st()}). -define(disabled, disabled). -define(ENABLED(X), (is_integer(X) andalso X > 0)). -%% @doc Initialize force GC parameters. --spec init(false | map()) -> ok. +%% @doc Initialize force GC state. +-spec(init(opts() | false) -> gc_state() | undefined). init(#{count := Count, bytes := Bytes}) -> Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], - erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)), - ok; -init(_) -> erlang:put(?MODULE, #{}), ok. + {?MODULE, maps:from_list(Cnt ++ Oct)}; +init(false) -> undefined. -%% @doc Increase count and bytes stats in one call, -%% ensure gc is triggered at most once, even if both thresholds are hit. --spec inc(pos_integer(), pos_integer()) -> ok. -inc(Cnt, Oct) -> - mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end). +%% @doc Try to run GC based on reduntions of count or bytes. +-spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}). +run(Cnt, Oct, {?MODULE, St}) -> + {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St), + {Res, {?MODULE, St1}}; +run(_Cnt, _Oct, undefined) -> + {false, undefined}. -%% @doc Reset counters to zero. --spec reset() -> ok. -reset() -> - mutate_pd_with(fun(St) -> reset(St) end). - -%% ======== Internals ======== - -%% mutate gc stats numbers in process dict with the given function -mutate_pd_with(F) -> - St = F(erlang:get(?MODULE)), - erlang:put(?MODULE, St), - ok. - -%% Increase count and bytes stats in one call, -%% ensure gc is triggered at most once, even if both thresholds are hit. --spec inc(st(), pos_integer(), pos_integer()) -> st(). -inc(St0, Cnt, Oct) -> - case do_inc(St0, cnt, Cnt) of - {true, St} -> - St; +run([], St) -> + {false, St}; +run([{K, N}|T], St) -> + case dec(K, N, St) of + {true, St1} -> + {true, do_gc(St1)}; {false, St1} -> - {_, St} = do_inc(St1, oct, Oct), - St + run(T, St1) end. -%% Reset counters to zero. -reset(St) -> reset(cnt, reset(oct, St)). +%% @doc Info of GC state. +-spec(info(gc_state()) -> map() | undefined). +info({?MODULE, St}) -> + St; +info(undefined) -> + undefined. --spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}. -do_inc(St, Key, Num) -> +%% @doc Reset counters to zero. +-spec(reset(gc_state()) -> gc_state()). +reset({?MODULE, St}) -> + {?MODULE, do_reset(St)}; +reset(undefined) -> + undefined. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +-spec(dec(cnt | oct, pos_integer(), st()) -> {boolean(), st()}). +dec(Key, Num, St) -> case maps:get(Key, St, ?disabled) of ?disabled -> {false, St}; {Init, Remain} when Remain > Num -> {false, maps:put(Key, {Init, Remain - Num}, St)}; _ -> - {true, do_gc(St)} + {true, St} end. do_gc(St) -> - erlang:garbage_collect(), - reset(St). + true = erlang:garbage_collect(), + do_reset(St). -reset(Key, St) -> +do_reset(St) -> + do_reset(cnt, do_reset(oct, St)). + +%% Reset counters to zero. +do_reset(Key, St) -> case maps:get(Key, St, ?disabled) of ?disabled -> St; {Init, _} -> maps:put(Key, {Init, Init}, St) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e9bc0ff98..f4dbc2b23 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -144,6 +144,9 @@ %% Enqueue stats enqueue_stats = 0, + %% GC State + gc_state, + %% Created at created_at :: erlang:timestamp(), @@ -344,6 +347,7 @@ init([Parent, #{zone := Zone, process_flag(trap_exit, true), true = link(ConnPid), emqx_logger:set_metadata_client_id(ClientId), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), IdleTimout = get_env(Zone, idle_timeout, 30000), State = #state{idle_timeout = IdleTimout, clean_start = CleanStart, @@ -364,6 +368,7 @@ init([Parent, #{zone := Zone, enable_stats = get_env(Zone, enable_stats, true), deliver_stats = 0, enqueue_stats = 0, + gc_state = emqx_gc:init(GcPolicy), created_at = os:timestamp(), will_msg = WillMsg }, @@ -371,8 +376,6 @@ init([Parent, #{zone := Zone, true = emqx_sm:set_session_attrs(ClientId, attrs(State)), true = emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), - GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), - ok = emqx_gc:init(GcPolicy), ok = emqx_misc:init_proc_mng_policy(Zone), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -605,7 +608,9 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer noreply(ensure_stats_timer(expire_awaiting_rel(State1))); handle_info({timeout, Timer, emit_stats}, - State = #state{client_id = ClientId, stats_timer = Timer}) -> + State = #state{client_id = ClientId, + stats_timer = Timer, + gc_state = GcState}) -> emqx_metrics:commit(), _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, @@ -614,8 +619,9 @@ handle_info({timeout, Timer, emit_stats}, continue -> {noreply, NewState}; hibernate -> - ok = emqx_gc:reset(), %% going to hibernate, reset gc stats - {noreply, NewState, hibernate}; + %% going to hibernate, reset gc stats + GcState1 = emqx_gc:reset(GcState), + {noreply, NewState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> ?LOG(warning, "shutdown due to ~p", [Reason], NewState), shutdown(Reason, NewState) @@ -991,9 +997,8 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> %% Inc stats inc_stats(deliver, Msg, State = #state{deliver_stats = I}) -> - MsgSize = msg_size(Msg), - ok = emqx_gc:inc(1, MsgSize), - State#state{deliver_stats = I + 1}; + State1 = maybe_gc({1, msg_size(Msg)}, State), + State1#state{deliver_stats = I + 1}; inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) -> State#state{enqueue_stats = I + 1}. @@ -1018,3 +1023,9 @@ noreply(State) -> shutdown(Reason, State) -> {stop, Reason, State}. +maybe_gc(_, State = #state{gc_state = undefined}) -> + State; +maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> + {_, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt), + State#state{gc_state = GCSt1}. +