feat: hide overload_protection,conn_congestion,flapping_detect

This commit is contained in:
Zhongwen Deng 2023-04-12 14:39:59 +08:00
parent e5b85916b6
commit 7934a1cea1
6 changed files with 86 additions and 41 deletions

View File

@ -1630,7 +1630,7 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
%% Flapping
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
is_integer(emqx_config:get_zone_conf(Zone, [flapping_detect, window_time])) andalso
emqx_flapping:detect(ClientInfo),
{ok, Channel}.

View File

@ -27,6 +27,10 @@
%% API
-export([detect/1]).
-ifdef(TEST).
-export([get_policy/2]).
-endif.
%% gen_server callbacks
-export([
init/1,
@ -39,15 +43,6 @@
%% Tab
-define(FLAPPING_TAB, ?MODULE).
%% Default Policy
-define(FLAPPING_THRESHOLD, 30).
-define(FLAPPING_DURATION, 60000).
-define(FLAPPING_BANNED_INTERVAL, 300000).
-define(DEFAULT_DETECT_POLICY, #{
max_count => ?FLAPPING_THRESHOLD,
window_time => ?FLAPPING_DURATION,
ban_time => ?FLAPPING_BANNED_INTERVAL
}).
-record(flapping, {
clientid :: emqx_types:clientid(),
@ -69,7 +64,7 @@ stop() -> gen_server:stop(?MODULE).
%% @doc Detect flapping when a MQTT client disconnected.
-spec detect(emqx_types:clientinfo()) -> boolean().
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
Policy = #{max_count := Threshold} = get_policy(Zone),
Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone),
%% The initial flapping record sets the detect_cnt to 0.
InitVal = #flapping{
clientid = ClientId,
@ -89,8 +84,22 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
end
end.
get_policy(Zone) ->
emqx_config:get_zone_conf(Zone, [flapping_detect]).
get_policy(Keys, Zone) when is_list(Keys) ->
RootKey = flapping_detect,
Conf = emqx_config:get_zone_conf(Zone, [RootKey]),
lists:foldl(
fun(Key, Acc) ->
case maps:find(Key, Conf) of
{ok, V} -> Acc#{Key => V};
error -> Acc#{Key => emqx_config:get([RootKey, Key])}
end
end,
#{},
Keys
);
get_policy(Key, Zone) ->
#{Key := Conf} = get_policy([Key], Zone),
Conf.
now_diff(TS) -> erlang:system_time(millisecond) - TS.
@ -166,8 +175,7 @@ handle_cast(Msg, State) ->
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
Timestamp =
erlang:system_time(millisecond) -
maps:get(window_time, get_policy(Zone)),
erlang:system_time(millisecond) - get_policy(window_time, Zone),
MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec),
_ = start_timer(Zone),
@ -183,15 +191,19 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
start_timer(Zone) ->
WindTime = maps:get(window_time, get_policy(Zone)),
emqx_misc:start_timer(WindTime, {garbage_collect, Zone}).
case get_policy(window_time, Zone) of
WindowTime when is_integer(WindowTime) ->
emqx_misc:start_timer(WindowTime, {garbage_collect, Zone});
disabled ->
ok
end.
start_timers() ->
lists:foreach(
fun({Zone, _ZoneConf}) ->
maps:foreach(
fun(Zone, _ZoneConf) ->
start_timer(Zone)
end,
maps:to_list(emqx:get_config([zones], #{}))
emqx:get_config([zones], #{})
).
fmt_host(PeerHost) ->

View File

@ -145,17 +145,23 @@ roots(high) ->
{"listeners",
sc(
ref("listeners"),
#{}
)},
{"zones",
sc(
map("name", ref("zone")),
#{desc => ?DESC(zones)}
#{importance => ?IMPORTANCE_HIGH}
)},
{"mqtt",
sc(
ref("mqtt"),
#{desc => ?DESC(mqtt)}
#{
desc => ?DESC(mqtt),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"zones",
sc(
map("name", ref("zone")),
#{
desc => ?DESC(zones),
importance => ?IMPORTANCE_LOW
}
)},
{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication(global)},
%% NOTE: authorization schema here is only to keep emqx app prue
@ -199,7 +205,9 @@ roots(low) ->
{"conn_congestion",
sc(
ref("conn_congestion"),
#{}
#{
importance => ?IMPORTANCE_HIDDEN
}
)},
{"stats",
sc(
@ -221,7 +229,7 @@ roots(low) ->
{"flapping_detect",
sc(
ref("flapping_detect"),
#{}
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"persistent_session_store",
sc(
@ -620,25 +628,27 @@ fields("flapping_detect") ->
boolean(),
#{
default => false,
deprecated => {since, "5.0.22"},
desc => ?DESC(flapping_detect_enable)
}
)},
{"max_count",
sc(
integer(),
#{
default => 15,
desc => ?DESC(flapping_detect_max_count)
}
)},
{"window_time",
sc(
duration(),
hoconsc:union([disabled, duration()]),
#{
default => <<"1m">>,
default => disabled,
importance => ?IMPORTANCE_HIGH,
desc => ?DESC(flapping_detect_window_time)
}
)},
{"max_count",
sc(
non_neg_integer(),
#{
default => 15,
desc => ?DESC(flapping_detect_max_count)
}
)},
{"ban_time",
sc(
duration(),

View File

@ -55,7 +55,10 @@ zone_without_hidden() ->
hidden() ->
[
"stats"
"stats",
"overload_protection",
"conn_congestion",
"flapping_detect"
].
%% zone schemas are clones from the same name from root level

View File

@ -101,3 +101,21 @@ t_expired_detecting(_) ->
ets:tab2list(emqx_flapping)
)
).
t_conf_without_window_time(_) ->
%% enable is deprecated, so we need to make sure it won't be used.
Global = emqx_config:get([flapping_detect]),
?assertNot(maps:is_key(enable, Global)),
%% zones don't have default value, so we need to make sure fallback to global conf.
%% this new_zone will fallback to global conf.
emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}),
?assertEqual(Global, get_policy(new_zone)),
emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}),
?assertEqual(100, emqx_flapping:get_policy(window_time, new_zone_1)),
?assertEqual(maps:get(ban_time, Global), emqx_flapping:get_policy(ban_time, new_zone_1)),
?assertEqual(maps:get(max_count, Global), emqx_flapping:get_policy(max_count, new_zone_1)),
ok.
get_policy(Zone) ->
emqx_flapping:get_policy([window_time, ban_time, max_count], Zone).

View File

@ -121,7 +121,9 @@ t_log(_Config) ->
t_global_zone(_Config) ->
{ok, Zones} = get_global_zone(),
ZonesKeys = lists:map(fun({K, _}) -> K end, hocon_schema:roots(emqx_zone_schema)),
ZonesKeys = lists:map(
fun({K, _}) -> list_to_binary(K) end, emqx_zone_schema:zone_without_hidden()
),
?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))),
?assertEqual(
emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]),