Correct timestamp for banned (#3188)
This commit is contained in:
parent
9a76164e65
commit
f6b2c9a69f
|
@ -640,15 +640,15 @@ end}.
|
||||||
{translation, "emqx.flapping_detect_policy", fun(Conf) ->
|
{translation, "emqx.flapping_detect_policy", fun(Conf) ->
|
||||||
Policy = cuttlefish:conf_get("flapping_detect_policy", Conf),
|
Policy = cuttlefish:conf_get("flapping_detect_policy", Conf),
|
||||||
[Threshold, Duration, Interval] = string:tokens(Policy, ", "),
|
[Threshold, Duration, Interval] = string:tokens(Policy, ", "),
|
||||||
ParseDuration = fun(S) ->
|
ParseDuration = fun(S, Dur) ->
|
||||||
case cuttlefish_duration:parse(S, ms) of
|
case cuttlefish_duration:parse(S, Dur) of
|
||||||
I when is_integer(I) -> I;
|
I when is_integer(I) -> I;
|
||||||
{error, Reason} -> error(Reason)
|
{error, Reason} -> error(Reason)
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
#{threshold => list_to_integer(Threshold),
|
#{threshold => list_to_integer(Threshold),
|
||||||
duration => ParseDuration(Duration),
|
duration => ParseDuration(Duration, ms),
|
||||||
banned_interval => ParseDuration(Interval)
|
banned_interval => ParseDuration(Interval, s)
|
||||||
}
|
}
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,7 @@ do_check(Who) when is_tuple(Who) ->
|
||||||
case mnesia:dirty_read(?BANNED_TAB, Who) of
|
case mnesia:dirty_read(?BANNED_TAB, Who) of
|
||||||
[] -> false;
|
[] -> false;
|
||||||
[#banned{until = Until}] ->
|
[#banned{until = Until}] ->
|
||||||
Until > erlang:system_time(millisecond)
|
Until > erlang:system_time(second)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(create(emqx_types:banned()) -> ok).
|
-spec(create(emqx_types:banned()) -> ok).
|
||||||
|
|
|
@ -124,7 +124,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
true -> %% Flapping happened:(
|
true -> %% Flapping happened:(
|
||||||
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
|
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
|
||||||
[ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]),
|
[ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]),
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(second),
|
||||||
Banned = #banned{who = {clientid, ClientId},
|
Banned = #banned{who = {clientid, ClientId},
|
||||||
by = <<"flapping detector">>,
|
by = <<"flapping detector">>,
|
||||||
reason = <<"flapping is detected">>,
|
reason = <<"flapping is detected">>,
|
||||||
|
|
|
@ -31,7 +31,7 @@ set_special_configs(emqx) ->
|
||||||
application:set_env(emqx, flapping_detect_policy,
|
application:set_env(emqx, flapping_detect_policy,
|
||||||
#{threshold => 3,
|
#{threshold => 3,
|
||||||
duration => 100,
|
duration => 100,
|
||||||
banned_interval => 200
|
banned_interval => 2
|
||||||
});
|
});
|
||||||
set_special_configs(_App) -> ok.
|
set_special_configs(_App) -> ok.
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ t_detect_check(_) ->
|
||||||
true = emqx_flapping:detect(ClientInfo),
|
true = emqx_flapping:detect(ClientInfo),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
true = emqx_banned:check(ClientInfo),
|
true = emqx_banned:check(ClientInfo),
|
||||||
timer:sleep(200),
|
timer:sleep(3000),
|
||||||
false = emqx_banned:check(ClientInfo),
|
false = emqx_banned:check(ClientInfo),
|
||||||
Childrens = supervisor:which_children(emqx_cm_sup),
|
Childrens = supervisor:which_children(emqx_cm_sup),
|
||||||
{flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens),
|
{flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens),
|
||||||
|
|
|
@ -147,8 +147,8 @@ t_connect_keepalive_timeout(_) ->
|
||||||
Msg ->
|
Msg ->
|
||||||
ReasonCode = 141,
|
ReasonCode = 141,
|
||||||
?assertMatch({disconnected, ReasonCode, _Channel}, Msg)
|
?assertMatch({disconnected, ReasonCode, _Channel}, Msg)
|
||||||
after
|
after round(timer:seconds(Keepalive) * 2 * 1.5 ) ->
|
||||||
round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout")
|
error("keepalive timeout")
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -160,7 +160,7 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
|
||||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||||
|
|
||||||
Topic = nth(1, ?TOPICS),
|
Topic = nth(1, ?TOPICS),
|
||||||
Shared_topic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
|
SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
|
||||||
|
|
||||||
CRef = counters:new(1, [atomics]),
|
CRef = counters:new(1, [atomics]),
|
||||||
meck:expect(emqtt, connected,
|
meck:expect(emqtt, connected,
|
||||||
|
@ -174,18 +174,23 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
|
||||||
{clientid, <<"sub_client_1">>},
|
{clientid, <<"sub_client_1">>},
|
||||||
{keepalive, 5}]),
|
{keepalive, 5}]),
|
||||||
{ok, _} = emqtt:connect(Sub1),
|
{ok, _} = emqtt:connect(Sub1),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Sub1, Shared_topic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2),
|
||||||
|
|
||||||
{ok, Sub2} = emqtt:start_link([{proto_ver, v5},
|
{ok, Sub2} = emqtt:start_link([{proto_ver, v5},
|
||||||
{clientid, <<"sub_client_2">>},
|
{clientid, <<"sub_client_2">>},
|
||||||
{keepalive, 5}]),
|
{keepalive, 5}]),
|
||||||
{ok, _} = emqtt:connect(Sub2),
|
{ok, _} = emqtt:connect(Sub2),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Sub2, Shared_topic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2),
|
||||||
|
|
||||||
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
|
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
|
||||||
{ok, _} = emqtt:connect(Pub),
|
{ok, _} = emqtt:connect(Pub),
|
||||||
{ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
|
{ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
|
||||||
|
|
||||||
receive
|
receive
|
||||||
{disconnected,shutdown,for_testiong} -> ok
|
{'EXIT', _,{shutdown, for_testiong}} ->
|
||||||
|
ok
|
||||||
|
after 1000 ->
|
||||||
|
error("disconnected timeout")
|
||||||
end,
|
end,
|
||||||
|
|
||||||
?assertEqual(1, counters:get(CRef, 1)).
|
?assertEqual(1, counters:get(CRef, 1)).
|
||||||
|
|
Loading…
Reference in New Issue