fix(config): make flapping work with the new config

This commit is contained in:
Shawn 2021-07-13 16:39:59 +08:00
parent 871353704a
commit b123299c70
6 changed files with 25 additions and 23 deletions

View File

@ -382,9 +382,9 @@ normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark})
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark])); list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark]));
normalize_message(high_cpu_usage, #{usage := Usage}) -> normalize_message(high_cpu_usage, #{usage := Usage}) ->
list_to_binary(io_lib:format("~p% cpu usage", [Usage])); list_to_binary(io_lib:format("~s cpu usage", [Usage]));
normalize_message(too_many_processes, #{usage := Usage}) -> normalize_message(too_many_processes, #{usage := Usage}) ->
list_to_binary(io_lib:format("~p% process usage", [Usage])); list_to_binary(io_lib:format("~s process usage", [Usage]));
normalize_message(partition, #{occurred := Node}) -> normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->

View File

@ -125,7 +125,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
by = <<"flapping detector">>, by = <<"flapping detector">>,
reason = <<"flapping is detected">>, reason = <<"flapping is detected">>,
at = Now, at = Now,
until = Now + Interval}, until = Now + (Interval div 1000)},
emqx_banned:create(Banned); emqx_banned:create(Banned);
false -> false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms", ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",

View File

@ -143,7 +143,7 @@ handle_info({timeout, _Timer, check}, State) ->
case emqx_vm:cpu_util() of %% TODO: should be improved? case emqx_vm:cpu_util() of %% TODO: should be improved?
0 -> ok; 0 -> ok;
Busy when Busy >= CPUHighWatermark -> Busy when Busy >= CPUHighWatermark ->
emqx_alarm:activate(high_cpu_usage, #{usage => Busy, emqx_alarm:activate(high_cpu_usage, #{usage => io_lib:format("~p%", [Busy]),
high_watermark => CPUHighWatermark, high_watermark => CPUHighWatermark,
low_watermark => CPULowWatermark}), low_watermark => CPULowWatermark}),
start_check_timer(); start_check_timer();

View File

@ -42,19 +42,19 @@ end_per_suite(_Config) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_stats_fun(_) -> t_stats_fun(_) ->
?assertEqual(0, emqx_stats:getstat('subscribers.count')), Subscribers = emqx_stats:getstat('subscribers.count'),
?assertEqual(0, emqx_stats:getstat('subscriptions.count')), Subscriptions = emqx_stats:getstat('subscriptions.count'),
?assertEqual(0, emqx_stats:getstat('suboptions.count')), Subopts = emqx_stats:getstat('suboptions.count'),
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
emqx_broker:stats_fun(), emqx_broker:stats_fun(),
ct:sleep(10), ct:sleep(10),
?assertEqual(2, emqx_stats:getstat('subscribers.count')), ?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.count')),
?assertEqual(2, emqx_stats:getstat('subscribers.max')), ?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.max')),
?assertEqual(2, emqx_stats:getstat('subscriptions.count')), ?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.count')),
?assertEqual(2, emqx_stats:getstat('subscriptions.max')), ?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.max')),
?assertEqual(2, emqx_stats:getstat('suboptions.count')), ?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.count')),
?assertEqual(2, emqx_stats:getstat('suboptions.max')). ?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.max')).
t_subscribed(_) -> t_subscribed(_) ->
emqx_broker:subscribe(<<"topic">>), emqx_broker:subscribe(<<"topic">>),

View File

@ -28,8 +28,8 @@ init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
emqx_config:put_listener_conf(default, mqtt_tcp, [flapping_detect], emqx_config:put_listener_conf(default, mqtt_tcp, [flapping_detect],
#{max_count => 3, #{max_count => 3,
window_time => 100, window_time => 100, % 0.1s
ban_time => 2 ban_time => 2000 %% 2s
}), }),
Config. Config.
@ -41,7 +41,7 @@ end_per_suite(_Config) ->
t_detect_check(_) -> t_detect_check(_) ->
ClientInfo = #{zone => default, ClientInfo = #{zone => default,
listener => mqtt_tcp, listener => mqtt_tcp,
clientid => <<"clientid">>, clientid => <<"client007">>,
peerhost => {127,0,0,1} peerhost => {127,0,0,1}
}, },
false = emqx_flapping:detect(ClientInfo), false = emqx_flapping:detect(ClientInfo),
@ -50,6 +50,8 @@ t_detect_check(_) ->
false = emqx_banned:check(ClientInfo), false = emqx_banned:check(ClientInfo),
true = emqx_flapping:detect(ClientInfo), true = emqx_flapping:detect(ClientInfo),
timer:sleep(50), timer:sleep(50),
ct:pal("the table emqx_banned: ~p, nowsec: ~p", [ets:tab2list(emqx_banned),
erlang:system_time(second)]),
true = emqx_banned:check(ClientInfo), true = emqx_banned:check(ClientInfo),
timer:sleep(3000), timer:sleep(3000),
false = emqx_banned:check(ClientInfo), false = emqx_banned:check(ClientInfo),
@ -63,11 +65,11 @@ t_detect_check(_) ->
t_expired_detecting(_) -> t_expired_detecting(_) ->
ClientInfo = #{zone => default, ClientInfo = #{zone => default,
listener => mqtt_tcp, listener => mqtt_tcp,
clientid => <<"clientid">>, clientid => <<"client008">>,
peerhost => {127,0,0,1}}, peerhost => {127,0,0,1}},
false = emqx_flapping:detect(ClientInfo), false = emqx_flapping:detect(ClientInfo),
?assertEqual(true, lists:any(fun({flapping, <<"clientid">>, _, _, _}) -> true; ?assertEqual(true, lists:any(fun({flapping, <<"client008">>, _, _, _}) -> true;
(_) -> false end, ets:tab2list(emqx_flapping))), (_) -> false end, ets:tab2list(emqx_flapping))),
timer:sleep(200), timer:sleep(200),
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; ?assertEqual(true, lists:all(fun({flapping, <<"client008">>, _, _, _}) -> false;
(_) -> true end, ets:tab2list(emqx_flapping))). (_) -> true end, ets:tab2list(emqx_flapping))).

View File

@ -54,7 +54,7 @@ t_session_init(_) ->
#{receive_maximum => 64}), #{receive_maximum => 64}),
?assertEqual(#{}, emqx_session:info(subscriptions, Session)), ?assertEqual(#{}, emqx_session:info(subscriptions, Session)),
?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)), ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)),
?assertEqual(0, emqx_session:info(subscriptions_max, Session)), ?assertEqual(infinity, emqx_session:info(subscriptions_max, Session)),
?assertEqual(false, emqx_session:info(upgrade_qos, Session)), ?assertEqual(false, emqx_session:info(upgrade_qos, Session)),
?assertEqual(0, emqx_session:info(inflight_cnt, Session)), ?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
?assertEqual(64, emqx_session:info(inflight_max, Session)), ?assertEqual(64, emqx_session:info(inflight_max, Session)),
@ -73,13 +73,13 @@ t_session_init(_) ->
t_session_info(_) -> t_session_info(_) ->
?assertMatch(#{subscriptions := #{}, ?assertMatch(#{subscriptions := #{},
upgrade_qos := false, upgrade_qos := false,
retry_interval := 0, retry_interval := 30,
await_rel_timeout := 300 await_rel_timeout := 300
}, emqx_session:info(session())). }, emqx_session:info(session())).
t_session_stats(_) -> t_session_stats(_) ->
Stats = emqx_session:stats(session()), Stats = emqx_session:stats(session()),
?assertMatch(#{subscriptions_max := 0, ?assertMatch(#{subscriptions_max := infinity,
inflight_max := 0, inflight_max := 0,
mqueue_len := 0, mqueue_len := 0,
mqueue_max := 1000, mqueue_max := 1000,
@ -153,7 +153,7 @@ t_publish_qos2_with_error_return(_) ->
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1).
t_is_awaiting_full_false(_) -> t_is_awaiting_full_false(_) ->
Session = session(#{max_awaiting_rel => 0}), Session = session(#{max_awaiting_rel => infinity}),
?assertNot(emqx_session:is_awaiting_full(Session)). ?assertNot(emqx_session:is_awaiting_full(Session)).
t_is_awaiting_full_true(_) -> t_is_awaiting_full_true(_) ->