From 48381d4c8608acbbe446d585b198dbffcb456ac0 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 6 Jun 2023 17:56:36 +0800 Subject: [PATCH 1/7] feat: refactor flapping detect conf --- apps/emqx/src/emqx_channel.erl | 7 +-- apps/emqx/src/emqx_config.erl | 24 +++++--- apps/emqx/src/emqx_flapping.erl | 77 ++++++++++++++------------ apps/emqx/src/emqx_schema.erl | 7 +-- apps/emqx/src/emqx_zone_schema.erl | 3 +- apps/emqx/test/emqx_flapping_SUITE.erl | 20 ++++--- 6 files changed, 75 insertions(+), 63 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 5637bb171..a7349a436 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1632,10 +1632,9 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Flapping -count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> - is_integer(emqx_config:get_zone_conf(Zone, [flapping_detect, window_time])) andalso - emqx_flapping:detect(ClientInfo), - {ok, Channel}. +count_flapping_event(_ConnPkt, #channel{clientinfo = ClientInfo}) -> + _ = emqx_flapping:detect(ClientInfo), + ok. %%-------------------------------------------------------------------- %% Authenticate diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 080172c7b..19963af50 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -707,7 +707,10 @@ do_put(Type, Putter, [], DeepValue) -> do_put(Type, Putter, [RootName | KeyPath], DeepValue) -> OldValue = do_get(Type, [RootName], #{}), NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue), - persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue). + Key = ?PERSIS_KEY(Type, RootName), + persistent_term:put(Key, NewValue), + post_save_config_hook(Key, NewValue), + ok. do_deep_get(?CONF, AtomKeyPath, Map, Default) -> emqx_utils_maps:deep_get(AtomKeyPath, Map, Default); @@ -829,15 +832,12 @@ merge_with_global_defaults(GlobalDefaults, ZoneVal) -> maybe_update_zone([zones | T], ZonesValue, Value) -> %% note, do not write to PT, return *New value* instead NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value), - ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})), - %% Update only new zones with global defaults GLD = zone_global_defaults(), - maps:fold( - fun(ZoneName, ZoneValue, Acc) -> - Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)} + maps:map( + fun(_ZoneName, ZoneValue) -> + merge_with_global_defaults(GLD, ZoneValue) end, - NewZonesValue, - maps:without(ExistingZoneNames, NewZonesValue) + NewZonesValue ); maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) -> NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value), @@ -911,3 +911,11 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) -> ), AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}), emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues). + +%% When the global zone change, the zones is updated with the new global zone. +%% The zones config has no config_handler callback, so we need to update via this hook +post_save_config_hook(?PERSIS_KEY(?CONF, zones), _Zones) -> + emqx_flapping:update_config(), + ok; +post_save_config_hook(_Key, _NewValue) -> + ok. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 70b1a3232..02a9858c8 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -22,13 +22,13 @@ -include("types.hrl"). -include("logger.hrl"). --export([start_link/0, stop/0]). +-export([start_link/0, update_config/0, stop/0]). %% API -export([detect/1]). -ifdef(TEST). --export([get_policy/2]). +-export([get_policy/1]). -endif. %% gen_server callbacks @@ -59,12 +59,17 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +update_config() -> + gen_server:cast(?MODULE, update_config). + stop() -> gen_server:stop(?MODULE). %% @doc Detect flapping when a MQTT client disconnected. -spec detect(emqx_types:clientinfo()) -> boolean(). detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> - Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone), + detect(ClientId, PeerHost, get_policy(Zone)). + +detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) -> %% The initial flapping record sets the detect_cnt to 0. InitVal = #flapping{ clientid = ClientId, @@ -82,24 +87,12 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> [] -> false end - end. + end; +detect(_ClientId, _PeerHost, #{enable := false}) -> + false. -get_policy(Keys, Zone) when is_list(Keys) -> - RootKey = flapping_detect, - Conf = emqx_config:get_zone_conf(Zone, [RootKey]), - lists:foldl( - fun(Key, Acc) -> - case maps:find(Key, Conf) of - {ok, V} -> Acc#{Key => V}; - error -> Acc#{Key => emqx_config:get([RootKey, Key])} - end - end, - #{}, - Keys - ); -get_policy(Key, Zone) -> - #{Key := Conf} = get_policy([Key], Zone), - Conf. +get_policy(Zone) -> + emqx_config:get_zone_conf(Zone, [flapping_detect]). now_diff(TS) -> erlang:system_time(millisecond) - TS. @@ -115,8 +108,8 @@ init([]) -> {read_concurrency, true}, {write_concurrency, true} ]), - start_timers(), - {ok, #{}, hibernate}. + Timers = start_timers(), + {ok, Timers, hibernate}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -169,17 +162,20 @@ handle_cast( ) end, {noreply, State}; +handle_cast(update_config, State) -> + NState = update_timer(State), + {noreply, NState}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> - Timestamp = - erlang:system_time(millisecond) - get_policy(window_time, Zone), + Policy = #{window_time := WindowTime} = get_policy(Zone), + Timestamp = erlang:system_time(millisecond) - WindowTime, MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}], ets:select_delete(?FLAPPING_TAB, MatchSpec), - _ = start_timer(Zone), - {noreply, State, hibernate}; + Timer = start_timer(Policy, Zone), + {noreply, State#{Zone => Timer}, hibernate}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -190,18 +186,27 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_timer(Zone) -> - case get_policy(window_time, Zone) of - WindowTime when is_integer(WindowTime) -> - emqx_utils:start_timer(WindowTime, {garbage_collect, Zone}); - disabled -> - ok - end. +start_timer(#{enable := true, window_time := WindowTime}, Zone) -> + emqx_utils:start_timer(WindowTime, {garbage_collect, Zone}); +start_timer(_Policy, _Zone) -> + undefined. start_timers() -> - maps:foreach( - fun(Zone, _ZoneConf) -> - start_timer(Zone) + maps:map( + fun(ZoneName, #{flapping_detect := FlappingDetect}) -> + start_timer(FlappingDetect, ZoneName) + end, + emqx:get_config([zones], #{}) + ). + +update_timer(Timers) -> + maps:map( + fun(ZoneName, #{flapping_detect := FlappingDetect}) -> + case maps:get(ZoneName, Timers, undefined) of + undefined -> start_timer(FlappingDetect, ZoneName); + %% Don't reset this timer, it will be updated after next timeout. + TRef -> TRef + end end, emqx:get_config([zones], #{}) ). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 521293f7a..4929c2023 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -275,7 +275,7 @@ roots(low) -> {"flapping_detect", sc( ref("flapping_detect"), - #{importance => ?IMPORTANCE_HIDDEN} + #{importance => ?DEFAULT_IMPORTANCE} )}, {"persistent_session_store", sc( @@ -685,15 +685,14 @@ fields("flapping_detect") -> boolean(), #{ default => false, - deprecated => {since, "5.0.23"}, desc => ?DESC(flapping_detect_enable) } )}, {"window_time", sc( - hoconsc:union([disabled, duration()]), + duration(), #{ - default => disabled, + default => "1m", importance => ?IMPORTANCE_HIGH, desc => ?DESC(flapping_detect_window_time) } diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index ccef4e54b..695a4aa4c 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -58,8 +58,7 @@ hidden() -> [ "stats", "overload_protection", - "conn_congestion", - "flapping_detect" + "conn_congestion" ]. %% zone schemas are clones from the same name from root level diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 877f05995..3303ec358 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -30,6 +30,7 @@ init_per_suite(Config) -> default, [flapping_detect], #{ + enable => true, max_count => 3, % 0.1s window_time => 100, @@ -102,20 +103,21 @@ t_expired_detecting(_) -> ) ). -t_conf_without_window_time(_) -> - %% enable is deprecated, so we need to make sure it won't be used. +t_conf_update(_) -> Global = emqx_config:get([flapping_detect]), - ?assertNot(maps:is_key(enable, Global)), - %% zones don't have default value, so we need to make sure fallback to global conf. - %% this new_zone will fallback to global conf. + #{ + ban_time := _BanTime, + enable := _Enable, + max_count := _MaxCount, + window_time := _WindowTime + } = Global, + emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}), ?assertEqual(Global, get_policy(new_zone)), emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}), - ?assertEqual(100, emqx_flapping:get_policy(window_time, new_zone_1)), - ?assertEqual(maps:get(ban_time, Global), emqx_flapping:get_policy(ban_time, new_zone_1)), - ?assertEqual(maps:get(max_count, Global), emqx_flapping:get_policy(max_count, new_zone_1)), + ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(new_zone_1)), ok. get_policy(Zone) -> - emqx_flapping:get_policy([window_time, ban_time, max_count], Zone). + emqx_flapping:get_policy(Zone). From d58506a7c3596faf2b402412300bbdb4ec59af6b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 6 Jun 2023 22:25:54 +0800 Subject: [PATCH 2/7] feat: always update default zones --- apps/emqx/src/emqx_config.erl | 5 ++-- apps/emqx/test/emqx_config_SUITE.erl | 2 +- apps/emqx/test/emqx_flapping_SUITE.erl | 32 +++++++++++++++++++------- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 19963af50..f1bb2f5dd 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -831,13 +831,14 @@ merge_with_global_defaults(GlobalDefaults, ZoneVal) -> NewZoneVal :: map(). maybe_update_zone([zones | T], ZonesValue, Value) -> %% note, do not write to PT, return *New value* instead - NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value), GLD = zone_global_defaults(), + NewZonesValue0 = emqx_utils_maps:deep_put(T, ZonesValue, Value), + NewZonesValue1 = emqx_utils_maps:deep_merge(#{default => GLD}, NewZonesValue0), maps:map( fun(_ZoneName, ZoneValue) -> merge_with_global_defaults(GLD, ZoneValue) end, - NewZonesValue + NewZonesValue1 ); maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) -> NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value), diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 050a4f22c..bee603dff 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -344,7 +344,7 @@ zone_global_defaults() -> conn_congestion => #{enable_alarm => true, min_alarm_sustain_duration => 60000}, flapping_detect => - #{ban_time => 300000, max_count => 15, window_time => disabled}, + #{ban_time => 300000, max_count => 15, window_time => 60000, enable => false}, force_gc => #{bytes => 16777216, count => 16000, enable => true}, force_shutdown => diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 3303ec358..34d5fd89a 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -26,16 +26,16 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), - emqx_config:put_zone_conf( - default, + %% update global default config + {ok, _} = emqx:update_config( [flapping_detect], #{ - enable => true, - max_count => 3, + <<"enable">> => true, + <<"max_count">> => 3, % 0.1s - window_time => 100, + <<"window_time">> => 100, %% 2s - ban_time => 2000 + <<"ban_time">> => "2s" } ), Config. @@ -53,6 +53,7 @@ t_detect_check(_) -> clientid => <<"client007">>, peerhost => {127, 0, 0, 1} }, + ct:pal("www:~p~n", [emqx_flapping:get_policy(default)]), false = emqx_flapping:detect(ClientInfo), false = emqx_banned:check(ClientInfo), false = emqx_flapping:detect(ClientInfo), @@ -115,8 +116,23 @@ t_conf_update(_) -> emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}), ?assertEqual(Global, get_policy(new_zone)), - emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}), - ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(new_zone_1)), + emqx_config:put_zone_conf(zone_1, [flapping_detect], #{window_time => 100}), + ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)), + + Zones = #{ + <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}}, + <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}} + }, + ?assertMatch({ok, _}, emqx:update_config([zones], Zones)), + %% new_zone is already deleted + ?assertError({config_not_found, _}, get_policy(new_zone)), + %% update zone(zone_1) has default. + ?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)), + %% create zone(zone_2) has default + ?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)), + %% reset to default(empty) andalso get default from global + ?assertMatch({ok, _}, emqx:update_config([zones], #{})), + ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])), ok. get_policy(Zone) -> From 535870386a636531a1bc92c96e82c3f3eceb2c2a Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 7 Jun 2023 09:20:41 +0800 Subject: [PATCH 3/7] fix: zones update api failed --- apps/emqx/src/config/emqx_config_zones.erl | 35 +++++++++++++++++++ apps/emqx/src/emqx_config.erl | 5 ++- apps/emqx/src/emqx_flapping.erl | 20 ++++++++--- apps/emqx/test/emqx_flapping_SUITE.erl | 30 ++++++++++++++-- .../test/emqx_mgmt_api_configs_SUITE.erl | 2 +- 5 files changed, 83 insertions(+), 9 deletions(-) create mode 100644 apps/emqx/src/config/emqx_config_zones.erl diff --git a/apps/emqx/src/config/emqx_config_zones.erl b/apps/emqx/src/config/emqx_config_zones.erl new file mode 100644 index 000000000..57e2824ff --- /dev/null +++ b/apps/emqx/src/config/emqx_config_zones.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_config_zones). + +-behaviour(emqx_config_handler). + +%% API +-export([add_handler/0, remove_handler/0, pre_config_update/3]). + +-define(ZONES, [zones]). + +add_handler() -> + ok = emqx_config_handler:add_handler(?ZONES, ?MODULE), + ok. + +remove_handler() -> + ok = emqx_config_handler:remove_handler(?ZONES), + ok. + +%% replace the old config with the new config +pre_config_update(?ZONES, NewRaw, _OldRaw) -> + {ok, NewRaw}. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index f1bb2f5dd..d12b52639 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -637,11 +637,13 @@ save_to_override_conf(false, RawConf, _Opts) -> add_handlers() -> ok = emqx_config_logger:add_handler(), + ok = emqx_config_zones:add_handler(), emqx_sys_mon:add_handler(), ok. remove_handlers() -> ok = emqx_config_logger:remove_handler(), + ok = emqx_config_zones:remove_handler(), emqx_sys_mon:remove_handler(), ok. @@ -914,7 +916,8 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) -> emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues). %% When the global zone change, the zones is updated with the new global zone. -%% The zones config has no config_handler callback, so we need to update via this hook +%% The global zone's keys is too many, +%% so we don't choose to write a global zone change emqx_config_handler callback to hook post_save_config_hook(?PERSIS_KEY(?CONF, zones), _Zones) -> emqx_flapping:update_config(), ok; diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 02a9858c8..ef451ae0d 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -50,6 +50,12 @@ started_at :: pos_integer(), detect_cnt :: integer() }). +-define(DEFAULT_POLICY, #{ + enable => false, + max_count => 15, + window_time => 60000, + ban_time => 5 * 6000 +}). -opaque flapping() :: #flapping{}. @@ -91,8 +97,9 @@ detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) - detect(_ClientId, _PeerHost, #{enable := false}) -> false. +%% with default, if we delete Zone at running time. we should not crash. get_policy(Zone) -> - emqx_config:get_zone_conf(Zone, [flapping_detect]). + emqx_config:get_zone_conf(Zone, [flapping_detect], ?DEFAULT_POLICY). now_diff(TS) -> erlang:system_time(millisecond) - TS. @@ -201,11 +208,14 @@ start_timers() -> update_timer(Timers) -> maps:map( - fun(ZoneName, #{flapping_detect := FlappingDetect}) -> + fun(ZoneName, #{flapping_detect := FlappingDetect = #{enable := Enable}}) -> case maps:get(ZoneName, Timers, undefined) of - undefined -> start_timer(FlappingDetect, ZoneName); - %% Don't reset this timer, it will be updated after next timeout. - TRef -> TRef + undefined -> + start_timer(FlappingDetect, ZoneName); + TRef when Enable -> TRef; + TRef -> + erlang:cancel_timer(TRef), + undefined end end, emqx:get_config([zones], #{}) diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 34d5fd89a..d377eba4a 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -53,7 +53,6 @@ t_detect_check(_) -> clientid => <<"client007">>, peerhost => {127, 0, 0, 1} }, - ct:pal("www:~p~n", [emqx_flapping:get_policy(default)]), false = emqx_flapping:detect(ClientInfo), false = emqx_banned:check(ClientInfo), false = emqx_flapping:detect(ClientInfo), @@ -133,7 +132,34 @@ t_conf_update(_) -> %% reset to default(empty) andalso get default from global ?assertMatch({ok, _}, emqx:update_config([zones], #{})), ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])), + ?assertError({config_not_found, _}, get_policy(zone_1)), + ?assertError({config_not_found, _}, get_policy(zone_2)), + ok. + +t_conf_update_timer(_Config) -> + _ = emqx_flapping:start_link(), + validate_timer([default]), + {ok, _} = + emqx:update_config([zones], #{ + <<"timer_1">> => #{<<"flapping_detect">> => #{<<"enable">> => true}}, + <<"timer_2">> => #{<<"flapping_detect">> => #{<<"enable">> => true}}, + <<"timer_3">> => #{<<"flapping_detect">> => #{<<"enable">> => false}} + }), + validate_timer([timer_1, timer_2, timer_3, default]), + ok. + +validate_timer(Names) -> + Zones = emqx:get_config([zones]), + ?assertEqual(lists:sort(Names), lists:sort(maps:keys(Zones))), + Timers = sys:get_state(emqx_flapping), + maps:foreach( + fun(Name, #{flapping_detect := #{enable := Enable}}) -> + ?assertEqual(Enable, is_reference(maps:get(Name, Timers)), Timers) + end, + Zones + ), + ?assertEqual(maps:keys(Zones), maps:keys(Timers)), ok. get_policy(Zone) -> - emqx_flapping:get_policy(Zone). + emqx_config:get_zone_conf(Zone, [flapping_detect]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 5fca80db2..735db7ebb 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -209,7 +209,7 @@ t_zones(_Config) -> ?assertEqual(Mqtt1, NewMqtt), %% delete the new zones {ok, #{}} = update_config("zones", Zones), - ?assertEqual(undefined, emqx_config:get_raw([new_zone, mqtt], undefined)), + ?assertEqual(undefined, emqx_config:get_raw([zones, new_zone], undefined)), ok. t_dashboard(_Config) -> From 1d33b7dbd85ccfa4f82a847f2cae788c56d90d9f Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 7 Jun 2023 19:02:01 +0800 Subject: [PATCH 4/7] chore: make static check happy --- apps/emqx/src/emqx_flapping.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index ef451ae0d..129fee012 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -214,7 +214,7 @@ update_timer(Timers) -> start_timer(FlappingDetect, ZoneName); TRef when Enable -> TRef; TRef -> - erlang:cancel_timer(TRef), + _ = erlang:cancel_timer(TRef), undefined end end, From 5659cc2b47f33a6d37d6ec392409b82ecfc9f2d7 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 7 Jun 2023 21:54:22 +0800 Subject: [PATCH 5/7] feat: add converter to flapping_detect --- apps/emqx/src/emqx_config.erl | 6 +++--- apps/emqx/src/emqx_flapping.erl | 12 ++++++++++-- apps/emqx/src/emqx_schema.erl | 14 ++++++++++++-- apps/emqx/test/emqx_flapping_SUITE.erl | 10 ++++++++++ 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index d12b52639..fa2118f99 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -711,7 +711,7 @@ do_put(Type, Putter, [RootName | KeyPath], DeepValue) -> NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue), Key = ?PERSIS_KEY(Type, RootName), persistent_term:put(Key, NewValue), - post_save_config_hook(Key, NewValue), + put_config_post_change_actions(Key, NewValue), ok. do_deep_get(?CONF, AtomKeyPath, Map, Default) -> @@ -918,8 +918,8 @@ rawconf_to_conf(SchemaModule, RawPath, RawValue) -> %% When the global zone change, the zones is updated with the new global zone. %% The global zone's keys is too many, %% so we don't choose to write a global zone change emqx_config_handler callback to hook -post_save_config_hook(?PERSIS_KEY(?CONF, zones), _Zones) -> +put_config_post_change_actions(?PERSIS_KEY(?CONF, zones), _Zones) -> emqx_flapping:update_config(), ok; -post_save_config_hook(_Key, _NewValue) -> +put_config_post_change_actions(_Key, _NewValue) -> ok. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 129fee012..252e47106 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -97,9 +97,17 @@ detect(ClientId, PeerHost, #{enable := true, max_count := Threshold} = Policy) - detect(_ClientId, _PeerHost, #{enable := false}) -> false. -%% with default, if we delete Zone at running time. we should not crash. get_policy(Zone) -> - emqx_config:get_zone_conf(Zone, [flapping_detect], ?DEFAULT_POLICY). + Flapping = [flapping_detect], + case emqx_config:get_zone_conf(Zone, Flapping, undefined) of + undefined -> + %% If zone has be deleted at running time, + %% we don't crash the connection and disable flapping detect. + Policy = emqx_config:get(Flapping), + Policy#{enable => false}; + Policy -> + Policy + end. now_diff(TS) -> erlang:system_time(millisecond) - TS. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 4929c2023..49d96e26a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -33,6 +33,7 @@ -define(MAX_INT_TIMEOUT_MS, 4294967295). %% floor(?MAX_INT_TIMEOUT_MS / 1000). -define(MAX_INT_TIMEOUT_S, 4294967). +-define(DEFAULT_WINDOW_TIME, "1m"). -type duration() :: integer(). -type duration_s() :: integer(). @@ -275,7 +276,10 @@ roots(low) -> {"flapping_detect", sc( ref("flapping_detect"), - #{importance => ?DEFAULT_IMPORTANCE} + #{ + importance => ?DEFAULT_IMPORTANCE, + converter => fun flapping_detect_converter/2 + } )}, {"persistent_session_store", sc( @@ -692,7 +696,7 @@ fields("flapping_detect") -> sc( duration(), #{ - default => "1m", + default => ?DEFAULT_WINDOW_TIME, importance => ?IMPORTANCE_HIGH, desc => ?DESC(flapping_detect_window_time) } @@ -3495,3 +3499,9 @@ mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) -> Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}; mqtt_converter(Mqtt, _Opts) -> Mqtt. + +%% For backward compatibility with window_time is disable +flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) -> + Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false}; +flapping_detect_converter(Conf, _Opts) -> + Conf. diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index d377eba4a..b03334dc6 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -161,5 +161,15 @@ validate_timer(Names) -> ?assertEqual(maps:keys(Zones), maps:keys(Timers)), ok. +t_window_compatibility_check(_Conf) -> + Flapping = emqx:get_config([flapping_detect]), + ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>), + ?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])), + %% reset + FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]), + ok = emqx_config:init_load(emqx_schema, FlappingBin), + ?assertEqual(Flapping, emqx:get_config([flapping_detect])), + ok. + get_policy(Zone) -> emqx_config:get_zone_conf(Zone, [flapping_detect]). From 70e800f8a4360507286cf97d5d91ccb900709447 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 7 Jun 2023 22:50:11 +0800 Subject: [PATCH 6/7] chore: delete unuse default value and make flapping_detect middle improtance --- apps/emqx/src/emqx_flapping.erl | 6 ------ apps/emqx/src/emqx_schema.erl | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 252e47106..7e8b8f9fc 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -50,12 +50,6 @@ started_at :: pos_integer(), detect_cnt :: integer() }). --define(DEFAULT_POLICY, #{ - enable => false, - max_count => 15, - window_time => 60000, - ban_time => 5 * 6000 -}). -opaque flapping() :: #flapping{}. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 49d96e26a..8c4a7a865 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -277,7 +277,7 @@ roots(low) -> sc( ref("flapping_detect"), #{ - importance => ?DEFAULT_IMPORTANCE, + importance => ?IMPORTANCE_MEDIUM, converter => fun flapping_detect_converter/2 } )}, From 622ac4d1951dc0dc377a7fb914825ca270611235 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 7 Jun 2023 17:14:10 +0200 Subject: [PATCH 7/7] fix(emqx_schema): binary for string default value --- apps/emqx/src/emqx_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8c4a7a865..51b2fbea6 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -33,7 +33,7 @@ -define(MAX_INT_TIMEOUT_MS, 4294967295). %% floor(?MAX_INT_TIMEOUT_MS / 1000). -define(MAX_INT_TIMEOUT_S, 4294967). --define(DEFAULT_WINDOW_TIME, "1m"). +-define(DEFAULT_WINDOW_TIME, <<"1m">>). -type duration() :: integer(). -type duration_s() :: integer().