Rewrite emqx_gc.erl
The implementation prior to this commit supports only one gc enforcement policy which is message count threshold. The new implementation introduces 1 more: volume threshold based
This commit is contained in:
parent
3822ff987b
commit
e940c1c970
|
@ -35,3 +35,5 @@ bbmustache/
|
||||||
etc/gen.emqx.conf
|
etc/gen.emqx.conf
|
||||||
compile_commands.json
|
compile_commands.json
|
||||||
cuttlefish
|
cuttlefish
|
||||||
|
rebar.lock
|
||||||
|
xrefr
|
||||||
|
|
|
@ -824,6 +824,14 @@ end}.
|
||||||
{datatype, {enum, [true, false]}}
|
{datatype, {enum, [true, false]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Force connection/session process GC after this number of
|
||||||
|
%% messages | bytes passed through.
|
||||||
|
%% Numbers delimited by `|'. Zero or negative is to disable.
|
||||||
|
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [
|
||||||
|
{default, "0|0"},
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
{translation, "emqx.zones", fun(Conf) ->
|
{translation, "emqx.zones", fun(Conf) ->
|
||||||
Mapping = fun("retain_available", Val) ->
|
Mapping = fun("retain_available", Val) ->
|
||||||
{mqtt_retain_available, Val};
|
{mqtt_retain_available, Val};
|
||||||
|
@ -831,6 +839,10 @@ end}.
|
||||||
{mqtt_wildcard_subscription, Val};
|
{mqtt_wildcard_subscription, Val};
|
||||||
("shared_subscription", Val) ->
|
("shared_subscription", Val) ->
|
||||||
{mqtt_shared_subscription, Val};
|
{mqtt_shared_subscription, Val};
|
||||||
|
("force_gc_policy", Val) ->
|
||||||
|
[Count, Bytes] = string:tokens(Val, "| "),
|
||||||
|
{force_gc_policy, #{count => list_to_integer(Count),
|
||||||
|
bytes => list_to_integer(Bytes)}};
|
||||||
(Opt, Val) ->
|
(Opt, Val) ->
|
||||||
{list_to_atom(Opt), Val}
|
{list_to_atom(Opt), Val}
|
||||||
end,
|
end,
|
||||||
|
@ -1750,3 +1762,5 @@ end}.
|
||||||
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
|
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
|
||||||
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
|
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -148,7 +148,10 @@ init([Transport, RawSocket, Options]) ->
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
parser_state = ParserState,
|
parser_state = ParserState,
|
||||||
enable_stats = EnableStats,
|
enable_stats = EnableStats,
|
||||||
idle_timeout = IdleTimout}),
|
idle_timeout = IdleTimout
|
||||||
|
}),
|
||||||
|
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
|
||||||
|
ok = emqx_gc:init(GcPolicy),
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
||||||
State, self(), IdleTimout);
|
State, self(), IdleTimout);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -200,14 +203,18 @@ handle_cast(Msg, State) ->
|
||||||
handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
|
handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
|
||||||
case emqx_protocol:deliver(PubOrAck, ProtoState) of
|
case emqx_protocol:deliver(PubOrAck, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
{noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
|
State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}),
|
||||||
|
ok = maybe_gc(State1, PubOrAck),
|
||||||
|
{noreply, State1};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
shutdown(Reason, State)
|
shutdown(Reason, State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, Timer, emit_stats},
|
handle_info({timeout, Timer, emit_stats},
|
||||||
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
|
State = #state{stats_timer = Timer,
|
||||||
|
proto_state = ProtoState
|
||||||
|
}) ->
|
||||||
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
||||||
|
ok = emqx_gc:reset(),
|
||||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
|
@ -295,9 +302,10 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% Receive and parse data
|
%% Receive and parse data
|
||||||
handle_packet(<<>>, State) ->
|
handle_packet(<<>>, State0) ->
|
||||||
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
|
State = ensure_stats_timer(ensure_rate_limit(State0)),
|
||||||
|
ok = maybe_gc(State, incoming),
|
||||||
|
{noreply, State};
|
||||||
handle_packet(Data, State = #state{proto_state = ProtoState,
|
handle_packet(Data, State = #state{proto_state = ProtoState,
|
||||||
parser_state = ParserState,
|
parser_state = ParserState,
|
||||||
idle_timeout = IdleTimeout}) ->
|
idle_timeout = IdleTimeout}) ->
|
||||||
|
@ -381,7 +389,13 @@ shutdown(Reason, State) ->
|
||||||
stop(Reason, State) ->
|
stop(Reason, State) ->
|
||||||
{stop, Reason, State}.
|
{stop, Reason, State}.
|
||||||
|
|
||||||
maybe_gc(State) ->
|
%% For incoming messages, bump gc-stats with packet count and totoal volume
|
||||||
%% TODO: gc and shutdown policy
|
%% For outgoing messages, only 'publish' type is taken into account.
|
||||||
State.
|
maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) ->
|
||||||
|
ok = emqx_gc:inc(Cnt, Oct);
|
||||||
|
maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
|
||||||
|
Oct = iolist_size(Payload),
|
||||||
|
ok = emqx_gc:inc(1, Oct);
|
||||||
|
maybe_gc(_, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -12,38 +12,87 @@
|
||||||
%% See the License for the specific language governing permissions and
|
%% See the License for the specific language governing permissions and
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
|
|
||||||
%% GC Utility functions.
|
%% @doc This module manages an opaque collection of statistics data used to
|
||||||
|
%% force garbage collection on `self()' process when hitting thresholds.
|
||||||
|
%% Namely:
|
||||||
|
%% (1) Total number of messages passed through
|
||||||
|
%% (2) Total data volume passed through
|
||||||
|
%% @end
|
||||||
|
|
||||||
-module(emqx_gc).
|
-module(emqx_gc).
|
||||||
|
|
||||||
%% Memory: (10, 100, 1000)
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
%%
|
|
||||||
|
|
||||||
-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
|
-export([init/1, inc/2, reset/0]).
|
||||||
maybe_force_gc/3]).
|
|
||||||
|
|
||||||
-spec(conn_max_gc_count() -> integer()).
|
-type st() :: #{ cnt => {integer(), integer()}
|
||||||
conn_max_gc_count() ->
|
, oct => {integer(), integer()}
|
||||||
case emqx_config:get_env(conn_force_gc_count) of
|
}.
|
||||||
I when is_integer(I), I > 0 -> I + rand:uniform(I);
|
|
||||||
I when is_integer(I), I =< 0 -> undefined;
|
-define(disabled, disabled).
|
||||||
undefined -> undefined
|
-define(ENABLED(X), (is_integer(X) andalso X > 0)).
|
||||||
|
|
||||||
|
%% @doc Initialize force GC parameters.
|
||||||
|
-spec init(false | map()) -> ok.
|
||||||
|
init(#{count := Count, bytes := Bytes}) ->
|
||||||
|
Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
|
||||||
|
Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
|
||||||
|
erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)),
|
||||||
|
ok;
|
||||||
|
init(_) -> erlang:put(?MODULE, #{}), ok.
|
||||||
|
|
||||||
|
%% @doc Increase count and bytes stats in one call,
|
||||||
|
%% ensure gc is triggered at most once, even if both thresholds are hit.
|
||||||
|
-spec inc(pos_integer(), pos_integer()) -> ok.
|
||||||
|
inc(Cnt, Oct) ->
|
||||||
|
mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end).
|
||||||
|
|
||||||
|
%% @doc Reset counters to zero.
|
||||||
|
-spec reset() -> ok.
|
||||||
|
reset() ->
|
||||||
|
mutate_pd_with(fun(St) -> reset(St) end).
|
||||||
|
|
||||||
|
%% ======== Internals ========
|
||||||
|
|
||||||
|
%% mutate gc stats numbers in process dict with the given function
|
||||||
|
mutate_pd_with(F) ->
|
||||||
|
St = F(erlang:get(?MODULE)),
|
||||||
|
erlang:put(?MODULE, St),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% Increase count and bytes stats in one call,
|
||||||
|
%% ensure gc is triggered at most once, even if both thresholds are hit.
|
||||||
|
-spec inc(st(), pos_integer(), pos_integer()) -> st().
|
||||||
|
inc(St0, Cnt, Oct) ->
|
||||||
|
case do_inc(St0, cnt, Cnt) of
|
||||||
|
{true, St} ->
|
||||||
|
St;
|
||||||
|
{false, St1} ->
|
||||||
|
{_, St} = do_inc(St1, oct, Oct),
|
||||||
|
St
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()).
|
%% Reset counters to zero.
|
||||||
reset_conn_gc_count(Pos, State) ->
|
reset(St) -> reset(cnt, reset(oct, St)).
|
||||||
case element(Pos, State) of
|
|
||||||
undefined -> State;
|
-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}.
|
||||||
_I -> setelement(Pos, State, conn_max_gc_count())
|
do_inc(St, Key, Num) ->
|
||||||
|
case maps:get(Key, St, ?disabled) of
|
||||||
|
?disabled ->
|
||||||
|
{false, St};
|
||||||
|
{Init, Remain} when Remain > Num ->
|
||||||
|
{false, maps:put(Key, {Init, Remain - Num}, St)};
|
||||||
|
_ ->
|
||||||
|
{true, do_gc(St)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maybe_force_gc(Pos, State) ->
|
do_gc(St) ->
|
||||||
maybe_force_gc(Pos, State, fun() -> ok end).
|
erlang:garbage_collect(),
|
||||||
maybe_force_gc(Pos, State, Cb) ->
|
reset(St).
|
||||||
case element(Pos, State) of
|
|
||||||
undefined -> State;
|
reset(Key, St) ->
|
||||||
I when I =< 0 -> Cb(), garbage_collect(),
|
case maps:get(Key, St, ?disabled) of
|
||||||
reset_conn_gc_count(Pos, State);
|
?disabled -> St;
|
||||||
I -> setelement(Pos, State, I - 1)
|
{Init, _} -> maps:put(Key, {Init, Init}, St)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -350,10 +350,13 @@ init([Parent, #{zone := Zone,
|
||||||
enable_stats = get_env(Zone, enable_stats, true),
|
enable_stats = get_env(Zone, enable_stats, true),
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
created_at = os:timestamp()},
|
created_at = os:timestamp()
|
||||||
|
},
|
||||||
emqx_sm:register_session(ClientId, attrs(State)),
|
emqx_sm:register_session(ClientId, attrs(State)),
|
||||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
|
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
|
||||||
|
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
|
||||||
|
ok = emqx_gc:init(GcPolicy),
|
||||||
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
||||||
|
|
||||||
|
@ -567,8 +570,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer
|
||||||
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
||||||
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
|
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
|
||||||
|
|
||||||
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
|
handle_info({timeout, Timer, emit_stats},
|
||||||
|
State = #state{client_id = ClientId,
|
||||||
|
stats_timer = Timer}) ->
|
||||||
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
|
ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
|
||||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||||
|
@ -744,21 +750,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Deliver qos0 message directly to client
|
%% Deliver qos0 message directly to client
|
||||||
dispatch(Msg = #message{qos = ?QOS0}, State) ->
|
dispatch(Msg = #message{qos = ?QOS0} = Msg, State) ->
|
||||||
deliver(undefined, Msg, State),
|
deliver(undefined, Msg, State),
|
||||||
inc_stats(deliver, State);
|
inc_stats(deliver, Msg, State);
|
||||||
|
|
||||||
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
|
dispatch(Msg = #message{qos = QoS} = Msg,
|
||||||
|
State = #state{next_pkt_id = PacketId, inflight = Inflight})
|
||||||
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
||||||
case emqx_inflight:is_full(Inflight) of
|
case emqx_inflight:is_full(Inflight) of
|
||||||
true -> enqueue_msg(Msg, State);
|
true -> enqueue_msg(Msg, State);
|
||||||
false ->
|
false ->
|
||||||
deliver(PacketId, Msg, State),
|
deliver(PacketId, Msg, State),
|
||||||
await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State)))
|
await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State)))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||||
inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
|
inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Deliver
|
%% Deliver
|
||||||
|
@ -882,11 +889,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Inc stats
|
%% Inc stats
|
||||||
|
|
||||||
inc_stats(deliver, State = #state{deliver_stats = I}) ->
|
inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
|
||||||
|
MsgSize = msg_size(Msg),
|
||||||
|
ok = emqx_gc:inc(1, MsgSize),
|
||||||
State#state{deliver_stats = I + 1};
|
State#state{deliver_stats = I + 1};
|
||||||
inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
|
inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
|
||||||
State#state{enqueue_stats = I + 1}.
|
State#state{enqueue_stats = I + 1}.
|
||||||
|
|
||||||
|
%% Take only the payload size into account, add other fields if necessary
|
||||||
|
msg_size(#message{payload = Payload}) -> payload_size(Payload).
|
||||||
|
|
||||||
|
%% Payload should be binary(), but not 100% sure. Need dialyzer!
|
||||||
|
payload_size(Payload) -> erlang:iolist_size(Payload).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
||||||
|
@ -902,5 +917,3 @@ noreply(State) ->
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, {shutdown, Reason}, State}.
|
||||||
|
|
||||||
%% TODO: GC Policy and Shutdown Policy
|
|
||||||
%% maybe_gc(State) -> State.
|
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
|
||||||
|
-module(emqx_gc_tests).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
trigger_by_cnt_test() ->
|
||||||
|
Args = #{count => 2, bytes => 0},
|
||||||
|
ok = emqx_gc:init(Args),
|
||||||
|
ok = emqx_gc:inc(1, 1000),
|
||||||
|
St1 = inspect(),
|
||||||
|
?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)),
|
||||||
|
ok = emqx_gc:inc(2, 2),
|
||||||
|
St2 = inspect(),
|
||||||
|
ok = emqx_gc:inc(0, 2000),
|
||||||
|
St3 = inspect(),
|
||||||
|
?assertEqual(St2, St3),
|
||||||
|
?assertMatch({N, N}, maps:get(cnt, St2)),
|
||||||
|
?assertNot(maps:is_key(oct, St2)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
trigger_by_oct_test() ->
|
||||||
|
Args = #{count => 2, bytes => 2},
|
||||||
|
ok = emqx_gc:init(Args),
|
||||||
|
ok = emqx_gc:inc(1, 1),
|
||||||
|
St1 = inspect(),
|
||||||
|
?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)),
|
||||||
|
ok = emqx_gc:inc(2, 2),
|
||||||
|
St2 = inspect(),
|
||||||
|
?assertMatch({N, N}, maps:get(oct, St2)),
|
||||||
|
?assertMatch({M, M}, maps:get(cnt, St2)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
disabled_test() ->
|
||||||
|
Args = #{count => -1, bytes => false},
|
||||||
|
ok = emqx_gc:init(Args),
|
||||||
|
ok = emqx_gc:inc(1, 1),
|
||||||
|
?assertEqual(#{}, inspect()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
inspect() -> erlang:get(emqx_gc).
|
Loading…
Reference in New Issue