Implement a new flapping module (#2884)

This commit is contained in:
Feng Lee 2019-09-10 15:21:08 +08:00 committed by GitHub
parent 88321fe6fe
commit 0a1679b122
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 212 additions and 183 deletions

View File

@ -483,16 +483,14 @@ acl_cache_ttl = 1m
## Default: ignore
acl_deny_action = ignore
## The cleanning interval for flapping
## Specify the global flapping detect policy.
## The value is a string composed of flapping threshold, duration and banned interval.
## 1. threshold: an integer to specfify the disconnected times of a MQTT Client;
## 2. duration: the time window for flapping detect;
## 3. banned interval: the banned interval if a flapping is detected.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
## flapping_clean_interval = 1h
## Value: Integer,Duration,Duration
flapping_detect_policy = 30, 1m, 5m
##--------------------------------------------------------------------
## MQTT Protocol
@ -728,25 +726,6 @@ zone.external.mqueue_store_qos0 = true
## Value: on | off
zone.external.enable_flapping_detect = off
## The times of state change per min, specifying the threshold which is used to
## detect if the connection starts flapping
##
## Value: number
zone.external.flapping_threshold = 10, 1m
## Flapping expiry interval for connections.
## This config entry is used to determine when the connection
## will be unbanned.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
zone.external.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
@ -828,25 +807,6 @@ zone.internal.mqueue_store_qos0 = true
## Value: on | off
zone.internal.enable_flapping_detect = off
## The times of state change per second, specifying the threshold which is used to
## detect if the connection starts flapping
##
## Value: number
zone.internal.flapping_threshold = 10, 1m
## Flapping expiry interval for connections.
## This config entry is used to determine when the connection
## will be unbanned.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 1h, 1 hour
zone.internal.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:

View File

@ -631,11 +631,27 @@ end}.
{datatype, {enum, [ignore, disconnect]}}
]}.
%% @doc time interval to clean flapping records
{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [
{datatype, {duration, ms}}
%% @doc Flapping detect policy
{mapping, "flapping_detect_policy", "emqx.flapping_detect_policy", [
{datatype, string},
{default, "30,1m,5m"}
]}.
{translation, "emqx.flapping_detect_policy", fun(Conf) ->
Policy = cuttlefish:conf_get("flapping_detect_policy", Conf),
[Threshold, Duration, Interval] = string:tokens(Policy, ", "),
ParseDuration = fun(S) ->
case cuttlefish_duration:parse(S, ms) of
I when is_integer(I) -> I;
{error, Reason} -> error(Reason)
end
end,
#{threshold => list_to_integer(Threshold),
duration => ParseDuration(Duration),
banned_interval => ParseDuration(Interval)
}
end}.
{validator, "range:gt_0", "must greater than 0",
fun(X) -> X > 0 end
}.
@ -877,15 +893,8 @@ end}.
]}.
{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [
{datatype, flag}
]}.
{mapping, "zone.$name.flapping_threshold", "emqx.zones", [
{datatype, string}
]}.
{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [
{datatype, {duration, s}}
{datatype, flag},
{default, off}
]}.
%% @doc Force connection/session process GC after this number of
@ -919,15 +928,6 @@ end}.
{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{retain_available, Val};
("flapping_threshold", Val) ->
[Limit, Duration] = string:tokens(Val, ", "),
FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
Min when is_integer(Min) ->
{list_to_integer(Limit), Min};
{error, Reason} ->
error(Reason)
end,
{flapping_threshold, FlappingThreshold};
("wildcard_subscription", Val) ->
{wildcard_subscription, Val};
("shared_subscription", Val) ->

View File

@ -38,6 +38,8 @@
, info/1
]).
-export([is_enabled/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
@ -70,10 +72,18 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(check(emqx_types:client()) -> boolean()).
check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) ->
ets:member(?BANNED_TAB, {client_id, ClientId})
orelse ets:member(?BANNED_TAB, {username, Username})
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
check(#{zone := Zone,
client_id := ClientId,
username := Username,
peername := {IPAddr, _}
}) ->
is_enabled(Zone) andalso
ets:member(?BANNED_TAB, {client_id, ClientId})
orelse ets:member(?BANNED_TAB, {username, Username})
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
is_enabled(Zone) ->
emqx_zone:get_env(Zone, enable_ban, false).
-spec(add(emqx_types:banned()) -> ok).
add(Banned) when is_record(Banned, banned) ->

View File

@ -221,6 +221,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
fun init_protocol/2,
fun enrich_client/2,
fun set_logger_meta/2,
fun check_banned/2,
fun check_flapping/2,
fun auth_connect/2], ConnPkt, Channel) of
{ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel);
@ -862,8 +864,6 @@ validate_packet(Packet, _Channel) ->
check_connect(ConnPkt, Channel) ->
pipeline([fun check_proto_ver/2,
fun check_client_id/2,
%%fun check_flapping/2,
fun check_banned/2,
fun check_will_topic/2,
fun check_will_retain/2], ConnPkt, Channel).
@ -898,20 +898,6 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId},
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end.
%%TODO: check banned...
check_banned(#mqtt_packet_connect{client_id = ClientId,
username = Username},
#channel{client = Client = #{zone := Zone}}) ->
case emqx_zone:get_env(Zone, enable_ban, false) of
true ->
case emqx_banned:check(Client#{client_id => ClientId,
username => Username}) of
true -> {error, ?RC_BANNED};
false -> ok
end;
false -> ok
end.
check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) ->
ok;
check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) ->
@ -974,6 +960,22 @@ fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) ->
set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
emqx_logger:set_metadata_client_id(ClientId).
%%--------------------------------------------------------------------
%% Check banned/flapping
%%--------------------------------------------------------------------
check_banned(_ConnPkt, #channel{client = Client}) ->
case emqx_banned:check(Client) of
true -> {error, ?RC_BANNED};
false -> ok
end.
check_flapping(_ConnPkt, #channel{client = Client}) ->
case emqx_flapping:check(Client) of
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
false -> ok
end.
%%--------------------------------------------------------------------
%% Auth Connect
%%--------------------------------------------------------------------

View File

@ -14,129 +14,186 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module is used to garbage clean the flapping records.
-module(emqx_flapping).
-behaviour(gen_server).
-include("emqx.hrl").
-include("types.hrl").
-include("logger.hrl").
-behaviour(gen_statem).
-logger_header("[Flapping]").
-export([start_link/0]).
%% gen_statem callbacks
%% API
-export([check/1, detect/1]).
%% gen_server callbacks
-export([ init/1
, initialized/3
, callback_mode/0
, terminate/3
, code_change/4
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Tab
-define(FLAPPING_TAB, ?MODULE).
-define(default_flapping_clean_interval, 3600000).
-export([check/3]).
-record(flapping, {
client_id :: binary(),
check_count :: integer(),
timestamp :: integer()
%% Default Policy
-define(FLAPPING_THRESHOLD, 30).
-define(FLAPPING_DURATION, 60000).
-define(FLAPPING_BANNED_INTERVAL, 300000).
-define(DEFAULT_DETECT_POLICY,
#{threshold => ?FLAPPING_THRESHOLD,
duration => ?FLAPPING_DURATION,
banned_interval => ?FLAPPING_BANNED_INTERVAL
}).
-type(flapping_record() :: #flapping{}).
-record(flapping, {
client_id :: emqx_types:client_id(),
peername :: emqx_types:peername(),
started_at :: pos_integer(),
detect_cnt :: pos_integer(),
banned_at :: pos_integer()
}).
-type(flapping_state() :: flapping | ok).
-opaque(flapping() :: #flapping{}).
%% @doc This function is used to initialize flapping records
%% the expiry time unit is minutes.
-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
init_flapping(ClientId, Interval) ->
#flapping{client_id = ClientId,
check_count = 1,
timestamp = emqx_time:now_secs() + Interval}.
-export_type([flapping/0]).
%% @doc This function is used to initialize flapping records
%% the expiry time unit is minutes.
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()}) -> flapping_state()).
check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()},
InitFlapping :: flapping_record()) -> flapping_state()).
check(Action, ClientId, Threshold, InitFlapping) ->
case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of
1 -> ok;
CheckCount ->
case ets:lookup(?FLAPPING_TAB, ClientId) of
[Flapping] ->
check_flapping(Action, CheckCount, Threshold, Flapping);
_Flapping ->
ok
end
end.
check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
Flapping = #flapping{ client_id = ClientId
, timestamp = Timestamp }) ->
case emqx_time:now_secs() of
NowTimestamp when NowTimestamp =< Timestamp,
CheckCount > TimesThreshold ->
ets:delete(?FLAPPING_TAB, ClientId),
flapping;
NowTimestamp when NowTimestamp > Timestamp,
Action =:= disconnect ->
ets:delete(?FLAPPING_TAB, ClientId),
ok;
NowTimestamp ->
NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval},
ets:insert(?FLAPPING_TAB, NewFlapping),
ok
end.
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
-spec(start_link() -> startlink_ret()).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% @doc Check flapping when a MQTT client connected.
-spec(check(emqx_types:client()) -> boolean()).
check(#{zone := Zone, client_id := ClientId}) ->
is_enabled(Zone) andalso check(ClientId, get_policy()).
check(ClientId, #{banned_interval := Interval}) ->
case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of
[] -> false;
[#flapping{banned_at = BannedAt}] ->
now_diff(BannedAt) < Interval
end.
%% @doc Detect flapping when a MQTT client disconnected.
-spec(detect(emqx_types:client()) -> boolean()).
detect(Client = #{zone := Zone}) ->
is_enabled(Zone) andalso detect(Client, get_policy()).
detect(#{client_id := ClientId, peername := Peername},
Policy = #{threshold := Threshold}) ->
try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
Cnt when Cnt < Threshold -> false;
_Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of
[Flapping] ->
ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}),
true;
[] -> false
end
catch
error:badarg ->
%% Create a flapping record.
Flapping = #flapping{client_id = ClientId,
peername = Peername,
started_at = emqx_time:now_ms(),
detect_cnt = 1
},
true = ets:insert(?FLAPPING_TAB, Flapping),
false
end.
-compile({inline, [is_enabled/1, get_policy/0, now_diff/1]}).
-spec(is_enabled(emqx_types:zone()) -> boolean()).
is_enabled(Zone) ->
emqx_zone:get(Zone, enable_flapping_detect, false).
get_policy() ->
emqx:get_env(flapping_detect_policy, ?DEFAULT_DETECT_POLICY).
now_diff(TS) -> emqx_time:now_ms() - TS.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
Interval = emqx:get_env(flapping_clean_interval, ?default_flapping_clean_interval),
TabOpts = [ public
, set
, {keypos, 2}
, {write_concurrency, true}
, {read_concurrency, true}],
ok = emqx_tables:new(?FLAPPING_TAB, TabOpts),
{ok, initialized, #{timer_interval => Interval}}.
#{duration := Duration, banned_interval := Interval} = get_policy(),
ok = emqx_tables:new(?FLAPPING_TAB, [public, set,
{keypos, 2},
{read_concurrency, true},
{write_concurrency, true}
]),
State = #{time => max(Duration, Interval) + 1, tref => undefined},
{ok, ensure_timer(State), hibernate}.
callback_mode() -> [state_functions, state_enter].
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
initialized(enter, _OldState, #{timer_interval := Time}) ->
Action = {state_timeout, Time, clean_expired_records},
{keep_state_and_data, Action};
initialized(state_timeout, clean_expired_records, #{}) ->
clean_expired_records(),
repeat_state_and_data.
handle_cast({detected, Flapping = #flapping{client_id = ClientId,
peername = Peername,
started_at = StartedAt,
detect_cnt = DetectCnt},
#{duration := Duration}}, State) ->
case (Interval = now_diff(StartedAt)) < Duration of
true -> %% Flapping happened:(
%% Log first
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:format(Peername), DetectCnt, Duration]),
%% TODO: Send Alarm
%% Banned.
BannedFlapping = Flapping#flapping{client_id = {banned, ClientId},
banned_at = emqx_time:now_ms()
},
ets:insert(?FLAPPING_TAB, BannedFlapping);
false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:format(Peername), DetectCnt, Interval]),
ets:delete_object(?FLAPPING_TAB, Flapping)
end,
{noreply, State};
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
terminate(_Reason, _StateName, _State) ->
emqx_tables:delete(?FLAPPING_TAB),
handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) ->
with_flapping_tab(fun expire_flapping/2,
[emqx_time:now_ms(), get_policy()]),
{noreply, ensure_timer(State#{tref => undefined}), hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% @doc clean expired records in ets
clean_expired_records() ->
NowTime = emqx_time:now_secs(),
MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec).
ensure_timer(State = #{time := Time, tref := undefined}) ->
State#{tref => emqx_misc:start_timer(Time, expire_flapping)};
ensure_timer(State) -> State.
with_flapping_tab(Fun, Args) ->
case ets:info(?FLAPPING_TAB, size) of
undefined -> ok;
0 -> ok;
_Size -> erlang:apply(Fun, Args)
end.
expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
ets:select_delete(?FLAPPING_TAB,
[{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
[{'<', '$1', NowTime-Duration}], [true]},
{#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'},
[{'<', '$1', NowTime-Interval}], [true]}]).