From a318532bb04abf014b2e668f273c529675c84579 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 09:42:16 +0800 Subject: [PATCH] Tuning the 'force_gc_policy' of MQTT connections (#3192) Tuning the 'force_gc_policy' of MQTT connections --- etc/emqx.conf | 7 +++++-- priv/emqx.schema | 1 - src/emqx_misc.erl | 3 +-- src/emqx_ws_connection.erl | 4 ++-- src/emqx_zone.erl | 4 +++- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0c4ea81a5..b331fefd7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -601,11 +601,11 @@ zone.external.enable_stats = on ## Default: ignore zone.external.acl_deny_action = ignore -## Force MQTT connection/session process GC after this number of +## Force the MQTT connection process GC after this number of ## messages | bytes passed through. ## ## Numbers delimited by `|'. Zero or negative is to disable. -zone.external.force_gc_policy = 1000|1MB +zone.external.force_gc_policy = 10000|10MB ## Max message queue length and total heap size to force shutdown ## connection/session process. @@ -792,6 +792,9 @@ zone.internal.enable_acl = off ## Default: ignore zone.internal.acl_deny_action = ignore +## See zone.$name.force_gc_policy +## zone.internal.force_gc_policy = 100000|100MB + ## See zone.$name.wildcard_subscription. ## ## Value: boolean diff --git a/priv/emqx.schema b/priv/emqx.schema index a6966f0a5..662415a68 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -915,7 +915,6 @@ end}. %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. {mapping, "zone.$name.force_gc_policy", "emqx.zones", [ - {default, "0 | 0MB"}, {datatype, string} ]}. diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index f5fe5029f..d08d3ba35 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -58,8 +58,7 @@ merge_opts(Defaults, Options) -> %% @doc Apply a function to a maybe argument. -spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -> maybe(A) when A :: any()). -maybe_apply(_Fun, undefined) -> - undefined; +maybe_apply(_Fun, undefined) -> undefined; maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ceae32823..204dc82ee 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -429,9 +429,9 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; {_IsGC, GcSt1} -> - State#state{gc_state = GcSt1}; - false -> State + State#state{gc_state = GcSt1} end. check_oom(State = #state{channel = Channel}) -> diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 0304f4c8a..68befdfdc 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -45,6 +45,8 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , get_env/2 + , get_env/3 ]}). %% APIs @@ -114,7 +116,7 @@ start_link() -> stop() -> gen_server:stop(?SERVER). --spec(init_gc_state(zone()) -> emqx_gc:gc_state()). +-spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())). init_gc_state(Zone) -> maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).