From e9710ade14d28d541e45dc3f88c2a52136b92670 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 1 Oct 2021 14:45:31 +0200 Subject: [PATCH] feat(olp): first commit for overload protection - add lc app - add alarm handler for lc runq alarm - backoff when handling CONNECT message - close new connswhen overload --- apps/emqx/rebar.config | 3 ++- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_alarm.erl | 2 ++ apps/emqx/src/emqx_alarm_handler.erl | 9 ++++++++ apps/emqx/src/emqx_channel.erl | 7 +++++- apps/emqx/src/emqx_olp.erl | 33 ++++++++++++++++++++++++++++ rebar.config | 3 ++- 7 files changed, 55 insertions(+), 4 deletions(-) create mode 100644 apps/emqx/src/emqx_olp.erl diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 4ec7c7dc5..1a6a37d01 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -13,13 +13,14 @@ , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} + , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {plugins, [rebar3_proper]}. diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 031e4f654..3f167d093 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -5,7 +5,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy]}, + {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6bd2d5d49..381933938 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -400,6 +400,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, normalize_message(Name, no_details) -> list_to_binary(io_lib:format("~p", [Name])); +normalize_message({runq_overload, _Node}, #{node := Node, runq_length := Len}) -> + list_to_binary(io_lib:format("Runq is overloaded on node: ~p: ~p", [Node, Len])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 06f4e23a6..9e535c733 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -20,6 +20,7 @@ -include("emqx.hrl"). -include("logger.hrl"). +-include_lib("lc/include/lc.hrl"). %% gen_event callbacks @@ -74,6 +75,14 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_process_memory_usage), {ok, State}; +handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> + emqx_alarm:activate({runq_overload, node()}, Info), + {ok, State}; + +handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> + emqx_alarm:deactivate({runq_overload, node()}), + {ok, State}; + handle_event(_, State) -> {ok, State}. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0b1ff7e25..e7500164e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -291,7 +291,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) -> handle_out(connack, ?RC_PROTOCOL_ERROR, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> - case pipeline([fun enrich_conninfo/2, + case pipeline([fun overload_protection/2, + fun enrich_conninfo/2, fun run_conn_hooks/2, fun check_connect/2, fun enrich_client/2, @@ -1142,6 +1143,10 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +overload_protection(_, #channel{clientinfo = #{zone := Zone}}) -> + T = get_mqtt_conf(Zone, overload_drawback_delay, 1), + emqx_olp:backoff(T), + ok. %%-------------------------------------------------------------------- %% Enrich MQTT Connect Info diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl new file mode 100644 index 000000000..9a57ea9ff --- /dev/null +++ b/apps/emqx/src/emqx_olp.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_olp). +-export([ is_overloaded/0 + , backoff/1 + ]). + +-spec is_overloaded() -> boolean(). +is_overloaded() -> + load_ctl:is_overloaded(). + +-spec backoff(timer:timeout()) -> ok | timeout. +backoff(Delay) -> + load_ctl:maydelay(Delay). + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: diff --git a/rebar.config b/rebar.config index ca8dd3e22..df71cd540 100644 --- a/rebar.config +++ b/rebar.config @@ -48,7 +48,7 @@ , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}} @@ -64,6 +64,7 @@ , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}} + , {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}} ]}. {xref_ignores,