From e9710ade14d28d541e45dc3f88c2a52136b92670 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 1 Oct 2021 14:45:31 +0200 Subject: [PATCH 01/17] feat(olp): first commit for overload protection - add lc app - add alarm handler for lc runq alarm - backoff when handling CONNECT message - close new connswhen overload --- apps/emqx/rebar.config | 3 ++- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_alarm.erl | 2 ++ apps/emqx/src/emqx_alarm_handler.erl | 9 ++++++++ apps/emqx/src/emqx_channel.erl | 7 +++++- apps/emqx/src/emqx_olp.erl | 33 ++++++++++++++++++++++++++++ rebar.config | 3 ++- 7 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 apps/emqx/src/emqx_olp.erl diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 4ec7c7dc5..1a6a37d01 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -13,13 +13,14 @@ , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} + , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {plugins, [rebar3_proper]}. diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 031e4f654..3f167d093 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -5,7 +5,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy]}, + {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6bd2d5d49..381933938 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -400,6 +400,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, normalize_message(Name, no_details) -> list_to_binary(io_lib:format("~p", [Name])); +normalize_message({runq_overload, _Node}, #{node := Node, runq_length := Len}) -> + list_to_binary(io_lib:format("Runq is overloaded on node: ~p: ~p", [Node, Len])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 06f4e23a6..9e535c733 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -20,6 +20,7 @@ -include("emqx.hrl"). -include("logger.hrl"). +-include_lib("lc/include/lc.hrl"). %% gen_event callbacks @@ -74,6 +75,14 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_process_memory_usage), {ok, State}; +handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> + emqx_alarm:activate({runq_overload, node()}, Info), + {ok, State}; + +handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> + emqx_alarm:deactivate({runq_overload, node()}), + {ok, State}; + handle_event(_, State) -> {ok, State}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0b1ff7e25..e7500164e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -291,7 +291,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) -> handle_out(connack, ?RC_PROTOCOL_ERROR, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> - case pipeline([fun enrich_conninfo/2, + case pipeline([fun overload_protection/2, + fun enrich_conninfo/2, fun run_conn_hooks/2, fun check_connect/2, fun enrich_client/2, @@ -1142,6 +1143,10 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +overload_protection(_, #channel{clientinfo = #{zone := Zone}}) -> + T = get_mqtt_conf(Zone, overload_drawback_delay, 1), + emqx_olp:backoff(T), + ok. %%-------------------------------------------------------------------- %% Enrich MQTT Connect Info diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl new file mode 100644 index 000000000..9a57ea9ff --- /dev/null +++ b/apps/emqx/src/emqx_olp.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_olp). +-export([ is_overloaded/0 + , backoff/1 + ]). + +-spec is_overloaded() -> boolean(). +is_overloaded() -> + load_ctl:is_overloaded(). + +-spec backoff(timer:timeout()) -> ok | timeout. +backoff(Delay) -> + load_ctl:maydelay(Delay). + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: diff --git a/rebar.config b/rebar.config index ca8dd3e22..df71cd540 100644 --- a/rebar.config +++ b/rebar.config @@ -48,7 +48,7 @@ , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} @@ -64,6 +64,7 @@ , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}} + , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {xref_ignores, From 166f02edc4156797588452a4227c4a66898a42fc Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 10:44:53 +0200 Subject: [PATCH 02/17] feat(olp): don't hibernate conn proc when overloaded --- apps/emqx/src/emqx_connection.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 8d0e74313..139f8f6aa 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -323,7 +323,12 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> handle_recv(Msg, Parent, State) after IdleTimeout + 100 -> - hibernate(Parent, cancel_stats_timer(State)) + case emqx_olp:is_overloaded() of + true -> + recvloop(Parent, State); + false -> + hibernate(Parent, cancel_stats_timer(State)) + end end. handle_recv({system, From, Request}, Parent, State) -> From 9304e3c122e14357db87e3afe0a719a105506996 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 12:00:32 +0200 Subject: [PATCH 03/17] feat(olp): add config and backoff gc --- apps/emqx/src/emqx_channel.erl | 3 +-- apps/emqx/src/emqx_connection.erl | 6 ++++-- apps/emqx/src/emqx_olp.erl | 25 ++++++++++++++++++++++--- apps/emqx/src/emqx_schema.erl | 25 ++++++++++++++++++++++++- 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index e7500164e..d5b55a0d5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1144,8 +1144,7 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) %% Internal functions %%-------------------------------------------------------------------- overload_protection(_, #channel{clientinfo = #{zone := Zone}}) -> - T = get_mqtt_conf(Zone, overload_drawback_delay, 1), - emqx_olp:backoff(T), + emqx_olp:backoff(Zone), ok. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 139f8f6aa..180747208 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -821,8 +821,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> %%-------------------------------------------------------------------- %% Run GC and Check OOM -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of +run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) -> + case ?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone) + andalso emqx_gc:run(Stats, GcSt) + of false -> State; {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 9a57ea9ff..f38218765 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -16,16 +16,35 @@ -module(emqx_olp). -export([ is_overloaded/0 , backoff/1 + , backoff_gc/1 + , backoff_hibernation/1 ]). -spec is_overloaded() -> boolean(). is_overloaded() -> load_ctl:is_overloaded(). --spec backoff(timer:timeout()) -> ok | timeout. -backoff(Delay) -> - load_ctl:maydelay(Delay). +-spec backoff(Zone :: atom()) -> ok | timeout. +backoff(Zone) -> + case emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) of + true -> + Delay = emqx_config:get_zone_conf(Zone, [overload_protection, backoff_delay], 1), + load_ctl:maydelay(Delay); + false -> + ok + end. +-spec backoff_gc(Zone :: atom()) -> ok | timeout. +backoff_gc(Zone) -> + load_ctl:is_overloaded() + andalso emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) + andalso emqx_config:get_zone_conf(Zone, [overload_protection, backoff_gc], false). + +-spec backoff_hibernation(Zone :: atom()) -> ok | timeout. +backoff_hibernation(Zone) -> + load_ctl:is_overloaded() + andalso emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) + andalso emqx_config:get_zone_conf(Zone, [overload_protection, backoff_hibernation], false). %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 31f973421..cd9c5e265 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -122,6 +122,9 @@ roots(medium) -> , {"force_shutdown", sc(ref("force_shutdown"), #{})} + , {"overload_protection", + sc(ref("overload_protection"), + #{})} ]; roots(low) -> [ {"force_gc", @@ -323,7 +326,9 @@ fields("mqtt") -> fields("zone") -> Fields = ["mqtt", "stats", "flapping_detect", "force_shutdown", - "conn_congestion", "rate_limit", "quota", "force_gc"], + "conn_congestion", "rate_limit", "quota", "force_gc", + "overload_protection" + ], [{F, ref(emqx_zone_schema, F)} || F <- Fields]; fields("rate_limit") -> @@ -391,6 +396,24 @@ fields("force_shutdown") -> })} ]; +fields("overload_protection") -> + [ {"enable", + sc(boolean(), + #{ default => false})} + , {"backoff_delay", + sc(range(0, inf), + #{ default => 1 + })} + , {"backoff_gc", + sc(boolean(), + #{ default => true + })} + , {"backoff_hibernation", + sc(boolean(), + #{ default => true + })} + ]; + fields("conn_congestion") -> [ {"enable_alarm", sc(boolean(), From eb895a9f801aff6b29d83d4cbb9a2af3e773a533 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 12:31:14 +0200 Subject: [PATCH 04/17] feat(olp): quic --- apps/emqx/src/emqx_quic_connection.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index c23aec17b..eb16b9b0a 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -35,13 +35,18 @@ init(ConnOpts) when is_map(ConnOpts) -> -spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. new_conn(Conn, S) -> process_flag(trap_exit, true), - {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), - receive - {Pid, stream_acceptor_ready} -> - ok = quicer:async_handshake(Conn), - {ok, S}; - {'EXIT', Pid, _Reason} -> - {error, stream_accept_error} + case emqx_olp:is_overloaded() of + false -> + {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), + receive + {Pid, stream_acceptor_ready} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + {'EXIT', Pid, _Reason} -> + {error, stream_accept_error} + end; + true -> + {error, overloaded} end. -spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. From 67267acb700a6d552e30f6c1cc226da00a7ff6b4 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 14:36:30 +0200 Subject: [PATCH 05/17] feat(olp): management API --- apps/emqx/src/emqx_olp.erl | 21 +++++++++++++++++++++ apps/emqx_management/src/emqx_mgmt_cli.erl | 22 ++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index f38218765..758c2d348 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -14,12 +14,20 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_olp). + -export([ is_overloaded/0 , backoff/1 , backoff_gc/1 , backoff_hibernation/1 ]). + +%% exports for O&M +-export([ status/0 + , on/0 + , off/0 + ]). + -spec is_overloaded() -> boolean(). is_overloaded() -> load_ctl:is_overloaded(). @@ -45,6 +53,19 @@ backoff_hibernation(Zone) -> load_ctl:is_overloaded() andalso emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) andalso emqx_config:get_zone_conf(Zone, [overload_protection, backoff_hibernation], false). + +-spec status() -> any(). +status() -> + is_overloaded(). + +-spec off() -> ok | {error, timeout}. +off() -> + load_ctl:stop_runq_flagman(5000). + +-spec on() -> any(). +on() -> + load_ctl:restart_runq_flagman(). + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index abe64d886..bc6da3caf 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -38,6 +38,7 @@ , trace/1 , log/1 , authz/1 + , olp/1 ]). -define(PROC_INFOKEYS, [status, @@ -495,6 +496,27 @@ authz(_) -> {"authz cache-clean ", "Clears authorization cache for given client"} ]). + +%%-------------------------------------------------------------------- +%% @doc OLP (Overload Protection related) +olp(["status"]) -> + S = case emqx_olp:is_overloaded() of + true -> "overloaded"; + false -> "not overloaded" + end, + emqx_ctl:print("~p is ~s ~n", [node(), S]); +olp(["off"]) -> + Res = emqx_olp:off(), + emqx_ctl:print("Turn off overload protetion ~p : ~p ~n", [node(), Res]); +olp(["on"]) -> + Res = emqx_olp:on(), + emqx_ctl:print("Turn on overload protection ~p : ~p ~n", [node(), Res]); +olp(_) -> + emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"}, + {"olp on", "Turn on overload protection"}, + {"olp off", "Turn off overload protection"} + ]). + %%-------------------------------------------------------------------- %% Dump ETS %%-------------------------------------------------------------------- From 4dc63b26a8ca004ac85c7352cc35c988cff9bb6b Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 15:03:14 +0200 Subject: [PATCH 06/17] feat(olp): metrics --- apps/emqx/src/emqx_connection.erl | 1 + apps/emqx/src/emqx_metrics.erl | 18 +++++++++++++++++- apps/emqx/src/emqx_olp.erl | 12 ++++++++++-- apps/emqx/src/emqx_quic_connection.erl | 1 + 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 180747208..57c68510f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -325,6 +325,7 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> IdleTimeout + 100 -> case emqx_olp:is_overloaded() of true -> + emqx_metrics:inc('olp.hbn'), recvloop(Parent, State); false -> hibernate(Parent, cancel_stats_timer(State)) diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 282b8b5f3..8b79f9ff2 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -184,6 +184,15 @@ {counter, 'session.terminated'} ]). +%% Overload protetion counters +-define(OLP_METRICS, + [{counter, 'olp.delay.ok'}, + {counter, 'olp.delay.timeout'}, + {counter, 'olp.hbn'}, + {counter, 'olp.gc'}, + {counter, 'olp.close.quic'} + ]). + -record(state, {next_idx = 1}). -record(metric, {name, type, idx}). @@ -430,7 +439,8 @@ init([]) -> ?MESSAGE_METRICS, ?DELIVERY_METRICS, ?CLIENT_METRICS, - ?SESSION_METRICS + ?SESSION_METRICS, + ?OLP_METRICS ]), % Store reserved indices ok = lists:foreach(fun({Type, Name}) -> @@ -571,5 +581,11 @@ reserved_idx('session.takeovered') -> 222; reserved_idx('session.discarded') -> 223; reserved_idx('session.terminated') -> 224; +reserved_idx('olp.delay.ok') -> 300; +reserved_idx('olp.delay.timeout') -> 301; +reserved_idx('olp.hbn') -> 302; +reserved_idx('olp.gc') -> 303; +reserved_idx('olp.close.quic') -> 304; + reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 758c2d348..2e3efe080 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -32,12 +32,20 @@ is_overloaded() -> load_ctl:is_overloaded(). --spec backoff(Zone :: atom()) -> ok | timeout. +-spec backoff(Zone :: atom()) -> ok | false | timeout. backoff(Zone) -> case emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) of true -> Delay = emqx_config:get_zone_conf(Zone, [overload_protection, backoff_delay], 1), - load_ctl:maydelay(Delay); + case load_ctl:maydelay(Delay) of + false -> false; + ok -> + emqx_metrics:inc('olp.delay.ok'), + ok; + timeout -> + emqx_metrics:inc('olp.delay.timeout'), + timeout + end; false -> ok end. diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index eb16b9b0a..aaf7321dd 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -46,6 +46,7 @@ new_conn(Conn, S) -> {error, stream_accept_error} end; true -> + emqx_metrics:inc('olp.close.quic'), {error, overloaded} end. From 370edac031c69a6957766a95e3d5138ce081ebaf Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 4 Oct 2021 16:15:49 +0200 Subject: [PATCH 07/17] feat(olp): add default config --- apps/emqx/etc/emqx.conf | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index df5ae9034..c747052f0 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -833,6 +833,33 @@ force_shutdown { max_heap_size = 32MB } +overload_protection { + ## React on system overload or not + ## @doc overload_protection.enable + ## ValueType: Boolean + ## Default: false + enable = false + + ## Backoff delay in ms + ## @doc overload_protection.backoff_delay + ## ValueType: Integer + ## Range: (0, infinity) + ## Default: 1 + backoff_delay = 1 + + ## Backoff GC enabled + ## @doc overload_protection.backoff_gc + ## ValueType: Boolean + ## Default: true + backoff_gc = true + + ## Backoff hibernation enabled + ## @doc overload_protection.backoff_hibernation + ## ValueType: Boolean + ## Default: true + backoff_hibernation = true +} + force_gc { ## Force the MQTT connection process GC after this number of ## messages or bytes passed through. From 6baf2dbd95c15c4a4f5e1108b3fa7ef53c3b7ba3 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 5 Oct 2021 09:21:32 +0200 Subject: [PATCH 08/17] feat(olp): backoff new conn --- apps/emqx/etc/emqx.conf | 10 ++++-- apps/emqx/src/emqx_connection.erl | 7 ++-- apps/emqx/src/emqx_listeners.erl | 4 ++- apps/emqx/src/emqx_metrics.erl | 4 +-- apps/emqx/src/emqx_olp.erl | 46 +++++++++++++++++++------- apps/emqx/src/emqx_quic_connection.erl | 2 +- apps/emqx/src/emqx_schema.erl | 6 +++- 7 files changed, 57 insertions(+), 22 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index c747052f0..2b70d6dda 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -850,14 +850,20 @@ overload_protection { ## Backoff GC enabled ## @doc overload_protection.backoff_gc ## ValueType: Boolean - ## Default: true - backoff_gc = true + ## Default: false + backoff_gc = false ## Backoff hibernation enabled ## @doc overload_protection.backoff_hibernation ## ValueType: Boolean ## Default: true backoff_hibernation = true + + ## Backoff hibernation enabled + ## @doc overload_protection.backoff_hibernation + ## ValueType: Boolean + ## Default: true + backoff_new_conn = true } force_gc { diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 57c68510f..2b81f3526 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -317,15 +317,16 @@ exit_on_sock_error(Reason) -> %%-------------------------------------------------------------------- %% Recv Loop -recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> +recvloop(Parent, State = #state{ idle_timeout = IdleTimeout + , zone = Zone + }) -> receive Msg -> handle_recv(Msg, Parent, State) after IdleTimeout + 100 -> - case emqx_olp:is_overloaded() of + case emqx_olp:backoff_hibernation(Zone) of true -> - emqx_metrics:inc('olp.hbn'), recvloop(Parent, State); false -> hibernate(Parent, cancel_stats_timer(State)) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5c4776207..b9367d245 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -289,7 +289,9 @@ esockd_opts(Type, Opts0) -> infinity -> Opts1; Rate -> Opts1#{max_conn_rate => Rate} end, - Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))}, + Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])) + , tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} + }, maps:to_list(case Type of tcp -> Opts3#{tcp_options => tcp_opts(Opts0)}; ssl -> Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)} diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 8b79f9ff2..c8cef1308 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -190,7 +190,7 @@ {counter, 'olp.delay.timeout'}, {counter, 'olp.hbn'}, {counter, 'olp.gc'}, - {counter, 'olp.close.quic'} + {counter, 'olp.new_conn'} ]). -record(state, {next_idx = 1}). @@ -585,7 +585,7 @@ reserved_idx('olp.delay.ok') -> 300; reserved_idx('olp.delay.timeout') -> 301; reserved_idx('olp.hbn') -> 302; reserved_idx('olp.gc') -> 303; -reserved_idx('olp.close.quic') -> 304; +reserved_idx('olp.new_conn') -> 304; reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 2e3efe080..aa4966e36 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -19,6 +19,7 @@ , backoff/1 , backoff_gc/1 , backoff_hibernation/1 + , backoff_new_conn/1 ]). @@ -28,15 +29,16 @@ , off/0 ]). +-define(overload_protection, overload_protection). + -spec is_overloaded() -> boolean(). is_overloaded() -> load_ctl:is_overloaded(). -spec backoff(Zone :: atom()) -> ok | false | timeout. backoff(Zone) -> - case emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) of - true -> - Delay = emqx_config:get_zone_conf(Zone, [overload_protection, backoff_delay], 1), + case emqx_config:get_zone_conf(Zone, [?overload_protection]) of + #{enable := true, backoff_delay := Delay} -> case load_ctl:maydelay(Delay) of false -> false; ok -> @@ -46,21 +48,26 @@ backoff(Zone) -> emqx_metrics:inc('olp.delay.timeout'), timeout end; - false -> + _ -> ok end. --spec backoff_gc(Zone :: atom()) -> ok | timeout. +-spec backoff_gc(Zone :: atom()) -> boolean(). backoff_gc(Zone) -> - load_ctl:is_overloaded() - andalso emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) - andalso emqx_config:get_zone_conf(Zone, [overload_protection, backoff_gc], false). + do_check(Zone, ?FUNCTION_NAME, 'olp.gc'). --spec backoff_hibernation(Zone :: atom()) -> ok | timeout. +-spec backoff_hibernation(Zone :: atom()) -> boolean(). backoff_hibernation(Zone) -> - load_ctl:is_overloaded() - andalso emqx_config:get_zone_conf(Zone, [overload_protection, enable], false) - andalso emqx_config:get_zone_conf(Zone, [overload_protection, backoff_hibernation], false). + do_check(Zone, ?FUNCTION_NAME, 'olp.hbn'). + +-spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}. +backoff_new_conn(Zone) -> + case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of + true -> + {error, overloaded}; + false -> + ok + end. -spec status() -> any(). status() -> @@ -74,6 +81,21 @@ off() -> on() -> load_ctl:restart_runq_flagman(). +%%% Internals +do_check(Zone, Key, CntName) -> + case load_ctl:is_overloaded() of + true -> + case emqx_config:get_zone_conf(Zone, [?overload_protection]) of + #{enable := true, Key := true} -> + emqx_metrics:inc(CntName), + true; + _ -> + false + end; + false -> false + end. + + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index aaf7321dd..cc195419c 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -46,7 +46,7 @@ new_conn(Conn, S) -> {error, stream_accept_error} end; true -> - emqx_metrics:inc('olp.close.quic'), + emqx_metrics:inc('olp.new_conn'), {error, overloaded} end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cd9c5e265..bbb9fd67c 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -406,12 +406,16 @@ fields("overload_protection") -> })} , {"backoff_gc", sc(boolean(), - #{ default => true + #{ default => false })} , {"backoff_hibernation", sc(boolean(), #{ default => true })} + , {"backoff_new_conn", + sc(boolean(), + #{ default => true + })} ]; fields("conn_congestion") -> From 547484a2d1e1deb4add31512db6c99772267c6bf Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 5 Oct 2021 09:47:21 +0200 Subject: [PATCH 09/17] chore(olp): add spec. --- apps/emqx/src/emqx_olp.erl | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index aa4966e36..3e47e5eed 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -29,6 +29,18 @@ , off/0 ]). +-type cfg_key() :: + backoff_gc | + backoff_hibernation | + backoff_new_conn. + +-type cnt_name() :: + 'olp.delay.ok' | + 'olp.delay.timeout' | + 'olp.hbn' | + 'olp.gc' | + 'olp.new_conn'. + -define(overload_protection, overload_protection). -spec is_overloaded() -> boolean(). @@ -82,6 +94,7 @@ on() -> load_ctl:restart_runq_flagman(). %%% Internals +-spec do_check(Zone::atom(), cfg_key(), cnt_name()) -> boolean(). do_check(Zone, Key, CntName) -> case load_ctl:is_overloaded() of true -> From bfe4346469558c0e7d3859afe394830368f322b6 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 5 Oct 2021 14:36:36 +0200 Subject: [PATCH 10/17] feat(olp): alarm without nodeid --- apps/emqx/src/emqx_alarm.erl | 2 +- apps/emqx/src/emqx_alarm_handler.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 381933938..fe9933ae0 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -400,7 +400,7 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, normalize_message(Name, no_details) -> list_to_binary(io_lib:format("~p", [Name])); -normalize_message({runq_overload, _Node}, #{node := Node, runq_length := Len}) -> +normalize_message(runq_overload, #{node := Node, runq_length := Len}) -> list_to_binary(io_lib:format("Runq is overloaded on node: ~p: ~p", [Node, Len])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 9e535c733..4cf699895 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -76,11 +76,11 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) -> {ok, State}; handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> - emqx_alarm:activate({runq_overload, node()}, Info), + emqx_alarm:activate(runq_overload, Info), {ok, State}; handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> - emqx_alarm:deactivate({runq_overload, node()}), + emqx_alarm:deactivate(runq_overload), {ok, State}; handle_event(_, State) -> From afb4b5dbd75315b4cac24292c377213e767154ca Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 5 Oct 2021 14:37:39 +0200 Subject: [PATCH 11/17] test(olp): add CT suite emqx_olp_SUITE --- apps/emqx/test/emqx_olp_SUITE.erl | 97 +++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 apps/emqx/test/emqx_olp_SUITE.erl diff --git a/apps/emqx/test/emqx_olp_SUITE.erl b/apps/emqx/test/emqx_olp_SUITE.erl new file mode 100644 index 000000000..58777c2e2 --- /dev/null +++ b/apps/emqx/test/emqx_olp_SUITE.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_olp_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("lc/include/lc.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +init_per_testcase(_, Config) -> + emqx_olp:on(), + ok = load_ctl:put_config(#{ ?RUNQ_MON_T1 => 200 + , ?RUNQ_MON_T2 => 50 + , ?RUNQ_MON_C1 => 3 + }), + Config. + +%% Test that olp could be enabled/disabled globally +t_off_on(_Config) -> + Old = load_ctl:whereis_runq_flagman(), + ok = emqx_olp:off(), + ?assert(not is_process_alive(Old)), + {ok, Pid} = emqx_olp:on(), + ?assert(is_process_alive(Pid)). + +%% Test that overload detection works +t_is_overloaded(_Config) -> + P = burst_runq(), + timer:sleep(2000), + ?assert(emqx_olp:is_overloaded()), + exit(P, kill), + timer:sleep(2000), + ?assert(not emqx_olp:is_overloaded()). + +%% Test that new conn is rejected when olp is enabled +t_overloaded_conn(_Config) -> + process_flag(trap_exit, true), + ?assert(erlang:is_process_alive(load_ctl:whereis_runq_flagman())), + emqx_config:put([overload_protection, enable], true), + P = burst_runq(), + timer:sleep(1000), + ?assert(emqx_olp:is_overloaded()), + true = emqx:is_running(node()), + {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), + ?assertNotMatch({ok, _Pid}, emqtt:connect(C)), + exit(P, kill). + +%% Test that new conn is rejected when olp is enabled +t_overload_cooldown_conn(Config) -> + t_overloaded_conn(Config), + timer:sleep(1000), + ?assert(not emqx_olp:is_overloaded()), + {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), + ?assertMatch({ok, _Pid}, emqtt:connect(C)), + emqtt:stop(C). + +-spec burst_runq() -> ParentToKill :: pid(). +burst_runq() -> + NProc = erlang:system_info(schedulers_online), + spawn(?MODULE, worker_parent, [NProc * 10, {?MODULE, busy_loop, []}]). + +%% internal helpers +worker_parent(N, {M, F, A}) -> + lists:foreach(fun(_) -> + proc_lib:spawn_link(fun() -> apply(M, F, A) end) + end, lists:seq(1, N)), + receive stop -> ok end. + +busy_loop() -> + erlang:yield(), + busy_loop(). From 9ecb7821f9f6145aad8b2fc3dfb26c1680e6d42c Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 6 Oct 2021 09:40:09 +0200 Subject: [PATCH 12/17] feat(olp): use tagged deps libs --- apps/emqx/rebar.config | 6 +++--- rebar.config | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 1a6a37d01..5c21b80e5 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -9,18 +9,18 @@ %% This rebar.config is necessary because the app may be used as a %% `git_subdir` dependency in other projects. {deps, - [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.0"}}} + , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} - , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {plugins, [rebar3_proper]}. diff --git a/rebar.config b/rebar.config index df71cd540..200b892fe 100644 --- a/rebar.config +++ b/rebar.config @@ -42,13 +42,14 @@ {post_hooks,[]}. {deps, - [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps + [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.0"}}} + , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} @@ -64,7 +65,6 @@ , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}} - , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {xref_ignores, From 77f8159ca17cef4f1338bb45c485205445225ccd Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 6 Oct 2021 11:16:14 +0200 Subject: [PATCH 13/17] docs(olp): emqx_olp module --- apps/emqx/src/emqx_olp.erl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 3e47e5eed..d3f93f455 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -43,10 +43,15 @@ -define(overload_protection, overload_protection). +%% @doc Light realtime check if system is overloaded. -spec is_overloaded() -> boolean(). is_overloaded() -> load_ctl:is_overloaded(). +%% @doc Backoff with a delay if the system is overloaded, for tasks that could be deferred. +%% returns `false' if backoff didn't happen, the system is cool. +%% returns `ok' if backoff is triggered and get unblocked when the system is cool. +%% returns `timeout' if backoff is trigged but get unblocked due to timeout as configured. -spec backoff(Zone :: atom()) -> ok | false | timeout. backoff(Zone) -> case emqx_config:get_zone_conf(Zone, [?overload_protection]) of @@ -64,14 +69,18 @@ backoff(Zone) -> ok end. +%% @doc If forceful GC should be skipped when the system is overloaded. -spec backoff_gc(Zone :: atom()) -> boolean(). backoff_gc(Zone) -> do_check(Zone, ?FUNCTION_NAME, 'olp.gc'). +%% @doc If hibernation should be skipped when the system is overloaded. -spec backoff_hibernation(Zone :: atom()) -> boolean(). backoff_hibernation(Zone) -> do_check(Zone, ?FUNCTION_NAME, 'olp.hbn'). +%% @doc Returns {error, overloaded} if new connection should be +%% closed when system is overloaded. -spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}. backoff_new_conn(Zone) -> case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of @@ -85,11 +94,13 @@ backoff_new_conn(Zone) -> status() -> is_overloaded(). +%% @doc turn off backgroud runq check. -spec off() -> ok | {error, timeout}. off() -> load_ctl:stop_runq_flagman(5000). --spec on() -> any(). +%% @doc turn on backgroud runq check. +-spec on() -> {ok, pid()} | {error, running | restarting}. on() -> load_ctl:restart_runq_flagman(). From 8d8969672b24939726d4aa934d80abaa948f0954 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 7 Oct 2021 16:03:54 +0200 Subject: [PATCH 14/17] feat(olp): add desc in schema --- apps/emqx/src/emqx_schema.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bbb9fd67c..0a1df16b4 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -399,22 +399,29 @@ fields("force_shutdown") -> fields("overload_protection") -> [ {"enable", sc(boolean(), - #{ default => false})} + #{ desc => "React on system overload or not" + , default => false + })} , {"backoff_delay", sc(range(0, inf), - #{ default => 1 + #{ desc => "Some unimporant tasks could be delayed" + "for execution, here set the delays in ms" + , default => 1 })} , {"backoff_gc", sc(boolean(), - #{ default => false + #{ desc => "Skip forceful GC if necessary" + , default => false })} , {"backoff_hibernation", sc(boolean(), - #{ default => true + #{ desc => "Skip process hibernation if necessary" + , default => true })} , {"backoff_new_conn", sc(boolean(), - #{ default => true + #{ desc => "Close new incoming connections if necessary" + , default => true })} ]; From b0c27c74fd2fed880d05993df12cdfb3e85587e5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 11 Oct 2021 20:17:19 +0200 Subject: [PATCH 15/17] feat(olp): bump lc to 0.1.1 --- apps/emqx/rebar.config | 2 +- apps/emqx/src/emqx_olp.erl | 13 ++++++++++-- apps/emqx/test/emqx_olp_SUITE.erl | 35 ++++++++++++++++++++++++------- rebar.config | 2 +- 4 files changed, 41 insertions(+), 11 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 5c21b80e5..d388e6803 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -9,7 +9,7 @@ %% This rebar.config is necessary because the app may be used as a %% `git_subdir` dependency in other projects. {deps, - [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.0"}}} + [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.1"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index d3f93f455..101064bb6 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_olp). +-include_lib("lc/include/lc.hrl"). + -export([ is_overloaded/0 , backoff/1 , backoff_gc/1 @@ -100,9 +102,16 @@ off() -> load_ctl:stop_runq_flagman(5000). %% @doc turn on backgroud runq check. --spec on() -> {ok, pid()} | {error, running | restarting}. +-spec on() -> {ok, pid()} | {error, running | restarting | disabled}. on() -> - load_ctl:restart_runq_flagman(). + case load_ctl:restart_runq_flagman() of + {error, disabled} -> + OldCfg = load_ctl:get_config(), + ok = load_ctl:put_config(OldCfg#{ ?RUNQ_MON_F0 => true }), + load_ctl:restart_runq_flagman(); + Other -> + Other + end. %%% Internals -spec do_check(Zone::atom(), cfg_key(), cnt_name()) -> boolean(). diff --git a/apps/emqx/test/emqx_olp_SUITE.erl b/apps/emqx/test/emqx_olp_SUITE.erl index 58777c2e2..0096765b9 100644 --- a/apps/emqx/test/emqx_olp_SUITE.erl +++ b/apps/emqx/test/emqx_olp_SUITE.erl @@ -35,11 +35,20 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> emqx_olp:on(), - ok = load_ctl:put_config(#{ ?RUNQ_MON_T1 => 200 - , ?RUNQ_MON_T2 => 50 - , ?RUNQ_MON_C1 => 3 - }), - Config. + case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of + true -> ok; + false -> + ct:fail("runq_flagman is not up") + end, + ok = load_ctl:put_config(#{ ?RUNQ_MON_F0 => true + , ?RUNQ_MON_F1 => 5 + , ?RUNQ_MON_F2 => 1 + , ?RUNQ_MON_T1 => 200 + , ?RUNQ_MON_T2 => 50 + , ?RUNQ_MON_C1 => 2 + , ?RUNQ_MON_F5 => -1 + }), + Config. %% Test that olp could be enabled/disabled globally t_off_on(_Config) -> @@ -47,15 +56,16 @@ t_off_on(_Config) -> ok = emqx_olp:off(), ?assert(not is_process_alive(Old)), {ok, Pid} = emqx_olp:on(), + timer:sleep(1000), ?assert(is_process_alive(Pid)). %% Test that overload detection works t_is_overloaded(_Config) -> P = burst_runq(), - timer:sleep(2000), + timer:sleep(3000), ?assert(emqx_olp:is_overloaded()), exit(P, kill), - timer:sleep(2000), + timer:sleep(3000), ?assert(not emqx_olp:is_overloaded()). %% Test that new conn is rejected when olp is enabled @@ -95,3 +105,14 @@ worker_parent(N, {M, F, A}) -> busy_loop() -> erlang:yield(), busy_loop(). + +wait_for(_Fun, 0) -> + false; +wait_for(Fun, Retry) -> + case is_pid(Fun()) of + true -> + true; + false -> + timer:sleep(10), + wait_for(Fun, Retry - 1) + end. diff --git a/rebar.config b/rebar.config index 200b892fe..2829095b0 100644 --- a/rebar.config +++ b/rebar.config @@ -42,7 +42,7 @@ {post_hooks,[]}. {deps, - [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.0"}}} + [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.1"}}} , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}} From 58033c083d38cf2f3ba93867034f7615cb42a7f0 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 12 Oct 2021 16:05:14 +0200 Subject: [PATCH 16/17] chore(olp): update alarm text --- apps/emqx/src/emqx_alarm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index fe9933ae0..f9ab5f295 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -401,7 +401,7 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, normalize_message(Name, no_details) -> list_to_binary(io_lib:format("~p", [Name])); normalize_message(runq_overload, #{node := Node, runq_length := Len}) -> - list_to_binary(io_lib:format("Runq is overloaded on node: ~p: ~p", [Node, Len])); + list_to_binary(io_lib:format("VM is overloaded on node: ~p: ~p", [Node, Len])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> From 23fc8afc5005ce1826268b75254748c6a6ce3503 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 13 Oct 2021 08:54:50 +0200 Subject: [PATCH 17/17] feat(olp): rename olp mgmt API on -> enable off -> disable --- apps/emqx/src/emqx_olp.erl | 12 ++++++------ apps/emqx/test/emqx_olp_SUITE.erl | 8 ++++---- apps/emqx_management/src/emqx_mgmt_cli.erl | 18 +++++++++--------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 101064bb6..df97beb35 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -27,8 +27,8 @@ %% exports for O&M -export([ status/0 - , on/0 - , off/0 + , enable/0 + , disable/0 ]). -type cfg_key() :: @@ -97,13 +97,13 @@ status() -> is_overloaded(). %% @doc turn off backgroud runq check. --spec off() -> ok | {error, timeout}. -off() -> +-spec disable() -> ok | {error, timeout}. +disable() -> load_ctl:stop_runq_flagman(5000). %% @doc turn on backgroud runq check. --spec on() -> {ok, pid()} | {error, running | restarting | disabled}. -on() -> +-spec enable() -> {ok, pid()} | {error, running | restarting | disabled}. +enable() -> case load_ctl:restart_runq_flagman() of {error, disabled} -> OldCfg = load_ctl:get_config(), diff --git a/apps/emqx/test/emqx_olp_SUITE.erl b/apps/emqx/test/emqx_olp_SUITE.erl index 0096765b9..04a294558 100644 --- a/apps/emqx/test/emqx_olp_SUITE.erl +++ b/apps/emqx/test/emqx_olp_SUITE.erl @@ -34,7 +34,7 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). init_per_testcase(_, Config) -> - emqx_olp:on(), + emqx_olp:enable(), case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of true -> ok; false -> @@ -51,11 +51,11 @@ init_per_testcase(_, Config) -> Config. %% Test that olp could be enabled/disabled globally -t_off_on(_Config) -> +t_disable_enable(_Config) -> Old = load_ctl:whereis_runq_flagman(), - ok = emqx_olp:off(), + ok = emqx_olp:disable(), ?assert(not is_process_alive(Old)), - {ok, Pid} = emqx_olp:on(), + {ok, Pid} = emqx_olp:enable(), timer:sleep(1000), ?assert(is_process_alive(Pid)). diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index bc6da3caf..b3b38154b 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -505,16 +505,16 @@ olp(["status"]) -> false -> "not overloaded" end, emqx_ctl:print("~p is ~s ~n", [node(), S]); -olp(["off"]) -> - Res = emqx_olp:off(), - emqx_ctl:print("Turn off overload protetion ~p : ~p ~n", [node(), Res]); -olp(["on"]) -> - Res = emqx_olp:on(), - emqx_ctl:print("Turn on overload protection ~p : ~p ~n", [node(), Res]); +olp(["disable"]) -> + Res = emqx_olp:disable(), + emqx_ctl:print("Disable overload protetion ~p : ~p ~n", [node(), Res]); +olp(["enable"]) -> + Res = emqx_olp:enable(), + emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]); olp(_) -> - emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"}, - {"olp on", "Turn on overload protection"}, - {"olp off", "Turn off overload protection"} + emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"}, + {"olp enable", "Enable overload protection"}, + {"olp disable", "Disable overload protection"} ]). %%--------------------------------------------------------------------