Merge pull request #9045 from HJianBo/enhance-flapping-detect
refactor: enhance the flapping detect accuracy
This commit is contained in:
commit
a1affa94b5
|
@ -32,11 +32,14 @@ File format:
|
||||||
- Added a test to prevent a last will testament message to be
|
- Added a test to prevent a last will testament message to be
|
||||||
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
||||||
|
|
||||||
|
- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045)
|
||||||
|
|
||||||
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
|
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
|
||||||
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
|
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
|
||||||
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
|
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
|
||||||
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
|
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
|
||||||
|
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
||||||
|
|
|
@ -288,6 +288,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
fun set_log_meta/2,
|
fun set_log_meta/2,
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
|
fun count_flapping_event/2,
|
||||||
fun auth_connect/2
|
fun auth_connect/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||||
|
@ -1022,11 +1023,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||||
shutdown(Reason, Channel);
|
shutdown(Reason, Channel);
|
||||||
|
|
||||||
handle_info({sock_closed, Reason}, Channel =
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) ->
|
||||||
#channel{conn_state = connected,
|
|
||||||
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
|
||||||
emqx_zone:enable_flapping_detect(Zone)
|
|
||||||
andalso emqx_flapping:detect(ClientInfo),
|
|
||||||
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
||||||
case maybe_shutdown(Reason, Channel1) of
|
case maybe_shutdown(Reason, Channel1) of
|
||||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||||
|
@ -1335,6 +1332,13 @@ auth_connect(#mqtt_packet_connect{password = Password},
|
||||||
{error, emqx_reason_codes:connack_error(Reason)}
|
{error, emqx_reason_codes:connack_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Flapping
|
||||||
|
|
||||||
|
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||||
|
_ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
|
||||||
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enhanced Authentication
|
%% Enhanced Authentication
|
||||||
|
|
||||||
|
|
|
@ -33,11 +33,6 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
%% CM Meck
|
%% CM Meck
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
%% Access Control Meck
|
|
||||||
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
||||||
ok = meck:expect(emqx_access_control, authenticate,
|
|
||||||
fun(_) -> {ok, #{auth_result => success}} end),
|
|
||||||
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
|
|
||||||
%% Broker Meck
|
%% Broker Meck
|
||||||
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
||||||
%% Hooks Meck
|
%% Hooks Meck
|
||||||
|
@ -53,8 +48,7 @@ init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
meck:unload([emqx_access_control,
|
meck:unload([emqx_metrics,
|
||||||
emqx_metrics,
|
|
||||||
emqx_session,
|
emqx_session,
|
||||||
emqx_broker,
|
emqx_broker,
|
||||||
emqx_hooks,
|
emqx_hooks,
|
||||||
|
@ -63,10 +57,16 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
meck:new(emqx_zone, [passthrough, no_history, no_link]),
|
meck:new(emqx_zone, [passthrough, no_history, no_link]),
|
||||||
|
%% Access Control Meck
|
||||||
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_access_control, authenticate,
|
||||||
|
fun(_) -> {ok, #{auth_result => success}} end),
|
||||||
|
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
meck:unload([emqx_zone]),
|
meck:unload([emqx_zone]),
|
||||||
|
meck:unload([emqx_access_control]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -853,6 +853,30 @@ t_ws_cookie_init(_) ->
|
||||||
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
|
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
|
||||||
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases for other mechnisms
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_flapping_detect(_) ->
|
||||||
|
Parent = self(),
|
||||||
|
ok = meck:expect(emqx_cm, open_session,
|
||||||
|
fun(true, _ClientInfo, _ConnInfo) ->
|
||||||
|
{ok, #{session => session(), present => false}}
|
||||||
|
end),
|
||||||
|
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
|
||||||
|
ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
|
||||||
|
ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end),
|
||||||
|
IdleChannel = channel(#{conn_state => idle}),
|
||||||
|
{shutdown, not_authorized, _ConnAck, _Channel} =
|
||||||
|
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
|
||||||
|
receive
|
||||||
|
flapping_detect -> ok
|
||||||
|
after 2000 ->
|
||||||
|
?assert(false, "Flapping detect should be exected in connecting progress")
|
||||||
|
end,
|
||||||
|
meck:unload([emqx_flapping]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -72,4 +72,4 @@ t_expired_detecting(_) ->
|
||||||
(_) -> false end, ets:tab2list(emqx_flapping))),
|
(_) -> false end, ets:tab2list(emqx_flapping))),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
|
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
|
||||||
(_) -> true end, ets:tab2list(emqx_flapping))).
|
(_) -> true end, ets:tab2list(emqx_flapping))).
|
||||||
|
|
Loading…
Reference in New Issue