diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 745429536..8a936067e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1630,7 +1630,7 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) -> %% Flapping count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso + is_integer(emqx_config:get_zone_conf(Zone, [flapping_detect, window_time])) andalso emqx_flapping:detect(ClientInfo), {ok, Channel}. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 46193fb16..70b1a3232 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -27,6 +27,10 @@ %% API -export([detect/1]). +-ifdef(TEST). +-export([get_policy/2]). +-endif. + %% gen_server callbacks -export([ init/1, @@ -39,15 +43,6 @@ %% Tab -define(FLAPPING_TAB, ?MODULE). -%% Default Policy --define(FLAPPING_THRESHOLD, 30). --define(FLAPPING_DURATION, 60000). --define(FLAPPING_BANNED_INTERVAL, 300000). --define(DEFAULT_DETECT_POLICY, #{ - max_count => ?FLAPPING_THRESHOLD, - window_time => ?FLAPPING_DURATION, - ban_time => ?FLAPPING_BANNED_INTERVAL -}). -record(flapping, { clientid :: emqx_types:clientid(), @@ -69,7 +64,7 @@ stop() -> gen_server:stop(?MODULE). %% @doc Detect flapping when a MQTT client disconnected. -spec detect(emqx_types:clientinfo()) -> boolean(). detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> - Policy = #{max_count := Threshold} = get_policy(Zone), + Policy = #{max_count := Threshold} = get_policy([max_count, window_time, ban_time], Zone), %% The initial flapping record sets the detect_cnt to 0. InitVal = #flapping{ clientid = ClientId, @@ -89,8 +84,22 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> end end. -get_policy(Zone) -> - emqx_config:get_zone_conf(Zone, [flapping_detect]). +get_policy(Keys, Zone) when is_list(Keys) -> + RootKey = flapping_detect, + Conf = emqx_config:get_zone_conf(Zone, [RootKey]), + lists:foldl( + fun(Key, Acc) -> + case maps:find(Key, Conf) of + {ok, V} -> Acc#{Key => V}; + error -> Acc#{Key => emqx_config:get([RootKey, Key])} + end + end, + #{}, + Keys + ); +get_policy(Key, Zone) -> + #{Key := Conf} = get_policy([Key], Zone), + Conf. now_diff(TS) -> erlang:system_time(millisecond) - TS. @@ -166,8 +175,7 @@ handle_cast(Msg, State) -> handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> Timestamp = - erlang:system_time(millisecond) - - maps:get(window_time, get_policy(Zone)), + erlang:system_time(millisecond) - get_policy(window_time, Zone), MatchSpec = [{{'_', '_', '_', '$1', '_'}, [{'<', '$1', Timestamp}], [true]}], ets:select_delete(?FLAPPING_TAB, MatchSpec), _ = start_timer(Zone), @@ -183,15 +191,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. start_timer(Zone) -> - WindTime = maps:get(window_time, get_policy(Zone)), - emqx_utils:start_timer(WindTime, {garbage_collect, Zone}). + case get_policy(window_time, Zone) of + WindowTime when is_integer(WindowTime) -> + emqx_utils:start_timer(WindowTime, {garbage_collect, Zone}); + disabled -> + ok + end. start_timers() -> - lists:foreach( - fun({Zone, _ZoneConf}) -> + maps:foreach( + fun(Zone, _ZoneConf) -> start_timer(Zone) end, - maps:to_list(emqx:get_config([zones], #{})) + emqx:get_config([zones], #{}) ). fmt_host(PeerHost) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index df5a91727..8335d69b8 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -145,17 +145,23 @@ roots(high) -> {"listeners", sc( ref("listeners"), - #{} - )}, - {"zones", - sc( - map("name", ref("zone")), - #{desc => ?DESC(zones)} + #{importance => ?IMPORTANCE_HIGH} )}, {"mqtt", sc( ref("mqtt"), - #{desc => ?DESC(mqtt)} + #{ + desc => ?DESC(mqtt), + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"zones", + sc( + map("name", ref("zone")), + #{ + desc => ?DESC(zones), + importance => ?IMPORTANCE_LOW + } )}, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication(global)}, %% NOTE: authorization schema here is only to keep emqx app prue @@ -199,12 +205,16 @@ roots(low) -> {"conn_congestion", sc( ref("conn_congestion"), - #{} + #{ + importance => ?IMPORTANCE_HIDDEN + } )}, {"stats", sc( ref("stats"), - #{} + #{ + importance => ?IMPORTANCE_HIDDEN + } )}, {"sysmon", sc( @@ -219,7 +229,7 @@ roots(low) -> {"flapping_detect", sc( ref("flapping_detect"), - #{} + #{importance => ?IMPORTANCE_HIDDEN} )}, {"persistent_session_store", sc( @@ -339,6 +349,7 @@ fields("stats") -> boolean(), #{ default => true, + importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(stats_enable) } )} @@ -609,8 +620,7 @@ fields("mqtt") -> )} ]; fields("zone") -> - Fields = emqx_zone_schema:roots(), - [{F, ref(emqx_zone_schema, F)} || F <- Fields]; + emqx_zone_schema:zone(); fields("flapping_detect") -> [ {"enable", @@ -618,25 +628,27 @@ fields("flapping_detect") -> boolean(), #{ default => false, + deprecated => {since, "5.0.23"}, desc => ?DESC(flapping_detect_enable) } )}, - {"max_count", - sc( - integer(), - #{ - default => 15, - desc => ?DESC(flapping_detect_max_count) - } - )}, {"window_time", sc( - duration(), + hoconsc:union([disabled, duration()]), #{ - default => <<"1m">>, + default => disabled, + importance => ?IMPORTANCE_HIGH, desc => ?DESC(flapping_detect_window_time) } )}, + {"max_count", + sc( + non_neg_integer(), + #{ + default => 15, + desc => ?DESC(flapping_detect_max_count) + } + )}, {"ban_time", sc( duration(), diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index c2595725b..5d6720986 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -15,8 +15,10 @@ %%-------------------------------------------------------------------- -module(emqx_zone_schema). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). --export([namespace/0, roots/0, fields/1, desc/1]). +-export([namespace/0, roots/0, fields/1, desc/1, zone/0, zone_without_hidden/0]). namespace() -> zone. @@ -33,6 +35,32 @@ roots() -> "overload_protection" ]. +zone() -> + Fields = roots(), + Hidden = hidden(), + lists:map( + fun(F) -> + case lists:member(F, Hidden) of + true -> + {F, ?HOCON(?R_REF(F), #{importance => ?IMPORTANCE_HIDDEN})}; + false -> + {F, ?HOCON(?R_REF(F), #{})} + end + end, + Fields + ). + +zone_without_hidden() -> + lists:map(fun(F) -> {F, ?HOCON(?R_REF(F), #{})} end, roots() -- hidden()). + +hidden() -> + [ + "stats", + "overload_protection", + "conn_congestion", + "flapping_detect" + ]. + %% zone schemas are clones from the same name from root level %% only not allowed to have default values. fields(Name) -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 6dd389350..29f8b1503 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -1137,7 +1137,7 @@ t_ws_cookie_init(_) -> %%-------------------------------------------------------------------- t_flapping_detect(_) -> - emqx_config:put_zone_conf(default, [flapping_detect, enable], true), + emqx_config:put_zone_conf(default, [flapping_detect, window_time], 60000), Parent = self(), ok = meck:expect( emqx_cm, diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index e27ff67e0..877f05995 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -101,3 +101,21 @@ t_expired_detecting(_) -> ets:tab2list(emqx_flapping) ) ). + +t_conf_without_window_time(_) -> + %% enable is deprecated, so we need to make sure it won't be used. + Global = emqx_config:get([flapping_detect]), + ?assertNot(maps:is_key(enable, Global)), + %% zones don't have default value, so we need to make sure fallback to global conf. + %% this new_zone will fallback to global conf. + emqx_config:put_zone_conf(new_zone, [flapping_detect], #{}), + ?assertEqual(Global, get_policy(new_zone)), + + emqx_config:put_zone_conf(new_zone_1, [flapping_detect], #{window_time => 100}), + ?assertEqual(100, emqx_flapping:get_policy(window_time, new_zone_1)), + ?assertEqual(maps:get(ban_time, Global), emqx_flapping:get_policy(ban_time, new_zone_1)), + ?assertEqual(maps:get(max_count, Global), emqx_flapping:get_policy(max_count, new_zone_1)), + ok. + +get_policy(Zone) -> + emqx_flapping:get_policy([window_time, ban_time, max_count], Zone). diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index b37a47be6..60b911296 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -352,8 +352,7 @@ with_default_value(Type, Value) -> Type#{example => emqx_utils_maps:binary_string(Value)}. global_zone_roots() -> - lists:map(fun({K, _}) -> K end, global_zone_schema()). + lists:map(fun({K, _}) -> list_to_binary(K) end, global_zone_schema()). global_zone_schema() -> - Roots = hocon_schema:roots(emqx_zone_schema), - lists:map(fun({RootKey, {_Root, Schema}}) -> {RootKey, Schema} end, Roots). + emqx_zone_schema:zone_without_hidden(). diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index e5704b817..b2ae33058 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -127,7 +127,9 @@ t_log(_Config) -> t_global_zone(_Config) -> {ok, Zones} = get_global_zone(), - ZonesKeys = lists:map(fun({K, _}) -> K end, hocon_schema:roots(emqx_zone_schema)), + ZonesKeys = lists:map( + fun({K, _}) -> list_to_binary(K) end, emqx_zone_schema:zone_without_hidden() + ), ?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))), ?assertEqual( emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]), diff --git a/changes/ce/feat-10358.en.md b/changes/ce/feat-10358.en.md new file mode 100644 index 000000000..e6d05c84b --- /dev/null +++ b/changes/ce/feat-10358.en.md @@ -0,0 +1,2 @@ +Hide `flapping_detect/conn_congestion/stats` configuration. +Deprecate `flapping_detect.enable`.