Merge pull request #9136 from emqx/enhance-flapping-detect-5
refactor: enhance the flapping detect accuracy
This commit is contained in:
commit
6f077c47e7
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
* Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
|
* Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
|
||||||
* Use milliseconds internally in emqx_delayed to store the publish time, improving precision.[#9060](https://github.com/emqx/emqx/pull/9060)
|
* Use milliseconds internally in emqx_delayed to store the publish time, improving precision.[#9060](https://github.com/emqx/emqx/pull/9060)
|
||||||
|
* More rigorous checking of flapping to improve stability of the system. [#9136](https://github.com/emqx/emqx/pull/9136)
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
||||||
|
|
|
@ -345,7 +345,8 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
||||||
fun check_connect/2,
|
fun check_connect/2,
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
fun set_log_meta/2,
|
fun set_log_meta/2,
|
||||||
fun check_banned/2
|
fun check_banned/2,
|
||||||
|
fun count_flapping_event/2
|
||||||
],
|
],
|
||||||
ConnPkt,
|
ConnPkt,
|
||||||
Channel#channel{conn_state = connecting}
|
Channel#channel{conn_state = connecting}
|
||||||
|
@ -1260,14 +1261,11 @@ handle_info(
|
||||||
{sock_closed, Reason},
|
{sock_closed, Reason},
|
||||||
Channel =
|
Channel =
|
||||||
#channel{
|
#channel{
|
||||||
conn_state = ConnState,
|
conn_state = ConnState
|
||||||
clientinfo = ClientInfo = #{zone := Zone}
|
|
||||||
}
|
}
|
||||||
) when
|
) when
|
||||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||||
->
|
->
|
||||||
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
|
|
||||||
emqx_flapping:detect(ClientInfo),
|
|
||||||
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
||||||
case maybe_shutdown(Reason, Channel1) of
|
case maybe_shutdown(Reason, Channel1) of
|
||||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||||
|
@ -1636,6 +1634,14 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Flapping
|
||||||
|
|
||||||
|
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||||
|
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
|
||||||
|
emqx_flapping:detect(ClientInfo),
|
||||||
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Authenticate
|
%% Authenticate
|
||||||
|
|
||||||
|
|
|
@ -207,14 +207,6 @@ init_per_suite(Config) ->
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
|
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
|
||||||
%% Access Control Meck
|
|
||||||
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
||||||
ok = meck:expect(
|
|
||||||
emqx_access_control,
|
|
||||||
authenticate,
|
|
||||||
fun(_) -> {ok, #{is_superuser => false}} end
|
|
||||||
),
|
|
||||||
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
|
|
||||||
%% Broker Meck
|
%% Broker Meck
|
||||||
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
||||||
%% Hooks Meck
|
%% Hooks Meck
|
||||||
|
@ -234,7 +226,6 @@ init_per_suite(Config) ->
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
meck:unload([
|
meck:unload([
|
||||||
emqx_access_control,
|
|
||||||
emqx_metrics,
|
emqx_metrics,
|
||||||
emqx_session,
|
emqx_session,
|
||||||
emqx_broker,
|
emqx_broker,
|
||||||
|
@ -244,11 +235,21 @@ end_per_suite(_Config) ->
|
||||||
]).
|
]).
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
%% Access Control Meck
|
||||||
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(
|
||||||
|
emqx_access_control,
|
||||||
|
authenticate,
|
||||||
|
fun(_) -> {ok, #{is_superuser => false}} end
|
||||||
|
),
|
||||||
|
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
|
||||||
|
%% Set confs
|
||||||
OldConf = set_test_listener_confs(),
|
OldConf = set_test_listener_confs(),
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
[{config, OldConf} | Config].
|
[{config, OldConf} | Config].
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
meck:unload([emqx_access_control]),
|
||||||
emqx_config:put(?config(config, Config)),
|
emqx_config:put(?config(config, Config)),
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
emqx_common_test_helpers:stop_apps([]),
|
||||||
Config.
|
Config.
|
||||||
|
@ -1115,6 +1116,32 @@ t_ws_cookie_init(_) ->
|
||||||
),
|
),
|
||||||
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases for other mechnisms
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_flapping_detect(_) ->
|
||||||
|
emqx_config:put_zone_conf(default, [flapping_detect, enable], true),
|
||||||
|
Parent = self(),
|
||||||
|
ok = meck:expect(
|
||||||
|
emqx_cm,
|
||||||
|
open_session,
|
||||||
|
fun(true, _ClientInfo, _ConnInfo) ->
|
||||||
|
{ok, #{session => session(), present => false}}
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
|
||||||
|
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
|
||||||
|
IdleChannel = channel(#{conn_state => idle}),
|
||||||
|
{shutdown, not_authorized, _ConnAck, _Channel} =
|
||||||
|
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
|
||||||
|
receive
|
||||||
|
flapping_detect -> ok
|
||||||
|
after 2000 ->
|
||||||
|
?assert(false, "Flapping detect should be exected in connecting progress")
|
||||||
|
end,
|
||||||
|
meck:unload([emqx_flapping]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue