feat(olp): first commit for overload protection
- add lc app - add alarm handler for lc runq alarm - backoff when handling CONNECT message - close new connswhen overload
This commit is contained in:
parent
48599324de
commit
e9710ade14
|
@ -13,13 +13,14 @@
|
||||||
, {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
|
, {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||||
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
||||||
|
, {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{plugins, [rebar3_proper]}.
|
{plugins, [rebar3_proper]}.
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
{vsn, "5.0.0"}, % strict semver, bump manually!
|
{vsn, "5.0.0"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy]},
|
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]},
|
||||||
{mod, {emqx_app,[]}},
|
{mod, {emqx_app,[]}},
|
||||||
{env, []},
|
{env, []},
|
||||||
{licenses, ["Apache-2.0"]},
|
{licenses, ["Apache-2.0"]},
|
||||||
|
|
|
@ -400,6 +400,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
|
||||||
|
|
||||||
normalize_message(Name, no_details) ->
|
normalize_message(Name, no_details) ->
|
||||||
list_to_binary(io_lib:format("~p", [Name]));
|
list_to_binary(io_lib:format("~p", [Name]));
|
||||||
|
normalize_message({runq_overload, _Node}, #{node := Node, runq_length := Len}) ->
|
||||||
|
list_to_binary(io_lib:format("Runq is overloaded on node: ~p: ~p", [Node, Len]));
|
||||||
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||||
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
||||||
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
-include_lib("lc/include/lc.hrl").
|
||||||
|
|
||||||
|
|
||||||
%% gen_event callbacks
|
%% gen_event callbacks
|
||||||
|
@ -74,6 +75,14 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
||||||
emqx_alarm:deactivate(high_process_memory_usage),
|
emqx_alarm:deactivate(high_process_memory_usage),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
|
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
|
||||||
|
emqx_alarm:activate({runq_overload, node()}, Info),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
|
||||||
|
emqx_alarm:deactivate({runq_overload, node()}),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
handle_event(_, State) ->
|
handle_event(_, State) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
|
|
@ -291,7 +291,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
|
||||||
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
|
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
|
||||||
|
|
||||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
case pipeline([fun enrich_conninfo/2,
|
case pipeline([fun overload_protection/2,
|
||||||
|
fun enrich_conninfo/2,
|
||||||
fun run_conn_hooks/2,
|
fun run_conn_hooks/2,
|
||||||
fun check_connect/2,
|
fun check_connect/2,
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
|
@ -1142,6 +1143,10 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session})
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
|
||||||
|
T = get_mqtt_conf(Zone, overload_drawback_delay, 1),
|
||||||
|
emqx_olp:backoff(T),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich MQTT Connect Info
|
%% Enrich MQTT Connect Info
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_olp).
|
||||||
|
-export([ is_overloaded/0
|
||||||
|
, backoff/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-spec is_overloaded() -> boolean().
|
||||||
|
is_overloaded() ->
|
||||||
|
load_ctl:is_overloaded().
|
||||||
|
|
||||||
|
-spec backoff(timer:timeout()) -> ok | timeout.
|
||||||
|
backoff(Delay) ->
|
||||||
|
load_ctl:maydelay(Delay).
|
||||||
|
|
||||||
|
%%%_* Emacs ====================================================================
|
||||||
|
%%% Local Variables:
|
||||||
|
%%% allout-layout: t
|
||||||
|
%%% erlang-indent-level: 2
|
||||||
|
%%% End:
|
|
@ -48,7 +48,7 @@
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {branch, "dev/william/acceptor-add-tune-func"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
|
||||||
|
@ -64,6 +64,7 @@
|
||||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
|
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
|
||||||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}
|
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}
|
||||||
|
, {lc, {git, "https://github.com/qzhuyan/lc.git", {branch, "dev/william/add-alarm"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{xref_ignores,
|
{xref_ignores,
|
||||||
|
|
Loading…
Reference in New Issue