Merge pull request #3033 from emqx/flapping2

Optimize flapping and banned modules
This commit is contained in:
tigercl 2019-11-18 10:50:35 +08:00 committed by GitHub
commit b10cd85ba3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 113 deletions

View File

@ -154,15 +154,13 @@
%% Banned %% Banned
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type(banned_who() :: {clientid, binary()}
| {username, binary()}
| {ip_address, inet:ip_address()}).
-record(banned, { -record(banned, {
who :: banned_who(), who :: {clientid, binary()}
reason :: binary(), | {username, binary()}
| {ip_address, inet:ip_address()},
by :: binary(), by :: binary(),
desc :: binary(), reason :: binary(),
at :: integer(),
until :: integer() until :: integer()
}). }).

View File

@ -33,7 +33,7 @@
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
-export([ check/1 -export([ check/1
, add/1 , create/1
, delete/1 , delete/1
, info/1 , info/1
]). ]).
@ -74,21 +74,39 @@ start_link() ->
stop() -> gen_server:stop(?MODULE). stop() -> gen_server:stop(?MODULE).
-spec(check(emqx_types:clientinfo()) -> boolean()). -spec(check(emqx_types:clientinfo()) -> boolean()).
check(#{clientid := ClientId, check(ClientInfo) ->
username := Username, do_check({clientid, maps:get(clientid, ClientInfo, undefined)})
peerhost := IPAddr}) -> orelse do_check({username, maps:get(username, ClientInfo, undefined)})
ets:member(?BANNED_TAB, {clientid, ClientId}) orelse do_check({peerhost, maps:get(peerhost, ClientInfo, undefined)}).
orelse ets:member(?BANNED_TAB, {username, Username})
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
-spec(add(emqx_types:banned()) -> ok). do_check({_, undefined}) ->
add(Banned) when is_record(Banned, banned) -> false;
do_check(Who) when is_tuple(Who) ->
case mnesia:dirty_read(?BANNED_TAB, Who) of
[] -> false;
[#banned{until = Until}] ->
Until > erlang:system_time(millisecond)
end.
-spec(create(emqx_types:banned()) -> ok).
create(#{who := Who,
by := By,
reason := Reason,
at := At,
until := Until}) ->
mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
by = By,
reason = Reason,
at = At,
until = Until});
create(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?BANNED_TAB, Banned). mnesia:dirty_write(?BANNED_TAB, Banned).
-spec(delete({clientid, emqx_types:clientid()} -spec(delete({clientid, emqx_types:clientid()}
| {username, emqx_types:username()} | {username, emqx_types:username()}
| {peerhost, emqx_types:peerhost()}) -> ok). | {peerhost, emqx_types:peerhost()}) -> ok).
delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key). delete(Who) ->
mnesia:dirty_delete(?BANNED_TAB, Who).
info(InfoKey) -> info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey). mnesia:table_info(?BANNED_TAB, InfoKey).

View File

@ -219,7 +219,6 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
fun enrich_client/2, fun enrich_client/2,
fun set_logger_meta/2, fun set_logger_meta/2,
fun check_banned/2, fun check_banned/2,
fun check_flapping/2,
fun auth_connect/2], ConnPkt, Channel) of fun auth_connect/2], ConnPkt, Channel) of
{ok, NConnPkt, NChannel} -> {ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel); process_connect(NConnPkt, NChannel);
@ -942,7 +941,7 @@ set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId). emqx_logger:set_metadata_clientid(ClientId).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Check banned/flapping %% Check banned
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
@ -951,13 +950,6 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
false -> ok false -> ok
end. end.
check_flapping(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
case emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:check(ClientInfo) of
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
false -> ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Auth Connect %% Auth Connect
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -27,7 +27,7 @@
-export([start_link/0, stop/0]). -export([start_link/0, stop/0]).
%% API %% API
-export([check/1, detect/1]). -export([detect/1]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -54,8 +54,7 @@
clientid :: emqx_types:clientid(), clientid :: emqx_types:clientid(),
peerhost :: emqx_types:peerhost(), peerhost :: emqx_types:peerhost(),
started_at :: pos_integer(), started_at :: pos_integer(),
detect_cnt :: pos_integer(), detect_cnt :: pos_integer()
banned_at :: pos_integer()
}). }).
-opaque(flapping() :: #flapping{}). -opaque(flapping() :: #flapping{}).
@ -68,27 +67,14 @@ start_link() ->
stop() -> gen_server:stop(?MODULE). stop() -> gen_server:stop(?MODULE).
%% @doc Check flapping when a MQTT client connected.
-spec(check(emqx_types:clientinfo()) -> boolean()).
check(#{clientid := ClientId}) ->
check(ClientId, get_policy()).
check(ClientId, #{banned_interval := Interval}) ->
case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of
[] -> false;
[#flapping{banned_at = BannedAt}] ->
now_diff(BannedAt) < Interval
end.
%% @doc Detect flapping when a MQTT client disconnected. %% @doc Detect flapping when a MQTT client disconnected.
-spec(detect(emqx_types:clientinfo()) -> boolean()). -spec(detect(emqx_types:clientinfo()) -> boolean()).
detect(Client) -> detect(Client, get_policy()). detect(Client) -> detect(Client, get_policy()).
detect(#{clientid := ClientId, peerhost := PeerHost}, detect(#{clientid := ClientId, peerhost := PeerHost}, Policy = #{threshold := Threshold}) ->
Policy = #{threshold := Threshold}) ->
try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
Cnt when Cnt < Threshold -> false; Cnt when Cnt < Threshold -> false;
_Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of _Cnt -> case ets:take(?FLAPPING_TAB, ClientId) of
[Flapping] -> [Flapping] ->
ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}), ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}),
true; true;
@ -118,52 +104,44 @@ now_diff(TS) -> erlang:system_time(millisecond) - TS.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
#{duration := Duration, banned_interval := Interval} = get_policy(),
ok = emqx_tables:new(?FLAPPING_TAB, [public, set, ok = emqx_tables:new(?FLAPPING_TAB, [public, set,
{keypos, 2}, {keypos, 2},
{read_concurrency, true}, {read_concurrency, true},
{write_concurrency, true} {write_concurrency, true}
]), ]),
State = #{time => max(Duration, Interval) + 1, tref => undefined}, {ok, #{}, hibernate}.
{ok, ensure_timer(State), hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({detected, Flapping = #flapping{clientid = ClientId, handle_cast({detected, #flapping{clientid = ClientId,
peerhost = PeerHost, peerhost = PeerHost,
started_at = StartedAt, started_at = StartedAt,
detect_cnt = DetectCnt}, detect_cnt = DetectCnt},
#{duration := Duration}}, State) -> #{duration := Duration, banned_interval := Interval}}, State) ->
case (Interval = now_diff(StartedAt)) < Duration of case now_diff(StartedAt) < Duration of
true -> %% Flapping happened:( true -> %% Flapping happened:(
%% Log first
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]), [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
%% Banned. Now = erlang:system_time(millisecond),
BannedFlapping = Flapping#flapping{clientid = {banned, ClientId}, Banned = #banned{who = {clientid, ClientId},
banned_at = erlang:system_time(millisecond) by = <<"flapping detector">>,
}, reason = <<"flapping is detected">>,
alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}), at = Now,
ets:insert(?FLAPPING_TAB, BannedFlapping); until = Now + Interval},
alarm_handler:set_alarm({{flapping_detected, ClientId}, Banned}),
emqx_banned:create(Banned);
false -> false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms", ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]) [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval])
end, end,
ets:delete_object(?FLAPPING_TAB, Flapping),
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]), ?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) ->
with_flapping_tab(fun expire_flapping/2,
[erlang:system_time(millisecond),
get_policy()]),
{noreply, ensure_timer(State#{tref => undefined}), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
@ -173,34 +151,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
ensure_timer(State = #{time := Time, tref := undefined}) ->
State#{tref => emqx_misc:start_timer(Time, expire_flapping)};
ensure_timer(State) -> State.
with_flapping_tab(Fun, Args) ->
case ets:info(?FLAPPING_TAB, size) of
undefined -> ok;
0 -> ok;
_Size -> erlang:apply(Fun, Args)
end.
expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
case ets:select(?FLAPPING_TAB,
[{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
[{'<', '$1', NowTime-Duration}], ['$_']},
{#flapping{clientid = {banned, '_'}, banned_at = '$1', _ = '_'},
[{'<', '$1', NowTime-Interval}], ['$_']}]) of
[] -> ok;
Flappings ->
lists:foreach(fun(Flapping = #flapping{clientid = {banned, ClientId}}) ->
ets:delete_object(?FLAPPING_TAB, Flapping),
alarm_handler:clear_alarm({flapping_detected, ClientId});
(_) -> ok
end, Flappings)
end.

View File

@ -38,20 +38,20 @@ end_per_suite(_Config) ->
t_add_delete(_) -> t_add_delete(_) ->
Banned = #banned{who = {clientid, <<"TestClient">>}, Banned = #banned{who = {clientid, <<"TestClient">>},
reason = <<"test">>,
by = <<"banned suite">>, by = <<"banned suite">>,
desc = <<"test">>, reason = <<"test">>,
at = erlang:system_time(second),
until = erlang:system_time(second) + 1000 until = erlang:system_time(second) + 1000
}, },
ok = emqx_banned:add(Banned), ok = emqx_banned:create(Banned),
?assertEqual(1, emqx_banned:info(size)), ?assertEqual(1, emqx_banned:info(size)),
ok = emqx_banned:delete({clientid, <<"TestClient">>}), ok = emqx_banned:delete({clientid, <<"TestClient">>}),
?assertEqual(0, emqx_banned:info(size)). ?assertEqual(0, emqx_banned:info(size)).
t_check(_) -> t_check(_) ->
ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>}}), ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}),
ok = emqx_banned:add(#banned{who = {username, <<"BannedUser">>}}), ok = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}),
ok = emqx_banned:add(#banned{who = {ipaddr, {192,168,0,1}}}), ok = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}),
?assertEqual(3, emqx_banned:info(size)), ?assertEqual(3, emqx_banned:info(size)),
ClientInfo1 = #{clientid => <<"BannedClient">>, ClientInfo1 = #{clientid => <<"BannedClient">>,
username => <<"user">>, username => <<"user">>,
@ -75,7 +75,7 @@ t_check(_) ->
?assertNot(emqx_banned:check(ClientInfo4)), ?assertNot(emqx_banned:check(ClientInfo4)),
ok = emqx_banned:delete({clientid, <<"BannedClient">>}), ok = emqx_banned:delete({clientid, <<"BannedClient">>}),
ok = emqx_banned:delete({username, <<"BannedUser">>}), ok = emqx_banned:delete({username, <<"BannedUser">>}),
ok = emqx_banned:delete({ipaddr, {192,168,0,1}}), ok = emqx_banned:delete({peerhost, {192,168,0,1}}),
?assertNot(emqx_banned:check(ClientInfo1)), ?assertNot(emqx_banned:check(ClientInfo1)),
?assertNot(emqx_banned:check(ClientInfo2)), ?assertNot(emqx_banned:check(ClientInfo2)),
?assertNot(emqx_banned:check(ClientInfo3)), ?assertNot(emqx_banned:check(ClientInfo3)),
@ -84,9 +84,8 @@ t_check(_) ->
t_unused(_) -> t_unused(_) ->
{ok, Banned} = emqx_banned:start_link(), {ok, Banned} = emqx_banned:start_link(),
ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>}, ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>},
until = erlang:system_time(second) until = erlang:system_time(second)}),
}),
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)), ?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)), ?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
?assertEqual(ok, Banned ! ok), ?assertEqual(ok, Banned ! ok),

View File

@ -45,13 +45,13 @@ t_detect_check(_) ->
peerhost => {127,0,0,1} peerhost => {127,0,0,1}
}, },
false = emqx_flapping:detect(ClientInfo), false = emqx_flapping:detect(ClientInfo),
false = emqx_flapping:check(ClientInfo), false = emqx_banned:check(ClientInfo),
false = emqx_flapping:detect(ClientInfo), false = emqx_flapping:detect(ClientInfo),
false = emqx_flapping:check(ClientInfo), false = emqx_banned:check(ClientInfo),
true = emqx_flapping:detect(ClientInfo), true = emqx_flapping:detect(ClientInfo),
timer:sleep(100), timer:sleep(100),
true = emqx_flapping:check(ClientInfo), true = emqx_banned:check(ClientInfo),
timer:sleep(300), timer:sleep(200),
false = emqx_flapping:check(ClientInfo), false = emqx_banned:check(ClientInfo),
ok = emqx_flapping:stop(). ok = emqx_flapping:stop().