From e940c1c97079101d79fccd3ece539d3d011c9b53 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sun, 9 Sep 2018 16:30:40 +0200 Subject: [PATCH] Rewrite emqx_gc.erl The implementation prior to this commit supports only one gc enforcement policy which is message count threshold. The new implementation introduces 1 more: volume threshold based --- .gitignore | 2 + priv/emqx.schema | 14 ++++++ src/emqx_connection.erl | 34 ++++++++++----- src/emqx_gc.erl | 97 +++++++++++++++++++++++++++++++---------- src/emqx_session.erl | 35 ++++++++++----- test/emqx_gc_tests.erl | 53 ++++++++++++++++++++++ 6 files changed, 190 insertions(+), 45 deletions(-) create mode 100644 test/emqx_gc_tests.erl diff --git a/.gitignore b/.gitignore index 0927f0bbc..80e3bf42d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bbmustache/ etc/gen.emqx.conf compile_commands.json cuttlefish +rebar.lock +xrefr diff --git a/priv/emqx.schema b/priv/emqx.schema index a8ed316c2..9fcd97376 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -824,6 +824,14 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Force connection/session process GC after this number of +%% messages | bytes passed through. +%% Numbers delimited by `|'. Zero or negative is to disable. +{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ + {default, "0|0"}, + {datatype, string} + ]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; @@ -831,6 +839,10 @@ end}. {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> {mqtt_shared_subscription, Val}; + ("force_gc_policy", Val) -> + [Count, Bytes] = string:tokens(Val, "| "), + {force_gc_policy, #{count => list_to_integer(Count), + bytes => list_to_integer(Bytes)}}; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1750,3 +1762,5 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. + + diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index a7f71d9aa..d9d67f08d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -148,7 +148,10 @@ init([Transport, RawSocket, Options]) -> proto_state = ProtoState, parser_state = ParserState, enable_stats = EnableStats, - idle_timeout = IdleTimout}), + idle_timeout = IdleTimout + }), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -200,14 +203,18 @@ handle_cast(Msg, State) -> handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> case emqx_protocol:deliver(PubOrAck, ProtoState) of {ok, ProtoState1} -> - {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; + State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}), + ok = maybe_gc(State1, PubOrAck), + {noreply, State1}; {error, Reason} -> shutdown(Reason, State) end; - handle_info({timeout, Timer, emit_stats}, - State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + State = #state{stats_timer = Timer, + proto_state = ProtoState + }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), + ok = emqx_gc:reset(), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> @@ -295,9 +302,10 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Receive and parse data -handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; - +handle_packet(<<>>, State0) -> + State = ensure_stats_timer(ensure_rate_limit(State0)), + ok = maybe_gc(State, incoming), + {noreply, State}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> @@ -381,7 +389,13 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -maybe_gc(State) -> - %% TODO: gc and shutdown policy - State. +%% For incoming messages, bump gc-stats with packet count and totoal volume +%% For outgoing messages, only 'publish' type is taken into account. +maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) -> + ok = emqx_gc:inc(Cnt, Oct); +maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> + Oct = iolist_size(Payload), + ok = emqx_gc:inc(1, Oct); +maybe_gc(_, _) -> + ok. diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 5a32b43c5..65bbccad1 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -12,38 +12,87 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% GC Utility functions. +%% @doc This module manages an opaque collection of statistics data used to +%% force garbage collection on `self()' process when hitting thresholds. +%% Namely: +%% (1) Total number of messages passed through +%% (2) Total data volume passed through +%% @end -module(emqx_gc). -%% Memory: (10, 100, 1000) -%% +-author("Feng Lee "). --export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2, - maybe_force_gc/3]). +-export([init/1, inc/2, reset/0]). --spec(conn_max_gc_count() -> integer()). -conn_max_gc_count() -> - case emqx_config:get_env(conn_force_gc_count) of - I when is_integer(I), I > 0 -> I + rand:uniform(I); - I when is_integer(I), I =< 0 -> undefined; - undefined -> undefined +-type st() :: #{ cnt => {integer(), integer()} + , oct => {integer(), integer()} + }. + +-define(disabled, disabled). +-define(ENABLED(X), (is_integer(X) andalso X > 0)). + +%% @doc Initialize force GC parameters. +-spec init(false | map()) -> ok. +init(#{count := Count, bytes := Bytes}) -> + Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], + Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], + erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)), + ok; +init(_) -> erlang:put(?MODULE, #{}), ok. + +%% @doc Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(pos_integer(), pos_integer()) -> ok. +inc(Cnt, Oct) -> + mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end). + +%% @doc Reset counters to zero. +-spec reset() -> ok. +reset() -> + mutate_pd_with(fun(St) -> reset(St) end). + +%% ======== Internals ======== + +%% mutate gc stats numbers in process dict with the given function +mutate_pd_with(F) -> + St = F(erlang:get(?MODULE)), + erlang:put(?MODULE, St), + ok. + +%% Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(st(), pos_integer(), pos_integer()) -> st(). +inc(St0, Cnt, Oct) -> + case do_inc(St0, cnt, Cnt) of + {true, St} -> + St; + {false, St1} -> + {_, St} = do_inc(St1, oct, Oct), + St end. --spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()). -reset_conn_gc_count(Pos, State) -> - case element(Pos, State) of - undefined -> State; - _I -> setelement(Pos, State, conn_max_gc_count()) +%% Reset counters to zero. +reset(St) -> reset(cnt, reset(oct, St)). + +-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}. +do_inc(St, Key, Num) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> + {false, St}; + {Init, Remain} when Remain > Num -> + {false, maps:put(Key, {Init, Remain - Num}, St)}; + _ -> + {true, do_gc(St)} end. -maybe_force_gc(Pos, State) -> - maybe_force_gc(Pos, State, fun() -> ok end). -maybe_force_gc(Pos, State, Cb) -> - case element(Pos, State) of - undefined -> State; - I when I =< 0 -> Cb(), garbage_collect(), - reset_conn_gc_count(Pos, State); - I -> setelement(Pos, State, I - 1) +do_gc(St) -> + erlang:garbage_collect(), + reset(St). + +reset(Key, St) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> St; + {Init, _} -> maps:put(Key, {Init, Init}, St) end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 118ab6e21..54dcfb1fe 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -350,10 +350,13 @@ init([Parent, #{zone := Zone, enable_stats = get_env(Zone, enable_stats, true), deliver_stats = 0, enqueue_stats = 0, - created_at = os:timestamp()}, + created_at = os:timestamp() + }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -567,8 +570,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) -> noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); -handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> +handle_info({timeout, Timer, emit_stats}, + State = #state{client_id = ClientId, + stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> @@ -744,21 +750,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS0}, State) -> +dispatch(Msg = #message{qos = ?QOS0} = Msg, State) -> deliver(undefined, Msg, State), - inc_stats(deliver, State); + inc_stats(deliver, Msg, State); -dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) +dispatch(Msg = #message{qos = QoS} = Msg, + State = #state{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> deliver(PacketId, Msg, State), - await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State))) + await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State))) end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). + inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). %%------------------------------------------------------------------------------ %% Deliver @@ -882,11 +889,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> %%------------------------------------------------------------------------------ %% Inc stats -inc_stats(deliver, State = #state{deliver_stats = I}) -> +inc_stats(deliver, Msg, State = #state{deliver_stats = I}) -> + MsgSize = msg_size(Msg), + ok = emqx_gc:inc(1, MsgSize), State#state{deliver_stats = I + 1}; -inc_stats(enqueue, State = #state{enqueue_stats = I}) -> +inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) -> State#state{enqueue_stats = I + 1}. +%% Take only the payload size into account, add other fields if necessary +msg_size(#message{payload = Payload}) -> payload_size(Payload). + +%% Payload should be binary(), but not 100% sure. Need dialyzer! +payload_size(Payload) -> erlang:iolist_size(Payload). + %%------------------------------------------------------------------------------ %% Helper functions @@ -902,5 +917,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -%% TODO: GC Policy and Shutdown Policy -%% maybe_gc(State) -> State. diff --git a/test/emqx_gc_tests.erl b/test/emqx_gc_tests.erl new file mode 100644 index 000000000..ffcac91d1 --- /dev/null +++ b/test/emqx_gc_tests.erl @@ -0,0 +1,53 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_gc_tests). + +-include_lib("eunit/include/eunit.hrl"). + +trigger_by_cnt_test() -> + Args = #{count => 2, bytes => 0}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1000), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ok = emqx_gc:inc(0, 2000), + St3 = inspect(), + ?assertEqual(St2, St3), + ?assertMatch({N, N}, maps:get(cnt, St2)), + ?assertNot(maps:is_key(oct, St2)), + ok. + +trigger_by_oct_test() -> + Args = #{count => 2, bytes => 2}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ?assertMatch({N, N}, maps:get(oct, St2)), + ?assertMatch({M, M}, maps:get(cnt, St2)), + ok. + +disabled_test() -> + Args = #{count => -1, bytes => false}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + ?assertEqual(#{}, inspect()), + ok. + +inspect() -> erlang:get(emqx_gc).