From 13f3dafe2297df37aab0d48511889275fb13350f Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Sun, 24 Apr 2022 14:10:51 +0800 Subject: [PATCH 1/3] refactor: enhance the flapping detect accuracy Count the `flapping` event as long as a client try to connect to server whatever it suceed or failed. It is more helpful to improve stablebility. --- src/emqx_channel.erl | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index b8f3c5b2c..38cc8f989 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -288,7 +288,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun enrich_client/2, fun set_log_meta/2, fun check_banned/2, - fun auth_connect/2 + fun auth_connect/2, + fun flapping_detect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> NChannel1 = NChannel#channel{ @@ -1022,11 +1023,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(Reason, Channel); -handle_info({sock_closed, Reason}, Channel = - #channel{conn_state = connected, - clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_zone:enable_flapping_detect(Zone) - andalso emqx_flapping:detect(ClientInfo), +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) -> Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; @@ -1335,6 +1332,13 @@ auth_connect(#mqtt_packet_connect{password = Password}, {error, emqx_reason_codes:connack_error(Reason)} end. +%%-------------------------------------------------------------------- +%% Flapping + +flapping_detect(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> + _ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Enhanced Authentication From 165842ded4822ef49cf7d6f6743816cc7539a816 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 26 Sep 2022 14:08:28 +0800 Subject: [PATCH 2/3] chore: update changes.md --- CHANGES-4.3.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 713f89f37..62df98f2d 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,6 +32,8 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045) + ### Bug fixes - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) From e2b0048e88850389f42e3b6c4abbae60f183dbf5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 26 Sep 2022 15:03:30 +0800 Subject: [PATCH 3/3] refactor(flapping): count flapping event if connecting failed --- src/emqx_channel.erl | 6 +++--- test/emqx_channel_SUITE.erl | 38 +++++++++++++++++++++++++++++------- test/emqx_flapping_SUITE.erl | 2 +- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 38cc8f989..1c0c6ee72 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -288,8 +288,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun enrich_client/2, fun set_log_meta/2, fun check_banned/2, - fun auth_connect/2, - fun flapping_detect/2 + fun count_flapping_event/2, + fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> NChannel1 = NChannel#channel{ @@ -1335,7 +1335,7 @@ auth_connect(#mqtt_packet_connect{password = Password}, %%-------------------------------------------------------------------- %% Flapping -flapping_detect(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> +count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> _ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), {ok, Channel}. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 00edde5b1..4f250bd4a 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -33,11 +33,6 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> %% CM Meck ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), - %% Access Control Meck - ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_access_control, authenticate, - fun(_) -> {ok, #{auth_result => success}} end), - ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), %% Broker Meck ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), %% Hooks Meck @@ -53,8 +48,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - meck:unload([emqx_access_control, - emqx_metrics, + meck:unload([emqx_metrics, emqx_session, emqx_broker, emqx_hooks, @@ -63,10 +57,16 @@ end_per_suite(_Config) -> init_per_testcase(_TestCase, Config) -> meck:new(emqx_zone, [passthrough, no_history, no_link]), + %% Access Control Meck + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {ok, #{auth_result => success}} end), + ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), Config. end_per_testcase(_TestCase, Config) -> meck:unload([emqx_zone]), + meck:unload([emqx_access_control]), Config. %%-------------------------------------------------------------------- @@ -853,6 +853,30 @@ t_ws_cookie_init(_) -> Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). +%%-------------------------------------------------------------------- +%% Test cases for other mechnisms +%%-------------------------------------------------------------------- + +t_flapping_detect(_) -> + Parent = self(), + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end), + ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), + ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end), + IdleChannel = channel(#{conn_state => idle}), + {shutdown, not_authorized, _ConnAck, _Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), + receive + flapping_detect -> ok + after 2000 -> + ?assert(false, "Flapping detect should be exected in connecting progress") + end, + meck:unload([emqx_flapping]). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index eadd89192..8074a8607 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -72,4 +72,4 @@ t_expired_detecting(_) -> (_) -> false end, ets:tab2list(emqx_flapping))), timer:sleep(200), ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; - (_) -> true end, ets:tab2list(emqx_flapping))). \ No newline at end of file + (_) -> true end, ets:tab2list(emqx_flapping))).