From 0a1679b12264b100f221cc16222e9de5a4f63998 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 10 Sep 2019 15:21:08 +0800 Subject: [PATCH] Implement a new flapping module (#2884) --- etc/emqx.conf | 54 ++------- priv/emqx.schema | 42 +++---- src/emqx_banned.erl | 18 ++- src/emqx_channel.erl | 34 +++--- src/emqx_flapping.erl | 247 ++++++++++++++++++++++++++---------------- 5 files changed, 212 insertions(+), 183 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 34c64b099..a211d91d9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -483,16 +483,14 @@ acl_cache_ttl = 1m ## Default: ignore acl_deny_action = ignore -## The cleanning interval for flapping +## Specify the global flapping detect policy. +## The value is a string composed of flapping threshold, duration and banned interval. +## 1. threshold: an integer to specify the disconnected times of an MQTT Client; +## 2. duration: the time window for flapping detect; +## 3. banned interval: the banned interval if a flapping is detected. ## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -## flapping_clean_interval = 1h +## Value: Integer,Duration,Duration +flapping_detect_policy = 30, 1m, 5m ##-------------------------------------------------------------------- ## MQTT Protocol @@ -728,25 +726,6 @@ zone.external.mqueue_store_qos0 = true ## Value: on | off zone.external.enable_flapping_detect = off -## The times of state change per min, specifying the threshold which is used to -## detect if the connection starts flapping -## -## Value: number -zone.external.flapping_threshold = 10, 1m - -## Flapping expiry interval for connections. -## This config entry is used to determine when the connection -## will be unbanned. 
-## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -zone.external.flapping_banned_expiry_interval = 1h - ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -828,25 +807,6 @@ zone.internal.mqueue_store_qos0 = true ## Value: on | off zone.internal.enable_flapping_detect = off -## The times of state change per second, specifying the threshold which is used to -## detect if the connection starts flapping -## -## Value: number -zone.internal.flapping_threshold = 10, 1m - -## Flapping expiry interval for connections. -## This config entry is used to determine when the connection -## will be unbanned. -## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -zone.internal.flapping_banned_expiry_interval = 1h - ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: diff --git a/priv/emqx.schema b/priv/emqx.schema index 48b85ad6e..2c43ad577 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -631,11 +631,27 @@ end}. {datatype, {enum, [ignore, disconnect]}} ]}. -%% @doc time interval to clean flapping records -{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [ - {datatype, {duration, ms}} +%% @doc Flapping detect policy +{mapping, "flapping_detect_policy", "emqx.flapping_detect_policy", [ + {datatype, string}, + {default, "30,1m,5m"} ]}. +{translation, "emqx.flapping_detect_policy", fun(Conf) -> + Policy = cuttlefish:conf_get("flapping_detect_policy", Conf), + [Threshold, Duration, Interval] = string:tokens(Policy, ", "), + ParseDuration = fun(S) -> + case cuttlefish_duration:parse(S, ms) of + I when is_integer(I) -> I; + {error, Reason} -> error(Reason) + end + end, + #{threshold => list_to_integer(Threshold), + duration => ParseDuration(Duration), + banned_interval => ParseDuration(Interval) + } +end}. 
+ {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -877,15 +893,8 @@ end}. ]}. {mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ - {datatype, flag} -]}. - -{mapping, "zone.$name.flapping_threshold", "emqx.zones", [ - {datatype, string} -]}. - -{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [ - {datatype, {duration, s}} + {datatype, flag}, + {default, off} ]}. %% @doc Force connection/session process GC after this number of @@ -919,15 +928,6 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {retain_available, Val}; - ("flapping_threshold", Val) -> - [Limit, Duration] = string:tokens(Val, ", "), - FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of - Min when is_integer(Min) -> - {list_to_integer(Limit), Min}; - {error, Reason} -> - error(Reason) - end, - {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> {wildcard_subscription, Val}; ("shared_subscription", Val) -> diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index ba3743385..4c1d7e969 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -38,6 +38,8 @@ , info/1 ]). +-export([is_enabled/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -70,10 +72,18 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec(check(emqx_types:client()) -> boolean()). -check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -> - ets:member(?BANNED_TAB, {client_id, ClientId}) - orelse ets:member(?BANNED_TAB, {username, Username}) - orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}). +check(#{zone := Zone, + client_id := ClientId, + username := Username, + peername := {IPAddr, _} + }) -> + is_enabled(Zone) andalso %% parentheses required: 'andalso' binds tighter than 'orelse' + (ets:member(?BANNED_TAB, {client_id, ClientId}) + orelse ets:member(?BANNED_TAB, {username, Username}) + orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr})). 
+ +is_enabled(Zone) -> + emqx_zone:get_env(Zone, enable_ban, false). -spec(add(emqx_types:banned()) -> ok). add(Banned) when is_record(Banned, banned) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index aa001cb01..2c2b7c02f 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -221,6 +221,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun init_protocol/2, fun enrich_client/2, fun set_logger_meta/2, + fun check_banned/2, + fun check_flapping/2, fun auth_connect/2], ConnPkt, Channel) of {ok, NConnPkt, NChannel} -> process_connect(NConnPkt, NChannel); @@ -862,8 +864,6 @@ validate_packet(Packet, _Channel) -> check_connect(ConnPkt, Channel) -> pipeline([fun check_proto_ver/2, fun check_client_id/2, - %%fun check_flapping/2, - fun check_banned/2, fun check_will_topic/2, fun check_will_retain/2], ConnPkt, Channel). @@ -898,20 +898,6 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. -%%TODO: check banned... -check_banned(#mqtt_packet_connect{client_id = ClientId, - username = Username}, - #channel{client = Client = #{zone := Zone}}) -> - case emqx_zone:get_env(Zone, enable_ban, false) of - true -> - case emqx_banned:check(Client#{client_id => ClientId, - username => Username}) of - true -> {error, ?RC_BANNED}; - false -> ok - end; - false -> ok - end. - check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) -> ok; check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) -> @@ -974,6 +960,22 @@ fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) -> set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> emqx_logger:set_metadata_client_id(ClientId). 
+%%-------------------------------------------------------------------- +%% Check banned/flapping +%%-------------------------------------------------------------------- + +check_banned(_ConnPkt, #channel{client = Client}) -> + case emqx_banned:check(Client) of + true -> {error, ?RC_BANNED}; + false -> ok + end. + +check_flapping(_ConnPkt, #channel{client = Client}) -> + case emqx_flapping:check(Client) of + true -> {error, ?RC_CONNECTION_RATE_EXCEEDED}; + false -> ok + end. + %%-------------------------------------------------------------------- %% Auth Connect %%-------------------------------------------------------------------- diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 0a9a62d56..dc507433f 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -14,129 +14,186 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc This module is used to garbage clean the flapping records. - -module(emqx_flapping). +-behaviour(gen_server). + -include("emqx.hrl"). -include("types.hrl"). +-include("logger.hrl"). --behaviour(gen_statem). +-logger_header("[Flapping]"). -export([start_link/0]). -%% gen_statem callbacks +%% API +-export([check/1, detect/1]). + +%% gen_server callbacks -export([ init/1 - , initialized/3 - , callback_mode/0 - , terminate/3 - , code_change/4 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 ]). +%% Tab -define(FLAPPING_TAB, ?MODULE). - --define(default_flapping_clean_interval, 3600000). - --export([check/3]). - --record(flapping, { - client_id :: binary(), - check_count :: integer(), - timestamp :: integer() +%% Default Policy +-define(FLAPPING_THRESHOLD, 30). +-define(FLAPPING_DURATION, 60000). +-define(FLAPPING_BANNED_INTERVAL, 300000). +-define(DEFAULT_DETECT_POLICY, + #{threshold => ?FLAPPING_THRESHOLD, + duration => ?FLAPPING_DURATION, + banned_interval => ?FLAPPING_BANNED_INTERVAL }). 
--type(flapping_record() :: #flapping{}). +-record(flapping, { + client_id :: emqx_types:client_id(), + peername :: emqx_types:peername(), + started_at :: pos_integer(), + detect_cnt :: pos_integer(), + banned_at :: pos_integer() + }). --type(flapping_state() :: flapping | ok). +-opaque(flapping() :: #flapping{}). -%% @doc This function is used to initialize flapping records -%% the expiry time unit is minutes. --spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). -init_flapping(ClientId, Interval) -> - #flapping{client_id = ClientId, - check_count = 1, - timestamp = emqx_time:now_secs() + Interval}. +-export_type([flapping/0]). -%% @doc This function is used to initialize flapping records -%% the expiry time unit is minutes. --spec(check(Action :: atom(), ClientId :: binary(), - Threshold :: {integer(), integer()}) -> flapping_state()). -check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> - check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). - --spec(check(Action :: atom(), ClientId :: binary(), - Threshold :: {integer(), integer()}, - InitFlapping :: flapping_record()) -> flapping_state()). -check(Action, ClientId, Threshold, InitFlapping) -> - case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of - 1 -> ok; - CheckCount -> - case ets:lookup(?FLAPPING_TAB, ClientId) of - [Flapping] -> - check_flapping(Action, CheckCount, Threshold, Flapping); - _Flapping -> - ok - end - end. 
- -check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, - Flapping = #flapping{ client_id = ClientId - , timestamp = Timestamp }) -> - case emqx_time:now_secs() of - NowTimestamp when NowTimestamp =< Timestamp, - CheckCount > TimesThreshold -> - ets:delete(?FLAPPING_TAB, ClientId), - flapping; - NowTimestamp when NowTimestamp > Timestamp, - Action =:= disconnect -> - ets:delete(?FLAPPING_TAB, ClientId), - ok; - NowTimestamp -> - NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval}, - ets:insert(?FLAPPING_TAB, NewFlapping), - ok - end. - -%%-------------------------------------------------------------------- -%% gen_statem callbacks -%%-------------------------------------------------------------------- --spec(start_link() -> startlink_ret()). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% @doc Check flapping when a MQTT client connected. +-spec(check(emqx_types:client()) -> boolean()). +check(#{zone := Zone, client_id := ClientId}) -> + is_enabled(Zone) andalso check(ClientId, get_policy()). + +check(ClientId, #{banned_interval := Interval}) -> + case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of + [] -> false; + [#flapping{banned_at = BannedAt}] -> + now_diff(BannedAt) < Interval + end. + +%% @doc Detect flapping when a MQTT client disconnected. +-spec(detect(emqx_types:client()) -> boolean()). +detect(Client = #{zone := Zone}) -> + is_enabled(Zone) andalso detect(Client, get_policy()). 
+ +detect(#{client_id := ClientId, peername := Peername}, + Policy = #{threshold := Threshold}) -> + try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of + Cnt when Cnt < Threshold -> false; + _Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of + [Flapping] -> + ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}), + true; + [] -> false + end + catch + error:badarg -> + %% Create a flapping record. + Flapping = #flapping{client_id = ClientId, + peername = Peername, + started_at = emqx_time:now_ms(), + detect_cnt = 1 + }, + true = ets:insert(?FLAPPING_TAB, Flapping), + false + end. + +-compile({inline, [is_enabled/1, get_policy/0, now_diff/1]}). + +-spec(is_enabled(emqx_types:zone()) -> boolean()). +is_enabled(Zone) -> + emqx_zone:get_env(Zone, enable_flapping_detect, false). + +get_policy() -> + emqx:get_env(flapping_detect_policy, ?DEFAULT_DETECT_POLICY). + +now_diff(TS) -> emqx_time:now_ms() - TS. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- init([]) -> - Interval = emqx:get_env(flapping_clean_interval, ?default_flapping_clean_interval), - TabOpts = [ public - , set - , {keypos, 2} - , {write_concurrency, true} - , {read_concurrency, true}], - ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), - {ok, initialized, #{timer_interval => Interval}}. + #{duration := Duration, banned_interval := Interval} = get_policy(), + ok = emqx_tables:new(?FLAPPING_TAB, [public, set, + {keypos, 2}, + {read_concurrency, true}, + {write_concurrency, true} + ]), + State = #{time => max(Duration, Interval) + 1, tref => undefined}, + {ok, ensure_timer(State), hibernate}. -callback_mode() -> [state_functions, state_enter]. +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. 
-initialized(enter, _OldState, #{timer_interval := Time}) -> - Action = {state_timeout, Time, clean_expired_records}, - {keep_state_and_data, Action}; -initialized(state_timeout, clean_expired_records, #{}) -> - clean_expired_records(), - repeat_state_and_data. +handle_cast({detected, Flapping = #flapping{client_id = ClientId, + peername = Peername, + started_at = StartedAt, + detect_cnt = DetectCnt}, + #{duration := Duration}}, State) -> + case (Interval = now_diff(StartedAt)) < Duration of + true -> %% Flapping happened:( + %% Log first + ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", + [ClientId, esockd_net:format(Peername), DetectCnt, Duration]), + %% TODO: Send Alarm + %% Banned. + BannedFlapping = Flapping#flapping{client_id = {banned, ClientId}, + banned_at = emqx_time:now_ms() + }, + ets:insert(?FLAPPING_TAB, BannedFlapping); + false -> + ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", + [ClientId, esockd_net:format(Peername), DetectCnt, Interval]), + ets:delete_object(?FLAPPING_TAB, Flapping) + end, + {noreply, State}; -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. -terminate(_Reason, _StateName, _State) -> - emqx_tables:delete(?FLAPPING_TAB), +handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) -> + with_flapping_tab(fun expire_flapping/2, + [emqx_time:now_ms(), get_policy()]), + {noreply, ensure_timer(State#{tref => undefined}), hibernate}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -%% @doc clean expired records in ets -clean_expired_records() -> - NowTime = emqx_time:now_secs(), - MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}], - ets:select_delete(?FLAPPING_TAB, MatchSpec). +ensure_timer(State = #{time := Time, tref := undefined}) -> + State#{tref => emqx_misc:start_timer(Time, expire_flapping)}; +ensure_timer(State) -> State. +with_flapping_tab(Fun, Args) -> + case ets:info(?FLAPPING_TAB, size) of + undefined -> ok; + 0 -> ok; + _Size -> erlang:apply(Fun, Args) + end. + +expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) -> + ets:select_delete(?FLAPPING_TAB, + [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, + [{'<', '$1', NowTime-Duration}], [true]}, + {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, + [{'<', '$1', NowTime-Interval}], [true]}]).