diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 29a59e482..1506e296f 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1630,7 +1630,7 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) -> %% Flapping count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso + is_integer(emqx_config:get_zone_conf(Zone, [flapping_detect, window_time])) andalso emqx_flapping:detect(ClientInfo), {ok, Channel}. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 64e4ed6c3..cc4af99d2 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -27,6 +27,10 @@ %% API -export([detect/1]). +-ifdef(TEST). +-export([get_policy/2]). +-endif. + %% gen_server callbacks -export([ init/1, @@ -39,15 +43,6 @@ %% Tab -define(FLAPPING_TAB, ?MODULE). -%% Default Policy --define(FLAPPING_THRESHOLD, 30). --define(FLAPPING_DURATION, 60000). --define(FLAPPING_BANNED_INTERVAL, 300000). --define(DEFAULT_DETECT_POLICY, #{ - max_count => ?FLAPPING_THRESHOLD, - window_time => ?FLAPPING_DURATION, - ban_time => ?FLAPPING_BANNED_INTERVAL -}). -record(flapping, { clientid :: emqx_types:clientid(), @@ -69,7 +64,7 @@ stop() -> gen_server:stop(?MODULE). %% @doc Detect flapping when a MQTT client disconnected. -spec detect(emqx_types:clientinfo()) -> boolean(). detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> - Policy = #{max_count := Threshold} = get_policy(Zone), + Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone), %% The initial flapping record sets the detect_cnt to 0. InitVal = #flapping{ clientid = ClientId, @@ -89,8 +84,22 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> end end. -get_policy(Zone) -> - emqx_config:get_zone_conf(Zone, [flapping_detect]). +get_policy(Keys, Zone) when is_list(Keys) -> + RootKey = flapping_detect, + Conf = emqx_config:get_zone_conf(Zone, [RootKey]), + lists:foldl( + fun(Key, Acc) -> + case maps:find(Key, Conf) of + {ok, V} -> Acc#{Key => V}; + error -> Acc#{Key => emqx_config:get([RootKey, Key])} + end + end, + #{}, + Keys + ); +get_policy(Key, Zone) -> + #{Key := Conf} = get_policy([Key], Zone), + Conf. now_diff(TS) -> erlang:system_time(millisecond) - TS. @@ -166,8 +175,7 @@ handle_cast(Msg, State) -> handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> Timestamp = - erlang:system_time(millisecond) - - maps:get(window_time, get_policy(Zone)), + erlang:system_time(millisecond) - get_policy(window_time, Zone), MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}], ets:select_delete(?FLAPPING_TAB, MatchSpec), _ = start_timer(Zone), @@ -183,15 +191,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. start_timer(Zone) -> - WindTime = maps:get(window_time, get_policy(Zone)), - emqx_misc:start_timer(WindTime, {garbage_collect, Zone}). + case get_policy(window_time, Zone) of + WindowTime when is_integer(WindowTime) -> + emqx_misc:start_timer(WindowTime, {garbage_collect, Zone}); + disabled -> + ok + end. start_timers() -> - lists:foreach( - fun({Zone, _ZoneConf}) -> + maps:foreach( + fun(Zone, _ZoneConf) -> start_timer(Zone) end, - maps:to_list(emqx:get_config([zones], #{})) + emqx:get_config([zones], #{}) ). fmt_host(PeerHost) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f03ba07c5..c3d566554 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -145,17 +145,23 @@ roots(high) -> {"listeners", sc( ref("listeners"), - #{} - )}, - {"zones", - sc( - map("name", ref("zone")), - #{desc => ?DESC(zones)} + #{importance => ?IMPORTANCE_HIGH} )}, {"mqtt", sc( ref("mqtt"), - #{desc => ?DESC(mqtt)} + #{ + desc => ?DESC(mqtt), + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"zones", + sc( + map("name", ref("zone")), + #{ + desc => ?DESC(zones), + importance => ?IMPORTANCE_LOW + } )}, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication(global)}, %% NOTE: authorization schema here is only to keep emqx app prue @@ -199,7 +205,9 @@ roots(low) -> {"conn_congestion", sc( ref("conn_congestion"), - #{} + #{ + importance => ?IMPORTANCE_HIDDEN + } )}, {"stats", sc( @@ -221,7 +229,7 @@ roots(low) -> {"flapping_detect", sc( ref("flapping_detect"), - #{} + #{importance => ?IMPORTANCE_HIDDEN} )}, {"persistent_session_store", sc( @@ -620,25 +628,27 @@ fields("flapping_detect") -> boolean(), #{ default => false, + deprecated => {since, "5.0.22"}, desc => ?DESC(flapping_detect_enable) } )}, - {"max_count", - sc( - integer(), - #{ - default => 15, - desc => ?DESC(flapping_detect_max_count) - } - )}, {"window_time", sc( - duration(), + hoconsc:union([disabled, duration()]), #{ - default => <<"1m">>, + default => disabled, + importance => ?IMPORTANCE_HIGH, desc => ?DESC(flapping_detect_window_time) } )}, + {"max_count", + sc( + non_neg_integer(), + #{ + default => 15, + desc => ?DESC(flapping_detect_max_count) + } + )}, {"ban_time", sc( duration(), diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index 6ac8cefa3..5d6720986 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -55,7 +55,10 @@ zone_without_hidden() -> hidden() -> [ - "stats" + "stats", + "overload_protection", + "conn_congestion", + "flapping_detect" ]. %% zone schemas are clones from the same name from root level diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index e27ff67e0..877f05995 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -101,3 +101,21 @@ t_expired_detecting(_) -> ets:tab2list(emqx_flapping) ) ). + +t_conf_without_window_time(_) -> + %% enable is deprecated, so we need to make sure it won't be used. + Global = emqx_config:get([flapping_detect]), + ?assertNot(maps:is_key(enable, Global)), + %% zones don't have default value, so we need to make sure fallback to global conf. + %% this new_zone will fallback to global conf. + emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}), + ?assertEqual(Global, get_policy(new_zone)), + + emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}), + ?assertEqual(100, emqx_flapping:get_policy(window_time, new_zone_1)), + ?assertEqual(maps:get(ban_time, Global), emqx_flapping:get_policy(ban_time, new_zone_1)), + ?assertEqual(maps:get(max_count, Global), emqx_flapping:get_policy(max_count, new_zone_1)), + ok. + +get_policy(Zone) -> + emqx_flapping:get_policy([window_time, ban_time, max_count], Zone). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 2d24bce99..06aa6f78d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -121,7 +121,9 @@ t_log(_Config) -> t_global_zone(_Config) -> {ok, Zones} = get_global_zone(), - ZonesKeys = lists:map(fun({K, _}) -> K end, hocon_schema:roots(emqx_zone_schema)), + ZonesKeys = lists:map( + fun({K, _}) -> list_to_binary(K) end, emqx_zone_schema:zone_without_hidden() + ), ?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))), ?assertEqual( emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]),