feat(banned): clean retained/delayed data when client is banned

This commit is contained in:
firest 2022-10-12 14:13:11 +08:00
parent ad0e9aa092
commit 69701ff578
14 changed files with 325 additions and 32 deletions

View File

@ -495,6 +495,19 @@ emqx_schema {
} }
} }
flapping_detect_clean_when_banned {
desc {
en: "Clean retained/delayed messages when client is banned.\n"
"Note: This may be expensive and only supports users banned by clientid."
zh: "当客户端被禁时删除其保留、延迟消息"
"注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"
}
label: {
en: "Clean when banned"
zh: "被禁时清理消息"
}
}
persistent_session_store_enabled { persistent_session_store_enabled {
desc { desc {
en: "Use the database to store information about persistent sessions.\n" en: "Use the database to store information about persistent sessions.\n"

View File

@ -8,7 +8,7 @@
{emqx_conf,1}. {emqx_conf,1}.
{emqx_conf,2}. {emqx_conf,2}.
{emqx_dashboard,1}. {emqx_dashboard,1}.
{emqx_delayed,1}. {emqx_delayed,2}.
{emqx_exhook,1}. {emqx_exhook,1}.
{emqx_gateway_api_listeners,1}. {emqx_gateway_api_listeners,1}.
{emqx_gateway_cm,1}. {emqx_gateway_cm,1}.

View File

@ -32,11 +32,13 @@
-export([ -export([
check/1, check/1,
create/1, create/1,
create/2,
look_up/1, look_up/1,
delete/1, delete/1,
info/1, info/1,
format/1, format/1,
parse/1 parse/1,
parse_opts/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -63,6 +65,13 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-type banned_opts() :: #{
clean => boolean(),
atom() => term()
}.
-export_type([banned_opts/0]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -141,6 +150,11 @@ parse(Params) ->
{error, ErrorReason} {error, ErrorReason}
end end
end. end.
parse_opts(Params) ->
Clean = maps:get(<<"clean">>, Params, false),
#{clean => Clean}.
pares_who(#{as := As, who := Who}) -> pares_who(#{as := As, who := Who}) ->
pares_who(#{<<"as">> => As, <<"who">> => Who}); pares_who(#{<<"as">> => As, <<"who">> => Who});
pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) -> pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) ->
@ -162,13 +176,15 @@ to_rfc3339(Timestamp) ->
-spec create(emqx_types:banned() | map()) -> -spec create(emqx_types:banned() | map()) ->
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
create(#{ create(
who := Who, #{
by := By, who := Who,
reason := Reason, by := By,
at := At, reason := Reason,
until := Until at := At,
}) -> until := Until
} = Data
) ->
Banned = #banned{ Banned = #banned{
who = Who, who = Who,
by = By, by = By,
@ -176,11 +192,16 @@ create(#{
at = At, at = At,
until = Until until = Until
}, },
create(Banned); create(Banned, Data);
create(Banned = #banned{who = Who}) -> create(Banned = #banned{}) ->
create(Banned, #{clean => false}).
-spec create(emqx_types:banned(), banned_opts()) ->
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
create(Banned = #banned{who = Who}, Opts) ->
case look_up(Who) of case look_up(Who) of
[] -> [] ->
mria:dirty_write(?BANNED_TAB, Banned), insert_banned(Banned, Opts),
{ok, Banned}; {ok, Banned};
[OldBanned = #banned{until = Until}] -> [OldBanned = #banned{until = Until}] ->
%% Don't support shorten or extend the until time by overwrite. %% Don't support shorten or extend the until time by overwrite.
@ -190,7 +211,7 @@ create(Banned = #banned{who = Who}) ->
{error, {already_exist, OldBanned}}; {error, {already_exist, OldBanned}};
%% overwrite expired one is ok. %% overwrite expired one is ok.
false -> false ->
mria:dirty_write(?BANNED_TAB, Banned), insert_banned(Banned, Opts),
{ok, Banned} {ok, Banned}
end end
end. end.
@ -266,3 +287,12 @@ expire_banned_items(Now) ->
ok, ok,
?BANNED_TAB ?BANNED_TAB
). ).
insert_banned(Banned, Opts) ->
mria:dirty_write(?BANNED_TAB, Banned),
run_hooks(Banned, Opts).
run_hooks(Banned, #{clean := true}) ->
emqx_hooks:run('client.banned', [Banned]);
run_hooks(_Banned, _Opts) ->
ok.

View File

@ -121,7 +121,7 @@ handle_cast(
started_at = StartedAt, started_at = StartedAt,
detect_cnt = DetectCnt detect_cnt = DetectCnt
}, },
#{window_time := WindTime, ban_time := Interval}}, #{window_time := WindTime, ban_time := Interval, clean_when_banned := Clean}},
State State
) -> ) ->
case now_diff(StartedAt) < WindTime of case now_diff(StartedAt) < WindTime of
@ -145,7 +145,7 @@ handle_cast(
at = Now, at = Now,
until = Now + (Interval div 1000) until = Now + (Interval div 1000)
}, },
{ok, _} = emqx_banned:create(Banned), {ok, _} = emqx_banned:create(Banned, #{clean => Clean}),
ok; ok;
false -> false ->
?SLOG( ?SLOG(

View File

@ -640,6 +640,14 @@ fields("flapping_detect") ->
default => "5m", default => "5m",
desc => ?DESC(flapping_detect_ban_time) desc => ?DESC(flapping_detect_ban_time)
} }
)},
{"clean_when_banned",
sc(
boolean(),
#{
default => false,
desc => ?DESC(flapping_detect_clean_when_banned)
}
)} )}
]; ];
fields("force_shutdown") -> fields("force_shutdown") ->

View File

@ -34,7 +34,8 @@ init_per_suite(Config) ->
% 0.1s % 0.1s
window_time => 100, window_time => 100,
%% 2s %% 2s
ban_time => 2000 ban_time => 2000,
clean_when_banned => false
} }
), ),
Config. Config.

View File

@ -95,4 +95,16 @@ emqx_mgmt_api_banned {
zh: """封禁结束时间""" zh: """封禁结束时间"""
} }
} }
clean {
desc {
en: """Clean retained/delayed messages when client is banned."""
"""Note: This may be expensive and only supports users banned by clientid."""
zh: """当客户端被禁时删除其保留、延迟消息"""
"""注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"""
}
label {
en: """Clean when banned"""
zh: """被禁时清理消息"""
}
}
} }

View File

@ -150,6 +150,12 @@ fields(ban) ->
desc => ?DESC(until), desc => ?DESC(until),
required => false, required => false,
example => <<"2021-10-25T21:53:47+08:00">> example => <<"2021-10-25T21:53:47+08:00">>
})},
{clean,
hoconsc:mk(boolean(), #{
desc => ?DESC(clean),
required => false,
example => false
})} })}
]. ].
@ -161,7 +167,8 @@ banned(post, #{body := Body}) ->
{error, Reason} -> {error, Reason} ->
{400, 'BAD_REQUEST', list_to_binary(Reason)}; {400, 'BAD_REQUEST', list_to_binary(Reason)};
Ban -> Ban ->
case emqx_banned:create(Ban) of Opts = emqx_banned:parse_opts(Body),
case emqx_banned:create(Ban, Opts) of
{ok, Banned} -> {ok, Banned} ->
{200, format(Banned)}; {200, format(Banned)};
{error, {already_exist, Old}} -> {error, {already_exist, Old}} ->

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia bootstrap %% Mnesia bootstrap
-export([mnesia/1]). -export([mnesia/1]).
@ -31,7 +32,8 @@
-export([ -export([
start_link/0, start_link/0,
on_message_publish/1 on_message_publish/1,
on_client_banned/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -44,7 +46,7 @@
code_change/3 code_change/3
]). ]).
%% gen_server callbacks %% API
-export([ -export([
load/0, load/0,
unload/0, unload/0,
@ -57,7 +59,9 @@
delete_delayed_message/1, delete_delayed_message/1,
delete_delayed_message/2, delete_delayed_message/2,
cluster_list/1, cluster_list/1,
cluster_query/4 cluster_query/4,
clean_by_clientid/1,
do_clean_by_clientid/1
]). ]).
-export([ -export([
@ -138,6 +142,11 @@ on_message_publish(
on_message_publish(Msg) -> on_message_publish(Msg) ->
{ok, Msg}. {ok, Msg}.
on_client_banned(#banned{who = {clientid, ClientId}}) ->
clean_by_clientid(ClientId);
on_client_banned(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start delayed publish server %% Start delayed publish server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -228,7 +237,7 @@ get_delayed_message(Id) ->
get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Node, Id) when Node =:= node() ->
get_delayed_message(Id); get_delayed_message(Id);
get_delayed_message(Node, Id) -> get_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:get_delayed_message(Node, Id). emqx_delayed_proto_v2:get_delayed_message(Node, Id).
-spec delete_delayed_message(binary()) -> with_id_return(). -spec delete_delayed_message(binary()) -> with_id_return().
delete_delayed_message(Id) -> delete_delayed_message(Id) ->
@ -243,7 +252,7 @@ delete_delayed_message(Id) ->
delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Node, Id) when Node =:= node() ->
delete_delayed_message(Id); delete_delayed_message(Id);
delete_delayed_message(Node, Id) -> delete_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:delete_delayed_message(Node, Id). emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
update_config(Config) -> update_config(Config) ->
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@ -252,6 +261,16 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) ->
Enable = maps:get(enable, NewConf, undefined), Enable = maps:get(enable, NewConf, undefined),
load_or_unload(Enable). load_or_unload(Enable).
clean_by_clientid(ClientId) ->
Nodes = mria_mnesia:running_nodes(),
_ = [emqx_delayed_proto_v2:clean_by_clientid(Node, ClientId) || Node <- Nodes],
ok.
do_clean_by_clientid(ClientId) ->
ets:select_delete(
?TAB, ets:fun2ms(fun(#delayed_message{msg = Msg}) -> Msg#message.from =:= ClientId end)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callback %% gen_server callback
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -383,9 +402,11 @@ delayed_count() -> mnesia:table_info(?TAB, size).
do_load_or_unload(true, State) -> do_load_or_unload(true, State) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
State; State;
do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
emqx_misc:cancel_timer(PubTimer), emqx_misc:cancel_timer(PubTimer),
ets:delete_all_objects(?TAB), ets:delete_all_objects(?TAB),
State#{publish_timer := undefined, publish_at := 0}; State#{publish_timer := undefined, publish_at := 0};

View File

@ -0,0 +1,45 @@
%%--------------------------------------------------------------------
%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_delayed_message/2,
delete_delayed_message/2,
clean_by_clientid/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_delayed_message(node(), binary()) ->
emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
get_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
delete_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
-spec clean_by_clientid(node(), emqx_types:clientid()) ->
emqx_delayed:with_id_return() | emqx_rpc:badrpc().
clean_by_clientid(Node, ClientID) ->
rpc:call(Node, emqx_delayed, do_clean_by_clientid, [ClientID]).

View File

@ -212,6 +212,65 @@ t_delayed_precision(_) ->
_ = on_message_publish(DelayedMsg0), _ = on_message_publish(DelayedMsg0),
?assert(FutureDiff() =< MaxSpan). ?assert(FutureDiff() =< MaxSpan).
t_banned_clean(_) ->
emqx:update_config([delayed, max_delayed_messages], 10000),
ClientId1 = <<"bc1">>,
ClientId2 = <<"bc2">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C2),
[
begin
emqtt:publish(
Conn,
<<"$delayed/60/0/", ClientId/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
),
emqtt:publish(
Conn,
<<"$delayed/60/1/", ClientId/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
)
end
|| {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
],
emqtt:publish(
C2,
<<"$delayed/60/2/", ClientId2/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
),
timer:sleep(500),
?assertMatch(#{meta := #{count := 5}}, emqx_delayed:list(#{page => 1, limit => 10})),
Now = erlang:system_time(second),
Who = {clientid, ClientId2},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120,
clean => true
}),
timer:sleep(500),
?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10})),
emqx_banned:delete(Who),
emqx_delayed:clean_by_clientid(ClientId1),
timer:sleep(500),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
subscribe_proc() -> subscribe_proc() ->
Self = self(), Self = self(),
Ref = erlang:make_ref(), Ref = erlang:make_ref(),

View File

@ -19,6 +19,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
@ -26,7 +27,8 @@
-export([ -export([
on_session_subscribed/4, on_session_subscribed/4,
on_message_publish/2 on_message_publish/2,
on_client_banned/1
]). ]).
-export([ -export([
@ -39,6 +41,7 @@
get_expiry_time/1, get_expiry_time/1,
update_config/1, update_config/1,
clean/0, clean/0,
clean_by_clientid/1,
delete/1, delete/1,
page_read/3, page_read/3,
post_config_update/5, post_config_update/5,
@ -80,6 +83,7 @@
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
-callback clear_expired(context()) -> ok. -callback clear_expired(context()) -> ok.
-callback clean(context()) -> ok. -callback clean(context()) -> ok.
-callback clean_by_clientid(context(), emqx_types:clientid()) -> ok.
-callback size(context()) -> non_neg_integer(). -callback size(context()) -> non_neg_integer().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -118,6 +122,11 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
on_message_publish(Msg, _) -> on_message_publish(Msg, _) ->
{ok, Msg}. {ok, Msg}.
on_client_banned(#banned{who = {clientid, ClientId}}) ->
clean_by_clientid(ClientId);
on_client_banned(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -151,6 +160,9 @@ update_config(Conf) ->
clean() -> clean() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
clean_by_clientid(ClientId) ->
call({?FUNCTION_NAME, ClientId}).
delete(Topic) -> delete(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
@ -207,6 +219,9 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), clean(Context),
{reply, ok, State}; {reply, ok, State};
handle_call({clean_by_clientid, ClientId}, _, #{context := Context} = State) ->
clean_by_clientid(Context, ClientId),
{reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) -> handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
@ -298,6 +313,11 @@ clean(Context) ->
Mod = get_backend_module(), Mod = get_backend_module(),
Mod:clean(Context). Mod:clean(Context).
-spec clean_by_clientid(context(), emqx_types:clientid()) -> ok.
clean_by_clientid(Context, ClientId) ->
Mod = get_backend_module(),
Mod:clean_by_clientid(Context, ClientId).
-spec update_config(state(), hocons:config(), hocons:config()) -> state(). -spec update_config(state(), hocons:config(), hocons:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, Conf, OldConf) ->
update_config( update_config(
@ -433,11 +453,13 @@ load(Context) ->
'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER 'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER
), ),
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER), ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER),
ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
ok. ok.
unload() -> unload() ->
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
emqx_stats:cancel_update(emqx_retainer_stats), emqx_stats:cancel_update(emqx_retainer_stats),
ok. ok.

View File

@ -33,13 +33,14 @@
match_messages/3, match_messages/3,
clear_expired/1, clear_expired/1,
clean/1, clean/1,
clean_by_clientid/2,
size/1 size/1
]). ]).
%% Internal exports (RPC) %% Internal exports (RPC)
-export([ -export([
do_store_retained/1, do_store_retained/1,
do_clear_expired/0, do_clear/1,
do_delete_message/1, do_delete_message/1,
do_populate_index_meta/1, do_populate_index_meta/1,
do_reindex_batch/2 do_reindex_batch/2
@ -61,6 +62,8 @@
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}). -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
-type retained_message() :: #retained_message{}.
-define(META_KEY, index_meta). -define(META_KEY, index_meta).
-define(CLEAR_BATCH_SIZE, 1000). -define(CLEAR_BATCH_SIZE, 1000).
@ -164,18 +167,22 @@ do_store_retained(#message{topic = Topic} = Msg) ->
end. end.
clear_expired(_) -> clear_expired(_) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), NowMs = erlang:system_time(millisecond),
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
fun(
#retained_message{expiry_time = ExpiryTime}
) ->
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
end
]),
ok. ok.
do_clear_expired() -> -spec do_clear(fun((retained_message()) -> boolean())) -> ok.
NowMs = erlang:system_time(millisecond), do_clear(Pred) ->
QH = qlc:q([ QH = qlc:q([
TopicTokens TopicTokens
|| #retained_message{ || #retained_message{topic = TopicTokens} = Data <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
topic = TopicTokens, Pred(Data)
expiry_time = ExpiryTime
} <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
]), ]),
QC = qlc:cursor(QH), QC = qlc:cursor(QH),
clear_batch(db_indices(write), QC). clear_batch(db_indices(write), QC).
@ -263,6 +270,14 @@ clean(_) ->
_ = mria:clear_table(?TAB_INDEX), _ = mria:clear_table(?TAB_INDEX),
ok. ok.
clean_by_clientid(_, ClientId) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
fun(Msg) ->
Msg#retained_message.msg#message.from =:= ClientId
end
]),
ok.
size(_) -> size(_) ->
table_size(). table_size().

View File

@ -626,6 +626,66 @@ t_get_basic_usage_info(_Config) ->
?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
ok. ok.
t_banned_clean(_) ->
ClientId1 = <<"bc1">>,
ClientId2 = <<"bc2">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C2),
[
begin
emqtt:publish(
Conn,
<<"bc/0/", ClientId/binary>>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
Conn,
<<"bc/1/", ClientId/binary>>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]
)
end
|| {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
],
emqtt:publish(
C2,
<<"bc/2/", ClientId2/binary>>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]
),
timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"bc/+/+">>, 1, 10),
?assertEqual(5, length(List)),
Now = erlang:system_time(second),
Who = {clientid, ClientId2},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120,
clean => true
}),
timer:sleep(500),
{ok, List2} = emqx_retainer:page_read(<<"bc/#">>, 1, 10),
?assertEqual(2, length(List2)),
emqx_banned:delete(Who),
emqx_retainer:clean(),
timer:sleep(500),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
%% test whether the app can start normally after disabling emqx_retainer %% test whether the app can start normally after disabling emqx_retainer
%% fix: https://github.com/emqx/emqx/pull/8911 %% fix: https://github.com/emqx/emqx/pull/8911
test_disable_then_start(_Config) -> test_disable_then_start(_Config) ->