diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2faadce62..f317dbd8d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -671,7 +671,9 @@ handle_info(disconnected, Channel = #channel{connected = false}) -> {ok, Channel}; handle_info(disconnected, Channel = #channel{protocol = Protocol, - session = Session}) -> + session = Session, + client = Client = #{zone := Zone}}) -> + emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(Client), Channel1 = ensure_disconnected(Channel), Channel2 = case timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)) of 0 -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 6e5f98c14..ee898acbd 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -141,11 +141,11 @@ handle_cast({detected, Flapping = #flapping{client_id = ClientId, %% Log first ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", [ClientId, esockd_net:format(Peername), DetectCnt, Duration]), - %% TODO: Send Alarm %% Banned. BannedFlapping = Flapping#flapping{client_id = {banned, ClientId}, banned_at = emqx_time:now_ms() }, + alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}), ets:insert(?FLAPPING_TAB, BannedFlapping); false -> ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", @@ -189,9 +189,17 @@ with_flapping_tab(Fun, Args) -> end. expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) -> - ets:select_delete(?FLAPPING_TAB, - [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, - [{'<', '$1', NowTime-Duration}], [true]}, - {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, - [{'<', '$1', NowTime-Interval}], [true]}]). + case ets:select(?FLAPPING_TAB, + [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, + [{'<', '$1', NowTime-Duration}], ['$_']}, + {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, + [{'<', '$1', NowTime-Interval}], ['$_']}]) of + [] -> ok; + Flappings -> + lists:foreach(fun(Flapping = #flapping{client_id = {banned, ClientId}}) -> + ets:delete_object(?FLAPPING_TAB, Flapping), + alarm_handler:clear_alarm({flapping_detected, ClientId}); + (_) -> ok + end, Flappings) + end. diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index 493a57b6e..cb5d0321c 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -22,22 +22,24 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - prepare_env(), + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([], fun set_special_configs/1), Config. -prepare_env() -> +set_special_configs(emqx) -> emqx_zone:set_env(external, enable_flapping_detect, true), application:set_env(emqx, flapping_detect_policy, #{threshold => 3, duration => 100, banned_interval => 200 - }). + }); +set_special_configs(_App) -> ok. end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]), ok. t_detect_check(_) -> - {ok, _Pid} = emqx_flapping:start_link(), Client = #{zone => external, client_id => <<"clientid">>, peername => {{127,0,0,1}, 5000}