Merge pull request #5867 from qzhuyan/dev/william/olp

feat(olp): first PR for overload protection
This commit is contained in:
William Yang 2021-10-14 10:00:01 +02:00 committed by GitHub
commit dcca1d7544
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 413 additions and 20 deletions

View File

@ -833,6 +833,39 @@ force_shutdown {
max_heap_size = 32MB
}
overload_protection {
## React on system overload or not
## @doc overload_protection.enable
## ValueType: Boolean
## Default: false
enable = false
## Backoff delay in ms
## @doc overload_protection.backoff_delay
## ValueType: Integer
## Range: (0, infinity)
## Default: 1
backoff_delay = 1
## Backoff GC enabled
## @doc overload_protection.backoff_gc
## ValueType: Boolean
## Default: false
backoff_gc = false
## Backoff hibernation enabled
## @doc overload_protection.backoff_hibernation
## ValueType: Boolean
## Default: true
backoff_hibernation = true
## Backoff hibernation enabled
## @doc overload_protection.backoff_hibernation
## ValueType: Boolean
## Default: true
backoff_new_conn = true
}
force_gc {
## Force the MQTT connection process GC after this number of
## messages or bytes passed through.

View File

@ -9,11 +9,12 @@
%% This rebar.config is necessary because the app may be used as a
%% `git_subdir` dependency in other projects.
{deps,
[ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
[ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.1"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}}

View File

@ -5,7 +5,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy]},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]},
{mod, {emqx_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -410,6 +410,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
normalize_message(Name, no_details) ->
list_to_binary(io_lib:format("~p", [Name]));
normalize_message(runq_overload, #{node := Node, runq_length := Len}) ->
list_to_binary(io_lib:format("VM is overloaded on node: ~p: ~p", [Node, Len]));
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->

View File

@ -20,6 +20,7 @@
-include("emqx.hrl").
-include("logger.hrl").
-include_lib("lc/include/lc.hrl").
%% gen_event callbacks
@ -74,6 +75,14 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_process_memory_usage),
{ok, State};
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
emqx_alarm:activate(runq_overload, Info),
{ok, State};
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
emqx_alarm:deactivate(runq_overload),
{ok, State};
handle_event(_, State) ->
{ok, State}.

View File

@ -291,7 +291,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun enrich_conninfo/2,
case pipeline([fun overload_protection/2,
fun enrich_conninfo/2,
fun run_conn_hooks/2,
fun check_connect/2,
fun enrich_client/2,
@ -1158,6 +1159,9 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session})
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
emqx_olp:backoff(Zone),
ok.
%%--------------------------------------------------------------------
%% Enrich MQTT Connect Info

View File

@ -317,13 +317,20 @@ exit_on_sock_error(Reason) ->
%%--------------------------------------------------------------------
%% Recv Loop
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
recvloop(Parent, State = #state{ idle_timeout = IdleTimeout
, zone = Zone
}) ->
receive
Msg ->
handle_recv(Msg, Parent, State)
after
IdleTimeout + 100 ->
hibernate(Parent, cancel_stats_timer(State))
case emqx_olp:backoff_hibernation(Zone) of
true ->
recvloop(Parent, State);
false ->
hibernate(Parent, cancel_stats_timer(State))
end
end.
handle_recv({system, From, Request}, Parent, State) ->
@ -822,8 +829,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
%%--------------------------------------------------------------------
%% Run GC and Check OOM
run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) ->
case ?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone)
andalso emqx_gc:run(Stats, GcSt)
of
false -> State;
{_IsGC, GcSt1} ->
State#state{gc_state = GcSt1}

View File

@ -289,7 +289,9 @@ esockd_opts(Type, Opts0) ->
infinity -> Opts1;
Rate -> Opts1#{max_conn_rate => Rate}
end,
Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))
, tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
},
maps:to_list(case Type of
tcp -> Opts3#{tcp_options => tcp_opts(Opts0)};
ssl -> Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}

View File

@ -184,6 +184,15 @@
{counter, 'session.terminated'}
]).
%% Overload protetion counters
-define(OLP_METRICS,
[{counter, 'olp.delay.ok'},
{counter, 'olp.delay.timeout'},
{counter, 'olp.hbn'},
{counter, 'olp.gc'},
{counter, 'olp.new_conn'}
]).
-record(state, {next_idx = 1}).
-record(metric, {name, type, idx}).
@ -430,7 +439,8 @@ init([]) ->
?MESSAGE_METRICS,
?DELIVERY_METRICS,
?CLIENT_METRICS,
?SESSION_METRICS
?SESSION_METRICS,
?OLP_METRICS
]),
% Store reserved indices
ok = lists:foreach(fun({Type, Name}) ->
@ -575,5 +585,11 @@ reserved_idx('session.takeovered') -> 222;
reserved_idx('session.discarded') -> 223;
reserved_idx('session.terminated') -> 224;
reserved_idx('olp.delay.ok') -> 300;
reserved_idx('olp.delay.timeout') -> 301;
reserved_idx('olp.hbn') -> 302;
reserved_idx('olp.gc') -> 303;
reserved_idx('olp.new_conn') -> 304;
reserved_idx(_) -> undefined.

136
apps/emqx/src/emqx_olp.erl Normal file
View File

@ -0,0 +1,136 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_olp).
-include_lib("lc/include/lc.hrl").
-export([ is_overloaded/0
, backoff/1
, backoff_gc/1
, backoff_hibernation/1
, backoff_new_conn/1
]).
%% exports for O&M
-export([ status/0
, enable/0
, disable/0
]).
-type cfg_key() ::
backoff_gc |
backoff_hibernation |
backoff_new_conn.
-type cnt_name() ::
'olp.delay.ok' |
'olp.delay.timeout' |
'olp.hbn' |
'olp.gc' |
'olp.new_conn'.
-define(overload_protection, overload_protection).
%% @doc Light realtime check if system is overloaded.
-spec is_overloaded() -> boolean().
is_overloaded() ->
load_ctl:is_overloaded().
%% @doc Backoff with a delay if the system is overloaded, for tasks that could be deferred.
%% returns `false' if backoff didn't happen, the system is cool.
%% returns `ok' if backoff is triggered and get unblocked when the system is cool.
%% returns `timeout' if backoff is trigged but get unblocked due to timeout as configured.
-spec backoff(Zone :: atom()) -> ok | false | timeout.
backoff(Zone) ->
case emqx_config:get_zone_conf(Zone, [?overload_protection]) of
#{enable := true, backoff_delay := Delay} ->
case load_ctl:maydelay(Delay) of
false -> false;
ok ->
emqx_metrics:inc('olp.delay.ok'),
ok;
timeout ->
emqx_metrics:inc('olp.delay.timeout'),
timeout
end;
_ ->
ok
end.
%% @doc If forceful GC should be skipped when the system is overloaded.
-spec backoff_gc(Zone :: atom()) -> boolean().
backoff_gc(Zone) ->
do_check(Zone, ?FUNCTION_NAME, 'olp.gc').
%% @doc If hibernation should be skipped when the system is overloaded.
-spec backoff_hibernation(Zone :: atom()) -> boolean().
backoff_hibernation(Zone) ->
do_check(Zone, ?FUNCTION_NAME, 'olp.hbn').
%% @doc Returns {error, overloaded} if new connection should be
%% closed when system is overloaded.
-spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}.
backoff_new_conn(Zone) ->
case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of
true ->
{error, overloaded};
false ->
ok
end.
-spec status() -> any().
status() ->
is_overloaded().
%% @doc turn off backgroud runq check.
-spec disable() -> ok | {error, timeout}.
disable() ->
load_ctl:stop_runq_flagman(5000).
%% @doc turn on backgroud runq check.
-spec enable() -> {ok, pid()} | {error, running | restarting | disabled}.
enable() ->
case load_ctl:restart_runq_flagman() of
{error, disabled} ->
OldCfg = load_ctl:get_config(),
ok = load_ctl:put_config(OldCfg#{ ?RUNQ_MON_F0 => true }),
load_ctl:restart_runq_flagman();
Other ->
Other
end.
%%% Internals
-spec do_check(Zone::atom(), cfg_key(), cnt_name()) -> boolean().
do_check(Zone, Key, CntName) ->
case load_ctl:is_overloaded() of
true ->
case emqx_config:get_zone_conf(Zone, [?overload_protection]) of
#{enable := true, Key := true} ->
emqx_metrics:inc(CntName),
true;
_ ->
false
end;
false -> false
end.
%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:

View File

@ -35,13 +35,19 @@ init(ConnOpts) when is_map(ConnOpts) ->
-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
new_conn(Conn, S) ->
process_flag(trap_exit, true),
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S),
receive
{Pid, stream_acceptor_ready} ->
ok = quicer:async_handshake(Conn),
{ok, S};
{'EXIT', Pid, _Reason} ->
{error, stream_accept_error}
case emqx_olp:is_overloaded() of
false ->
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S),
receive
{Pid, stream_acceptor_ready} ->
ok = quicer:async_handshake(Conn),
{ok, S};
{'EXIT', Pid, _Reason} ->
{error, stream_accept_error}
end;
true ->
emqx_metrics:inc('olp.new_conn'),
{error, overloaded}
end.
-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.

View File

@ -123,6 +123,9 @@ roots(medium) ->
, {"force_shutdown",
sc(ref("force_shutdown"),
#{})}
, {"overload_protection",
sc(ref("overload_protection"),
#{})}
];
roots(low) ->
[ {"force_gc",
@ -324,7 +327,9 @@ fields("mqtt") ->
fields("zone") ->
Fields = ["mqtt", "stats", "flapping_detect", "force_shutdown",
"conn_congestion", "rate_limit", "quota", "force_gc"],
"conn_congestion", "rate_limit", "quota", "force_gc",
"overload_protection"
],
[{F, ref(emqx_zone_schema, F)} || F <- Fields];
fields("rate_limit") ->
@ -392,6 +397,35 @@ fields("force_shutdown") ->
})}
];
fields("overload_protection") ->
[ {"enable",
sc(boolean(),
#{ desc => "React on system overload or not"
, default => false
})}
, {"backoff_delay",
sc(range(0, inf),
#{ desc => "Some unimporant tasks could be delayed"
"for execution, here set the delays in ms"
, default => 1
})}
, {"backoff_gc",
sc(boolean(),
#{ desc => "Skip forceful GC if necessary"
, default => false
})}
, {"backoff_hibernation",
sc(boolean(),
#{ desc => "Skip process hibernation if necessary"
, default => true
})}
, {"backoff_new_conn",
sc(boolean(),
#{ desc => "Close new incoming connections if necessary"
, default => true
})}
];
fields("conn_congestion") ->
[ {"enable_alarm",
sc(boolean(),

View File

@ -0,0 +1,118 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_olp_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("lc/include/lc.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_, Config) ->
emqx_olp:enable(),
case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of
true -> ok;
false ->
ct:fail("runq_flagman is not up")
end,
ok = load_ctl:put_config(#{ ?RUNQ_MON_F0 => true
, ?RUNQ_MON_F1 => 5
, ?RUNQ_MON_F2 => 1
, ?RUNQ_MON_T1 => 200
, ?RUNQ_MON_T2 => 50
, ?RUNQ_MON_C1 => 2
, ?RUNQ_MON_F5 => -1
}),
Config.
%% Test that olp could be enabled/disabled globally
t_disable_enable(_Config) ->
Old = load_ctl:whereis_runq_flagman(),
ok = emqx_olp:disable(),
?assert(not is_process_alive(Old)),
{ok, Pid} = emqx_olp:enable(),
timer:sleep(1000),
?assert(is_process_alive(Pid)).
%% Test that overload detection works
t_is_overloaded(_Config) ->
P = burst_runq(),
timer:sleep(3000),
?assert(emqx_olp:is_overloaded()),
exit(P, kill),
timer:sleep(3000),
?assert(not emqx_olp:is_overloaded()).
%% Test that new conn is rejected when olp is enabled
t_overloaded_conn(_Config) ->
process_flag(trap_exit, true),
?assert(erlang:is_process_alive(load_ctl:whereis_runq_flagman())),
emqx_config:put([overload_protection, enable], true),
P = burst_runq(),
timer:sleep(1000),
?assert(emqx_olp:is_overloaded()),
true = emqx:is_running(node()),
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
?assertNotMatch({ok, _Pid}, emqtt:connect(C)),
exit(P, kill).
%% Test that new conn is rejected when olp is enabled
t_overload_cooldown_conn(Config) ->
t_overloaded_conn(Config),
timer:sleep(1000),
?assert(not emqx_olp:is_overloaded()),
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
?assertMatch({ok, _Pid}, emqtt:connect(C)),
emqtt:stop(C).
-spec burst_runq() -> ParentToKill :: pid().
burst_runq() ->
NProc = erlang:system_info(schedulers_online),
spawn(?MODULE, worker_parent, [NProc * 10, {?MODULE, busy_loop, []}]).
%% internal helpers
worker_parent(N, {M, F, A}) ->
lists:foreach(fun(_) ->
proc_lib:spawn_link(fun() -> apply(M, F, A) end)
end, lists:seq(1, N)),
receive stop -> ok end.
busy_loop() ->
erlang:yield(),
busy_loop().
wait_for(_Fun, 0) ->
false;
wait_for(Fun, Retry) ->
case is_pid(Fun()) of
true ->
true;
false ->
timer:sleep(10),
wait_for(Fun, Retry - 1)
end.

View File

@ -38,6 +38,7 @@
, trace/1
, log/1
, authz/1
, olp/1
]).
-define(PROC_INFOKEYS, [status,
@ -495,6 +496,27 @@ authz(_) ->
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
]).
%%--------------------------------------------------------------------
%% @doc OLP (Overload Protection related)
olp(["status"]) ->
S = case emqx_olp:is_overloaded() of
true -> "overloaded";
false -> "not overloaded"
end,
emqx_ctl:print("~p is ~s ~n", [node(), S]);
olp(["disable"]) ->
Res = emqx_olp:disable(),
emqx_ctl:print("Disable overload protetion ~p : ~p ~n", [node(), Res]);
olp(["enable"]) ->
Res = emqx_olp:enable(),
emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
olp(_) ->
emqx_ctl:usage([{"olp status", "Return OLP status if system is overloaded"},
{"olp enable", "Enable overload protection"},
{"olp disable", "Disable overload protection"}
]).
%%--------------------------------------------------------------------
%% Dump ETS
%%--------------------------------------------------------------------

View File

@ -42,13 +42,14 @@
{post_hooks,[]}.
{deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
[ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.1"}}}
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}