Merge pull request #10954 from zhongwencool/flapping-detect-conf
feat: don't hide enable in flapping detect conf
This commit is contained in:
commit
641aca00d8
|
@ -0,0 +1,35 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_config_zones).
|
||||||
|
|
||||||
|
-behaviour(emqx_config_handler).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([add_handler/0, remove_handler/0, pre_config_update/3]).
|
||||||
|
|
||||||
|
-define(ZONES, [zones]).
|
||||||
|
|
||||||
|
add_handler() ->
|
||||||
|
ok = emqx_config_handler:add_handler(?ZONES, ?MODULE),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
remove_handler() ->
|
||||||
|
ok = emqx_config_handler:remove_handler(?ZONES),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% replace the old config with the new config
|
||||||
|
pre_config_update(?ZONES, NewRaw, _OldRaw) ->
|
||||||
|
{ok, NewRaw}.
|
|
@ -1635,10 +1635,9 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Flapping
|
%% Flapping
|
||||||
|
|
||||||
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
count_flapping_event(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
|
||||||
is_integer(emqx_config:get_zone_conf(Zone, [flapping_detect, window_time])) andalso
|
_ = emqx_flapping:detect(ClientInfo),
|
||||||
emqx_flapping:detect(ClientInfo),
|
ok.
|
||||||
{ok, Channel}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Authenticate
|
%% Authenticate
|
||||||
|
|
|
@ -689,11 +689,13 @@ prune_backup_files(Path) ->
|
||||||
|
|
||||||
add_handlers() ->
|
add_handlers() ->
|
||||||
ok = emqx_config_logger:add_handler(),
|
ok = emqx_config_logger:add_handler(),
|
||||||
|
ok = emqx_config_zones:add_handler(),
|
||||||
emqx_sys_mon:add_handler(),
|
emqx_sys_mon:add_handler(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
remove_handlers() ->
|
remove_handlers() ->
|
||||||
ok = emqx_config_logger:remove_handler(),
|
ok = emqx_config_logger:remove_handler(),
|
||||||
|
ok = emqx_config_zones:remove_handler(),
|
||||||
emqx_sys_mon:remove_handler(),
|
emqx_sys_mon:remove_handler(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -759,7 +761,10 @@ do_put(Type, Putter, [], DeepValue) ->
|
||||||
do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
|
do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
|
||||||
OldValue = do_get(Type, [RootName], #{}),
|
OldValue = do_get(Type, [RootName], #{}),
|
||||||
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
||||||
persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue).
|
Key = ?PERSIS_KEY(Type, RootName),
|
||||||
|
persistent_term:put(Key, NewValue),
|
||||||
|
put_config_post_change_actions(Key, NewValue),
|
||||||
|
ok.
|
||||||
|
|
||||||
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
|
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
|
||||||
emqx_utils_maps:deep_get(AtomKeyPath, Map, Default);
|
emqx_utils_maps:deep_get(AtomKeyPath, Map, Default);
|
||||||
|
@ -880,16 +885,14 @@ merge_with_global_defaults(GlobalDefaults, ZoneVal) ->
|
||||||
NewZoneVal :: map().
|
NewZoneVal :: map().
|
||||||
maybe_update_zone([zones | T], ZonesValue, Value) ->
|
maybe_update_zone([zones | T], ZonesValue, Value) ->
|
||||||
%% note, do not write to PT, return *New value* instead
|
%% note, do not write to PT, return *New value* instead
|
||||||
NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value),
|
|
||||||
ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})),
|
|
||||||
%% Update only new zones with global defaults
|
|
||||||
GLD = zone_global_defaults(),
|
GLD = zone_global_defaults(),
|
||||||
maps:fold(
|
NewZonesValue0 = emqx_utils_maps:deep_put(T, ZonesValue, Value),
|
||||||
fun(ZoneName, ZoneValue, Acc) ->
|
NewZonesValue1 = emqx_utils_maps:deep_merge(#{default => GLD}, NewZonesValue0),
|
||||||
Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)}
|
maps:map(
|
||||||
|
fun(_ZoneName, ZoneValue) ->
|
||||||
|
merge_with_global_defaults(GLD, ZoneValue)
|
||||||
end,
|
end,
|
||||||
NewZonesValue,
|
NewZonesValue1
|
||||||
maps:without(ExistingZoneNames, NewZonesValue)
|
|
||||||
);
|
);
|
||||||
maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) ->
|
maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) ->
|
||||||
NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value),
|
NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value),
|
||||||
|
@ -963,3 +966,12 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
|
||||||
),
|
),
|
||||||
AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}),
|
AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}),
|
||||||
emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).
|
emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).
|
||||||
|
|
||||||
|
%% When the global zone change, the zones is updated with the new global zone.
|
||||||
|
%% The global zone's keys is too many,
|
||||||
|
%% so we don't choose to write a global zone change emqx_config_handler callback to hook
|
||||||
|
put_config_post_change_actions(?PERSIS_KEY(?CONF, zones), _Zones) ->
|
||||||
|
emqx_flapping:update_config(),
|
||||||
|
ok;
|
||||||
|
put_config_post_change_actions(_Key, _NewValue) ->
|
||||||
|
ok.
|
||||||
|
|
|
@ -22,13 +22,13 @@
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
|
||||||
-export([start_link/0, stop/0]).
|
-export([start_link/0, update_config/0, stop/0]).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([detect/1]).
|
-export([detect/1]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([get_policy/2]).
|
-export([get_policy/1]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -59,12 +59,17 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
update_config() ->
|
||||||
|
gen_server:cast(?MODULE, update_config).
|
||||||
|
|
||||||
stop() -> gen_server:stop(?MODULE).
|
stop() -> gen_server:stop(?MODULE).
|
||||||
|
|
||||||
%% @doc Detect flapping when a MQTT client disconnected.
|
%% @doc Detect flapping when a MQTT client disconnected.
|
||||||
-spec detect(emqx_types:clientinfo()) -> boolean().
|
-spec detect(emqx_types:clientinfo()) -> boolean().
|
||||||
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
|
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
|
||||||
Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone),
|
detect(ClientId, PeerHost, get_policy(Zone)).
|
||||||
|
|
||||||
|
detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) ->
|
||||||
%% The initial flapping record sets the detect_cnt to 0.
|
%% The initial flapping record sets the detect_cnt to 0.
|
||||||
InitVal = #flapping{
|
InitVal = #flapping{
|
||||||
clientid = ClientId,
|
clientid = ClientId,
|
||||||
|
@ -82,24 +87,21 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
|
||||||
[] ->
|
[] ->
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
end.
|
end;
|
||||||
|
detect(_ClientId, _PeerHost, #{enable := false}) ->
|
||||||
|
false.
|
||||||
|
|
||||||
get_policy(Keys, Zone) when is_list(Keys) ->
|
get_policy(Zone) ->
|
||||||
RootKey = flapping_detect,
|
Flapping = [flapping_detect],
|
||||||
Conf = emqx_config:get_zone_conf(Zone, [RootKey]),
|
case emqx_config:get_zone_conf(Zone, Flapping, undefined) of
|
||||||
lists:foldl(
|
undefined ->
|
||||||
fun(Key, Acc) ->
|
%% If zone has be deleted at running time,
|
||||||
case maps:find(Key, Conf) of
|
%% we don't crash the connection and disable flapping detect.
|
||||||
{ok, V} -> Acc#{Key => V};
|
Policy = emqx_config:get(Flapping),
|
||||||
error -> Acc#{Key => emqx_config:get([RootKey, Key])}
|
Policy#{enable => false};
|
||||||
end
|
Policy ->
|
||||||
end,
|
Policy
|
||||||
#{},
|
end.
|
||||||
Keys
|
|
||||||
);
|
|
||||||
get_policy(Key, Zone) ->
|
|
||||||
#{Key := Conf} = get_policy([Key], Zone),
|
|
||||||
Conf.
|
|
||||||
|
|
||||||
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
now_diff(TS) -> erlang:system_time(millisecond) - TS.
|
||||||
|
|
||||||
|
@ -115,8 +117,8 @@ init([]) ->
|
||||||
{read_concurrency, true},
|
{read_concurrency, true},
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]),
|
]),
|
||||||
start_timers(),
|
Timers = start_timers(),
|
||||||
{ok, #{}, hibernate}.
|
{ok, Timers, hibernate}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
|
@ -169,17 +171,20 @@ handle_cast(
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
handle_cast(update_config, State) ->
|
||||||
|
NState = update_timer(State),
|
||||||
|
{noreply, NState};
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
|
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
|
||||||
Timestamp =
|
Policy = #{window_time := WindowTime} = get_policy(Zone),
|
||||||
erlang:system_time(millisecond) - get_policy(window_time, Zone),
|
Timestamp = erlang:system_time(millisecond) - WindowTime,
|
||||||
MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}],
|
MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}],
|
||||||
ets:select_delete(?FLAPPING_TAB, MatchSpec),
|
ets:select_delete(?FLAPPING_TAB, MatchSpec),
|
||||||
_ = start_timer(Zone),
|
Timer = start_timer(Policy, Zone),
|
||||||
{noreply, State, hibernate};
|
{noreply, State#{Zone => Timer}, hibernate};
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -190,18 +195,30 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
start_timer(Zone) ->
|
start_timer(#{enable := true, window_time := WindowTime}, Zone) ->
|
||||||
case get_policy(window_time, Zone) of
|
|
||||||
WindowTime when is_integer(WindowTime) ->
|
|
||||||
emqx_utils:start_timer(WindowTime, {garbage_collect, Zone});
|
emqx_utils:start_timer(WindowTime, {garbage_collect, Zone});
|
||||||
disabled ->
|
start_timer(_Policy, _Zone) ->
|
||||||
ok
|
undefined.
|
||||||
end.
|
|
||||||
|
|
||||||
start_timers() ->
|
start_timers() ->
|
||||||
maps:foreach(
|
maps:map(
|
||||||
fun(Zone, _ZoneConf) ->
|
fun(ZoneName, #{flapping_detect := FlappingDetect}) ->
|
||||||
start_timer(Zone)
|
start_timer(FlappingDetect, ZoneName)
|
||||||
|
end,
|
||||||
|
emqx:get_config([zones], #{})
|
||||||
|
).
|
||||||
|
|
||||||
|
update_timer(Timers) ->
|
||||||
|
maps:map(
|
||||||
|
fun(ZoneName, #{flapping_detect := FlappingDetect = #{enable := Enable}}) ->
|
||||||
|
case maps:get(ZoneName, Timers, undefined) of
|
||||||
|
undefined ->
|
||||||
|
start_timer(FlappingDetect, ZoneName);
|
||||||
|
TRef when Enable -> TRef;
|
||||||
|
TRef ->
|
||||||
|
_ = erlang:cancel_timer(TRef),
|
||||||
|
undefined
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
emqx:get_config([zones], #{})
|
emqx:get_config([zones], #{})
|
||||||
).
|
).
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
-define(MAX_INT_TIMEOUT_MS, 4294967295).
|
-define(MAX_INT_TIMEOUT_MS, 4294967295).
|
||||||
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
|
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
|
||||||
-define(MAX_INT_TIMEOUT_S, 4294967).
|
-define(MAX_INT_TIMEOUT_S, 4294967).
|
||||||
|
-define(DEFAULT_WINDOW_TIME, <<"1m">>).
|
||||||
|
|
||||||
-type duration() :: integer().
|
-type duration() :: integer().
|
||||||
-type duration_s() :: integer().
|
-type duration_s() :: integer().
|
||||||
|
@ -277,7 +278,10 @@ roots(low) ->
|
||||||
{"flapping_detect",
|
{"flapping_detect",
|
||||||
sc(
|
sc(
|
||||||
ref("flapping_detect"),
|
ref("flapping_detect"),
|
||||||
#{importance => ?IMPORTANCE_HIDDEN}
|
#{
|
||||||
|
importance => ?IMPORTANCE_MEDIUM,
|
||||||
|
converter => fun flapping_detect_converter/2
|
||||||
|
}
|
||||||
)},
|
)},
|
||||||
{"persistent_session_store",
|
{"persistent_session_store",
|
||||||
sc(
|
sc(
|
||||||
|
@ -687,15 +691,14 @@ fields("flapping_detect") ->
|
||||||
boolean(),
|
boolean(),
|
||||||
#{
|
#{
|
||||||
default => false,
|
default => false,
|
||||||
deprecated => {since, "5.0.23"},
|
|
||||||
desc => ?DESC(flapping_detect_enable)
|
desc => ?DESC(flapping_detect_enable)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"window_time",
|
{"window_time",
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([disabled, duration()]),
|
duration(),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => ?DEFAULT_WINDOW_TIME,
|
||||||
importance => ?IMPORTANCE_HIGH,
|
importance => ?IMPORTANCE_HIGH,
|
||||||
desc => ?DESC(flapping_detect_window_time)
|
desc => ?DESC(flapping_detect_window_time)
|
||||||
}
|
}
|
||||||
|
@ -3549,3 +3552,9 @@ mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
|
||||||
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
|
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
|
||||||
mqtt_converter(Mqtt, _Opts) ->
|
mqtt_converter(Mqtt, _Opts) ->
|
||||||
Mqtt.
|
Mqtt.
|
||||||
|
|
||||||
|
%% For backward compatibility with window_time is disable
|
||||||
|
flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) ->
|
||||||
|
Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
|
||||||
|
flapping_detect_converter(Conf, _Opts) ->
|
||||||
|
Conf.
|
||||||
|
|
|
@ -58,8 +58,7 @@ hidden() ->
|
||||||
[
|
[
|
||||||
"stats",
|
"stats",
|
||||||
"overload_protection",
|
"overload_protection",
|
||||||
"conn_congestion",
|
"conn_congestion"
|
||||||
"flapping_detect"
|
|
||||||
].
|
].
|
||||||
|
|
||||||
%% zone schemas are clones from the same name from root level
|
%% zone schemas are clones from the same name from root level
|
||||||
|
|
|
@ -403,7 +403,7 @@ zone_global_defaults() ->
|
||||||
conn_congestion =>
|
conn_congestion =>
|
||||||
#{enable_alarm => true, min_alarm_sustain_duration => 60000},
|
#{enable_alarm => true, min_alarm_sustain_duration => 60000},
|
||||||
flapping_detect =>
|
flapping_detect =>
|
||||||
#{ban_time => 300000, max_count => 15, window_time => disabled},
|
#{ban_time => 300000, max_count => 15, window_time => 60000, enable => false},
|
||||||
force_gc =>
|
force_gc =>
|
||||||
#{bytes => 16777216, count => 16000, enable => true},
|
#{bytes => 16777216, count => 16000, enable => true},
|
||||||
force_shutdown =>
|
force_shutdown =>
|
||||||
|
|
|
@ -26,15 +26,16 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
emqx_config:put_zone_conf(
|
%% update global default config
|
||||||
default,
|
{ok, _} = emqx:update_config(
|
||||||
[flapping_detect],
|
[flapping_detect],
|
||||||
#{
|
#{
|
||||||
max_count => 3,
|
<<"enable">> => true,
|
||||||
|
<<"max_count">> => 3,
|
||||||
% 0.1s
|
% 0.1s
|
||||||
window_time => 100,
|
<<"window_time">> => 100,
|
||||||
%% 2s
|
%% 2s
|
||||||
ban_time => 2000
|
<<"ban_time">> => "2s"
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
Config.
|
Config.
|
||||||
|
@ -102,20 +103,73 @@ t_expired_detecting(_) ->
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_conf_without_window_time(_) ->
|
t_conf_update(_) ->
|
||||||
%% enable is deprecated, so we need to make sure it won't be used.
|
|
||||||
Global = emqx_config:get([flapping_detect]),
|
Global = emqx_config:get([flapping_detect]),
|
||||||
?assertNot(maps:is_key(enable, Global)),
|
#{
|
||||||
%% zones don't have default value, so we need to make sure fallback to global conf.
|
ban_time := _BanTime,
|
||||||
%% this new_zone will fallback to global conf.
|
enable := _Enable,
|
||||||
|
max_count := _MaxCount,
|
||||||
|
window_time := _WindowTime
|
||||||
|
} = Global,
|
||||||
|
|
||||||
emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}),
|
emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}),
|
||||||
?assertEqual(Global, get_policy(new_zone)),
|
?assertEqual(Global, get_policy(new_zone)),
|
||||||
|
|
||||||
emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}),
|
emqx_config:put_zone_conf(zone_1, [flapping_detect], #{window_time => 100}),
|
||||||
?assertEqual(100, emqx_flapping:get_policy(window_time, new_zone_1)),
|
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),
|
||||||
?assertEqual(maps:get(ban_time, Global), emqx_flapping:get_policy(ban_time, new_zone_1)),
|
|
||||||
?assertEqual(maps:get(max_count, Global), emqx_flapping:get_policy(max_count, new_zone_1)),
|
Zones = #{
|
||||||
|
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}},
|
||||||
|
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}}
|
||||||
|
},
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
|
||||||
|
%% new_zone is already deleted
|
||||||
|
?assertError({config_not_found, _}, get_policy(new_zone)),
|
||||||
|
%% update zone(zone_1) has default.
|
||||||
|
?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)),
|
||||||
|
%% create zone(zone_2) has default
|
||||||
|
?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)),
|
||||||
|
%% reset to default(empty) andalso get default from global
|
||||||
|
?assertMatch({ok, _}, emqx:update_config([zones], #{})),
|
||||||
|
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
|
||||||
|
?assertError({config_not_found, _}, get_policy(zone_1)),
|
||||||
|
?assertError({config_not_found, _}, get_policy(zone_2)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_conf_update_timer(_Config) ->
|
||||||
|
_ = emqx_flapping:start_link(),
|
||||||
|
validate_timer([default]),
|
||||||
|
{ok, _} =
|
||||||
|
emqx:update_config([zones], #{
|
||||||
|
<<"timer_1">> => #{<<"flapping_detect">> => #{<<"enable">> => true}},
|
||||||
|
<<"timer_2">> => #{<<"flapping_detect">> => #{<<"enable">> => true}},
|
||||||
|
<<"timer_3">> => #{<<"flapping_detect">> => #{<<"enable">> => false}}
|
||||||
|
}),
|
||||||
|
validate_timer([timer_1, timer_2, timer_3, default]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
validate_timer(Names) ->
|
||||||
|
Zones = emqx:get_config([zones]),
|
||||||
|
?assertEqual(lists:sort(Names), lists:sort(maps:keys(Zones))),
|
||||||
|
Timers = sys:get_state(emqx_flapping),
|
||||||
|
maps:foreach(
|
||||||
|
fun(Name, #{flapping_detect := #{enable := Enable}}) ->
|
||||||
|
?assertEqual(Enable, is_reference(maps:get(Name, Timers)), Timers)
|
||||||
|
end,
|
||||||
|
Zones
|
||||||
|
),
|
||||||
|
?assertEqual(maps:keys(Zones), maps:keys(Timers)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_window_compatibility_check(_Conf) ->
|
||||||
|
Flapping = emqx:get_config([flapping_detect]),
|
||||||
|
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
|
||||||
|
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
|
||||||
|
%% reset
|
||||||
|
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
|
||||||
|
ok = emqx_config:init_load(emqx_schema, FlappingBin),
|
||||||
|
?assertEqual(Flapping, emqx:get_config([flapping_detect])),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_policy(Zone) ->
|
get_policy(Zone) ->
|
||||||
emqx_flapping:get_policy([window_time, ban_time, max_count], Zone).
|
emqx_config:get_zone_conf(Zone, [flapping_detect]).
|
||||||
|
|
|
@ -209,7 +209,7 @@ t_zones(_Config) ->
|
||||||
?assertEqual(Mqtt1, NewMqtt),
|
?assertEqual(Mqtt1, NewMqtt),
|
||||||
%% delete the new zones
|
%% delete the new zones
|
||||||
{ok, #{}} = update_config("zones", Zones),
|
{ok, #{}} = update_config("zones", Zones),
|
||||||
?assertEqual(undefined, emqx_config:get_raw([new_zone, mqtt], undefined)),
|
?assertEqual(undefined, emqx_config:get_raw([zones, new_zone], undefined)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_dashboard(_Config) ->
|
t_dashboard(_Config) ->
|
||||||
|
|
Loading…
Reference in New Issue