Tuning the 'force_gc_policy' of MQTT connections (#3192)

Tuning the 'force_gc_policy' of MQTT connections
This commit is contained in:
Feng Lee 2020-01-17 09:42:16 +08:00 committed by tigercl
parent a71486cac7
commit a318532bb0
5 changed files with 11 additions and 8 deletions

View File

@ -601,11 +601,11 @@ zone.external.enable_stats = on
## Default: ignore ## Default: ignore
zone.external.acl_deny_action = ignore zone.external.acl_deny_action = ignore
## Force MQTT connection/session process GC after this number of ## Force the MQTT connection process GC after this number of
## messages | bytes passed through. ## messages | bytes passed through.
## ##
## Numbers delimited by `|'. Zero or negative is to disable. ## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 1000|1MB zone.external.force_gc_policy = 10000|10MB
## Max message queue length and total heap size to force shutdown ## Max message queue length and total heap size to force shutdown
## connection/session process. ## connection/session process.
@ -792,6 +792,9 @@ zone.internal.enable_acl = off
## Default: ignore ## Default: ignore
zone.internal.acl_deny_action = ignore zone.internal.acl_deny_action = ignore
## See zone.$name.force_gc_policy
## zone.internal.force_gc_policy = 100000|100MB
## See zone.$name.wildcard_subscription. ## See zone.$name.wildcard_subscription.
## ##
## Value: boolean ## Value: boolean

View File

@ -915,7 +915,6 @@ end}.
%% messages | bytes passed through. %% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable. %% Numbers delimited by `|'. Zero or negative is to disable.
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ {mapping, "zone.$name.force_gc_policy", "emqx.zones", [
{default, "0 | 0MB"},
{datatype, string} {datatype, string}
]}. ]}.

View File

@ -58,8 +58,7 @@ merge_opts(Defaults, Options) ->
%% @doc Apply a function to a maybe argument. %% @doc Apply a function to a maybe argument.
-spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A))
-> maybe(A) when A :: any()). -> maybe(A) when A :: any()).
maybe_apply(_Fun, undefined) -> maybe_apply(_Fun, undefined) -> undefined;
undefined;
maybe_apply(Fun, Arg) when is_function(Fun) -> maybe_apply(Fun, Arg) when is_function(Fun) ->
erlang:apply(Fun, [Arg]). erlang:apply(Fun, [Arg]).

View File

@ -429,9 +429,9 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
run_gc(Stats, State = #state{gc_state = GcSt}) -> run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
false -> State;
{_IsGC, GcSt1} -> {_IsGC, GcSt1} ->
State#state{gc_state = GcSt1}; State#state{gc_state = GcSt1}
false -> State
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{channel = Channel}) ->

View File

@ -45,6 +45,8 @@
, session_expiry_interval/1 , session_expiry_interval/1
, force_gc_policy/1 , force_gc_policy/1
, force_shutdown_policy/1 , force_shutdown_policy/1
, get_env/2
, get_env/3
]}). ]}).
%% APIs %% APIs
@ -114,7 +116,7 @@ start_link() ->
stop() -> stop() ->
gen_server:stop(?SERVER). gen_server:stop(?SERVER).
-spec(init_gc_state(zone()) -> emqx_gc:gc_state()). -spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())).
init_gc_state(Zone) -> init_gc_state(Zone) ->
maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).