feat(banned): kick session when it is banned by clientid
This commit is contained in:
parent
56d443d19a
commit
085074ac41
|
@ -21,6 +21,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
@ -180,7 +181,7 @@ create(#{
|
||||||
create(Banned = #banned{who = Who}) ->
|
create(Banned = #banned{who = Who}) ->
|
||||||
case look_up(Who) of
|
case look_up(Who) of
|
||||||
[] ->
|
[] ->
|
||||||
mria:dirty_write(?BANNED_TAB, Banned),
|
insert_banned(Banned),
|
||||||
{ok, Banned};
|
{ok, Banned};
|
||||||
[OldBanned = #banned{until = Until}] ->
|
[OldBanned = #banned{until = Until}] ->
|
||||||
%% Don't support shorten or extend the until time by overwrite.
|
%% Don't support shorten or extend the until time by overwrite.
|
||||||
|
@ -190,7 +191,7 @@ create(Banned = #banned{who = Who}) ->
|
||||||
{error, {already_exist, OldBanned}};
|
{error, {already_exist, OldBanned}};
|
||||||
%% overwrite expired one is ok.
|
%% overwrite expired one is ok.
|
||||||
false ->
|
false ->
|
||||||
mria:dirty_write(?BANNED_TAB, Banned),
|
insert_banned(Banned),
|
||||||
{ok, Banned}
|
{ok, Banned}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
@ -266,3 +267,21 @@ expire_banned_items(Now) ->
|
||||||
ok,
|
ok,
|
||||||
?BANNED_TAB
|
?BANNED_TAB
|
||||||
).
|
).
|
||||||
|
|
||||||
|
insert_banned(Banned) ->
|
||||||
|
mria:dirty_write(?BANNED_TAB, Banned),
|
||||||
|
on_banned(Banned).
|
||||||
|
|
||||||
|
on_banned(#banned{who = {clientid, ClientId}}) ->
|
||||||
|
%% kick the session if the client is banned by clientid
|
||||||
|
?tp(
|
||||||
|
warning,
|
||||||
|
kick_session_due_to_banned,
|
||||||
|
#{
|
||||||
|
clientid => ClientId
|
||||||
|
}
|
||||||
|
),
|
||||||
|
emqx_cm:kick_session(ClientId),
|
||||||
|
ok;
|
||||||
|
on_banned(_) ->
|
||||||
|
ok.
|
||||||
|
|
|
@ -21,18 +21,20 @@
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:load(emqx),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
ok = ekka:start(),
|
ok = ekka:start(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ekka:stop(),
|
ekka:stop(),
|
||||||
mria:stop(),
|
mria:stop(),
|
||||||
mria_mnesia:delete_schema().
|
mria_mnesia:delete_schema(),
|
||||||
|
emqx_common_test_helpers:stop_apps([]).
|
||||||
|
|
||||||
t_add_delete(_) ->
|
t_add_delete(_) ->
|
||||||
Banned = #banned{
|
Banned = #banned{
|
||||||
|
@ -111,3 +113,23 @@ t_unused(_) ->
|
||||||
%% expiry timer
|
%% expiry timer
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
ok = emqx_banned:stop().
|
ok = emqx_banned:stop().
|
||||||
|
|
||||||
|
t_kick(_) ->
|
||||||
|
ClientId = <<"client">>,
|
||||||
|
snabbkaffe:start_trace(),
|
||||||
|
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Who = {clientid, ClientId},
|
||||||
|
|
||||||
|
emqx_banned:create(#{
|
||||||
|
who => Who,
|
||||||
|
by => <<"test">>,
|
||||||
|
reason => <<"test">>,
|
||||||
|
at => Now,
|
||||||
|
until => Now + 120
|
||||||
|
}),
|
||||||
|
|
||||||
|
Trace = snabbkaffe:collect_trace(),
|
||||||
|
snabbkaffe:stop(),
|
||||||
|
emqx_banned:delete(Who),
|
||||||
|
?assertEqual(1, length(?of_kind(kick_session_due_to_banned, Trace))).
|
||||||
|
|
|
@ -640,26 +640,25 @@ test_disable_then_start(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_deliver_when_banned(_) ->
|
t_deliver_when_banned(_) ->
|
||||||
ClientId = <<"c1">>,
|
Client1 = <<"c1">>,
|
||||||
|
Client2 = <<"c2">>,
|
||||||
|
|
||||||
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clientid, Client1}, {clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
|
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
|
||||||
emqtt:publish(
|
Msg = emqx_message:make(Client2, 0, Topic, <<"this is a retained message">>),
|
||||||
C1,
|
Msg2 = emqx_message:set_flag(retain, Msg),
|
||||||
Topic,
|
emqx:publish(Msg2)
|
||||||
<<"this is a retained message">>,
|
|
||||||
[{qos, 0}, {retain, true}]
|
|
||||||
)
|
|
||||||
end,
|
end,
|
||||||
lists:seq(1, 3)
|
lists:seq(1, 3)
|
||||||
),
|
),
|
||||||
|
|
||||||
Now = erlang:system_time(second),
|
Now = erlang:system_time(second),
|
||||||
Who = {clientid, ClientId},
|
Who = {clientid, Client2},
|
||||||
|
|
||||||
emqx_banned:create(#{
|
emqx_banned:create(#{
|
||||||
who => Who,
|
who => Who,
|
||||||
by => <<"test">>,
|
by => <<"test">>,
|
||||||
|
|
Loading…
Reference in New Issue