Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-04-25 03:56:46 +08:00
commit af6ad8a90f
5 changed files with 22 additions and 48 deletions

View File

@ -694,7 +694,7 @@ zone.external.flapping_threshold = 10, 1m
## -s: second
##
## Default: 1h, 1 hour
zone.external.flapping_expiry_interval = 1h
zone.external.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
@ -789,7 +789,7 @@ zone.internal.flapping_threshold = 10, 1m
## -s: second
##
## Default: 1h, 1 hour
zone.internal.flapping_expiry_interval = 1h
zone.internal.flapping_banned_expiry_interval = 1h
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##

View File

@ -836,7 +836,7 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [
{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [
{datatype, {duration, s}}
]}.

View File

@ -207,4 +207,3 @@ stats_fun() ->
undefined -> ok;
Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
end.

View File

@ -50,27 +50,23 @@
%% the expiry time unit is minutes.
-spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
init_flapping(ClientId, Interval) ->
#flapping{ client_id = ClientId
, check_count = 1
, timestamp = emqx_time:now_secs() + Interval
}.
#flapping{client_id = ClientId,
check_count = 1,
timestamp = emqx_time:now_secs() + Interval}.
%% @doc This function is used to initialize flapping records
%% the expiry time unit is minutes.
-spec(check( Action :: atom()
, ClientId :: binary()
, Threshold :: {integer(), integer()})
-> flapping_state()).
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()}) -> flapping_state()).
check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
-spec(check( Action :: atom()
, ClientId :: binary()
, Threshold :: {integer(), integer()}
, InitFlapping :: flapping_record())
-> flapping_state()).
-spec(check(Action :: atom(), ClientId :: binary(),
Threshold :: {integer(), integer()},
InitFlapping :: flapping_record()) -> flapping_state()).
check(Action, ClientId, Threshold, InitFlapping) ->
try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of
case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of
1 -> ok;
CheckCount ->
case ets:lookup(?FLAPPING_TAB, ClientId) of
[Flapping] ->
@ -78,23 +74,14 @@ check(Action, ClientId, Threshold, InitFlapping) ->
_Flapping ->
ok
end
catch
error:badarg ->
ets:insert_new(?FLAPPING_TAB, InitFlapping),
ok
end.
-spec(check_flapping( Action :: atom()
, CheckTimes :: integer()
, Threshold :: {integer(), integer()}
, InitFlapping :: flapping_record())
-> flapping_state()).
check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
Flapping = #flapping{ client_id = ClientId
, timestamp = Timestamp }) ->
case emqx_time:now_secs() of
NowTimestamp when NowTimestamp =< Timestamp,
CheckTimes > TimesThreshold ->
CheckCount > TimesThreshold ->
ets:delete(?FLAPPING_TAB, ClientId),
flapping;
NowTimestamp when NowTimestamp > Timestamp,
@ -110,7 +97,7 @@ check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
-spec(start_link(TimerInterval :: integer()) -> startlink_ret()).
-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
start_link(TimerInterval) ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
@ -145,17 +132,6 @@ terminate(_Reason, _StateName, _State) ->
%% @doc clean expired records in ets
clean_expired_records() ->
Records = ets:tab2list(?FLAPPING_TAB),
traverse_records(Records).
traverse_records([]) ->
ok;
traverse_records([#flapping{client_id = ClientId,
timestamp = Timestamp} | LeftRecords]) ->
case emqx_time:now_secs() > Timestamp of
true ->
ets:delete(?FLAPPING_TAB, ClientId);
false ->
true
end,
traverse_records(LeftRecords).
NowTime = emqx_time:now_secs(),
MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec).

View File

@ -943,16 +943,15 @@ flag(true) -> 1.
do_flapping_detect(Action, #pstate{zone = Zone,
client_id = ClientId,
enable_flapping_detect = true}) ->
ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000),
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
Until = erlang:system_time(second) + ExpiryInterval,
Until = erlang:system_time(second) + BanExpiryInterval,
case emqx_flapping:check(Action, ClientId, Threshold) of
flapping ->
emqx_banned:add(#banned{who = {client_id, ClientId},
reason = <<"flapping">>,
by = <<"flapping_checker">>,
until = Until
}),
until = Until}),
ok;
_Other ->
ok