diff --git a/etc/emqx.conf b/etc/emqx.conf index 5da71b1d2..1d623953b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -694,7 +694,7 @@ zone.external.flapping_threshold = 10, 1m ## -s: second ## ## Default: 1h, 1 hour -zone.external.flapping_expiry_interval = 1h +zone.external.flapping_banned_expiry_interval = 1h ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## @@ -789,7 +789,7 @@ zone.internal.flapping_threshold = 10, 1m ## -s: second ## ## Default: 1h, 1 hour -zone.internal.flapping_expiry_interval = 1h +zone.internal.flapping_banned_expiry_interval = 1h ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index f28913565..5c7f421ce 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -836,7 +836,7 @@ end}. {datatype, string} ]}. -{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [ +{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [ {datatype, {duration, s}} ]}. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index a915fdddb..2a81e0bcb 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -207,4 +207,3 @@ stats_fun() -> undefined -> ok; Size -> emqx_stats:setstat('connections/count', 'connections/max', Size) end. - diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 7e369a2e3..4655a8ab3 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -50,27 +50,23 @@ %% the expiry time unit is minutes. -spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). init_flapping(ClientId, Interval) -> - #flapping{ client_id = ClientId - , check_count = 1 - , timestamp = emqx_time:now_secs() + Interval - }. + #flapping{client_id = ClientId, + check_count = 1, + timestamp = emqx_time:now_secs() + Interval}. %% @doc This function is used to initialize flapping records %% the expiry time unit is minutes. --spec(check( Action :: atom() - , ClientId :: binary() - , Threshold :: {integer(), integer()}) - -> flapping_state()). +-spec(check(Action :: atom(), ClientId :: binary(), + Threshold :: {integer(), integer()}) -> flapping_state()). check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). --spec(check( Action :: atom() - , ClientId :: binary() - , Threshold :: {integer(), integer()} - , InitFlapping :: flapping_record()) - -> flapping_state()). +-spec(check(Action :: atom(), ClientId :: binary(), + Threshold :: {integer(), integer()}, + InitFlapping :: flapping_record()) -> flapping_state()). check(Action, ClientId, Threshold, InitFlapping) -> - try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of + case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of + 1 -> ok; CheckCount -> case ets:lookup(?FLAPPING_TAB, ClientId) of [Flapping] -> @@ -78,23 +74,14 @@ check(Action, ClientId, Threshold, InitFlapping) -> _Flapping -> ok end - catch - error:badarg -> - ets:insert_new(?FLAPPING_TAB, InitFlapping), - ok end. --spec(check_flapping( Action :: atom() - , CheckTimes :: integer() - , Threshold :: {integer(), integer()} - , InitFlapping :: flapping_record()) - -> flapping_state()). -check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval}, +check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, Flapping = #flapping{ client_id = ClientId , timestamp = Timestamp }) -> case emqx_time:now_secs() of NowTimestamp when NowTimestamp =< Timestamp, - CheckTimes > TimesThreshold -> + CheckCount > TimesThreshold -> ets:delete(?FLAPPING_TAB, ClientId), flapping; NowTimestamp when NowTimestamp > Timestamp, @@ -110,7 +97,7 @@ check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval}, %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- --spec(start_link(TimerInterval :: integer()) -> startlink_ret()). +-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()). start_link(TimerInterval) -> gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []). @@ -145,17 +132,6 @@ terminate(_Reason, _StateName, _State) -> %% @doc clean expired records in ets clean_expired_records() -> - Records = ets:tab2list(?FLAPPING_TAB), - traverse_records(Records). - -traverse_records([]) -> - ok; -traverse_records([#flapping{client_id = ClientId, - timestamp = Timestamp} | LeftRecords]) -> - case emqx_time:now_secs() > Timestamp of - true -> - ets:delete(?FLAPPING_TAB, ClientId); - false -> - true - end, - traverse_records(LeftRecords). + NowTime = emqx_time:now_secs(), + MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}], + ets:select_delete(?FLAPPING_TAB, MatchSpec). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3de7f977d..e5ca486cc 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -943,16 +943,15 @@ flag(true) -> 1. do_flapping_detect(Action, #pstate{zone = Zone, client_id = ClientId, enable_flapping_detect = true}) -> - ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000), + BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000), Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20), - Until = erlang:system_time(second) + ExpiryInterval, + Until = erlang:system_time(second) + BanExpiryInterval, case emqx_flapping:check(Action, ClientId, Threshold) of flapping -> emqx_banned:add(#banned{who = {client_id, ClientId}, reason = <<"flapping">>, by = <<"flapping_checker">>, - until = Until - }), + until = Until}), ok; _Other -> ok