Merge pull request #9139 from lafirest/fix/clean_blocked_user_data

feat(banned): clean retained/delayed data when client is banned
This commit is contained in:
lafirest 2022-10-20 18:00:48 +08:00 committed by GitHub
commit c0c2657a80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 331 additions and 32 deletions

View File

@ -9,6 +9,7 @@
Prior to this fix, the message publish APIs (`api/v5/publish` and `api/v5/publish/bulk`) echos the message back to the client in HTTP body. Prior to this fix, the message publish APIs (`api/v5/publish` and `api/v5/publish/bulk`) echos the message back to the client in HTTP body.
This change fixed it to only send back the message ID. This change fixed it to only send back the message ID.
* Add /trace/:name/log_detail HTTP API to return trace file's size and mtime [#9152](https://github.com/emqx/emqx/pull/9152) * Add /trace/:name/log_detail HTTP API to return trace file's size and mtime [#9152](https://github.com/emqx/emqx/pull/9152)
* Allow clear retained/delayed data when client is banned.[#9139](https://github.com/emqx/emqx/pull/9139)
## Bug fixes ## Bug fixes

View File

@ -495,6 +495,19 @@ emqx_schema {
} }
} }
flapping_detect_clean_when_banned {
desc {
en: "Clean retained/delayed messages when client is banned.\n"
"Note: This may be expensive and only supports users banned by clientid."
zh: "当客户端被禁时删除其保留、延迟消息"
"注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"
}
label: {
en: "Clean when banned"
zh: "被禁时清理消息"
}
}
persistent_session_store_enabled { persistent_session_store_enabled {
desc { desc {
en: "Use the database to store information about persistent sessions.\n" en: "Use the database to store information about persistent sessions.\n"

View File

@ -9,6 +9,7 @@
{emqx_conf,2}. {emqx_conf,2}.
{emqx_dashboard,1}. {emqx_dashboard,1}.
{emqx_delayed,1}. {emqx_delayed,1}.
{emqx_delayed,2}.
{emqx_exhook,1}. {emqx_exhook,1}.
{emqx_gateway_api_listeners,1}. {emqx_gateway_api_listeners,1}.
{emqx_gateway_cm,1}. {emqx_gateway_cm,1}.

View File

@ -32,11 +32,13 @@
-export([ -export([
check/1, check/1,
create/1, create/1,
create/2,
look_up/1, look_up/1,
delete/1, delete/1,
info/1, info/1,
format/1, format/1,
parse/1 parse/1,
parse_opts/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -63,6 +65,13 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-type banned_opts() :: #{
clean => boolean(),
atom() => term()
}.
-export_type([banned_opts/0]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -141,6 +150,11 @@ parse(Params) ->
{error, ErrorReason} {error, ErrorReason}
end end
end. end.
parse_opts(Params) ->
Clean = maps:get(<<"clean">>, Params, false),
#{clean => Clean}.
pares_who(#{as := As, who := Who}) -> pares_who(#{as := As, who := Who}) ->
pares_who(#{<<"as">> => As, <<"who">> => Who}); pares_who(#{<<"as">> => As, <<"who">> => Who});
pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) -> pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) ->
@ -162,13 +176,15 @@ to_rfc3339(Timestamp) ->
-spec create(emqx_types:banned() | map()) -> -spec create(emqx_types:banned() | map()) ->
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
create(#{ create(
#{
who := Who, who := Who,
by := By, by := By,
reason := Reason, reason := Reason,
at := At, at := At,
until := Until until := Until
}) -> } = Data
) ->
Banned = #banned{ Banned = #banned{
who = Who, who = Who,
by = By, by = By,
@ -176,11 +192,16 @@ create(#{
at = At, at = At,
until = Until until = Until
}, },
create(Banned); create(Banned, Data);
create(Banned = #banned{who = Who}) -> create(Banned = #banned{}) ->
create(Banned, #{clean => false}).
-spec create(emqx_types:banned(), banned_opts()) ->
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
create(Banned = #banned{who = Who}, Opts) ->
case look_up(Who) of case look_up(Who) of
[] -> [] ->
mria:dirty_write(?BANNED_TAB, Banned), insert_banned(Banned, Opts),
{ok, Banned}; {ok, Banned};
[OldBanned = #banned{until = Until}] -> [OldBanned = #banned{until = Until}] ->
%% Don't support shorten or extend the until time by overwrite. %% Don't support shorten or extend the until time by overwrite.
@ -190,7 +211,7 @@ create(Banned = #banned{who = Who}) ->
{error, {already_exist, OldBanned}}; {error, {already_exist, OldBanned}};
%% overwrite expired one is ok. %% overwrite expired one is ok.
false -> false ->
mria:dirty_write(?BANNED_TAB, Banned), insert_banned(Banned, Opts),
{ok, Banned} {ok, Banned}
end end
end. end.
@ -266,3 +287,12 @@ expire_banned_items(Now) ->
ok, ok,
?BANNED_TAB ?BANNED_TAB
). ).
insert_banned(Banned, Opts) ->
mria:dirty_write(?BANNED_TAB, Banned),
run_hooks(Banned, Opts).
run_hooks(Banned, #{clean := true}) ->
emqx_hooks:run('client.banned', [Banned]);
run_hooks(_Banned, _Opts) ->
ok.

View File

@ -121,7 +121,7 @@ handle_cast(
started_at = StartedAt, started_at = StartedAt,
detect_cnt = DetectCnt detect_cnt = DetectCnt
}, },
#{window_time := WindTime, ban_time := Interval}}, #{window_time := WindTime, ban_time := Interval, clean_when_banned := Clean}},
State State
) -> ) ->
case now_diff(StartedAt) < WindTime of case now_diff(StartedAt) < WindTime of
@ -145,7 +145,7 @@ handle_cast(
at = Now, at = Now,
until = Now + (Interval div 1000) until = Now + (Interval div 1000)
}, },
{ok, _} = emqx_banned:create(Banned), {ok, _} = emqx_banned:create(Banned, #{clean => Clean}),
ok; ok;
false -> false ->
?SLOG( ?SLOG(

View File

@ -640,6 +640,14 @@ fields("flapping_detect") ->
default => "5m", default => "5m",
desc => ?DESC(flapping_detect_ban_time) desc => ?DESC(flapping_detect_ban_time)
} }
)},
{"clean_when_banned",
sc(
boolean(),
#{
default => false,
desc => ?DESC(flapping_detect_clean_when_banned)
}
)} )}
]; ];
fields("force_shutdown") -> fields("force_shutdown") ->

View File

@ -34,7 +34,8 @@ init_per_suite(Config) ->
% 0.1s % 0.1s
window_time => 100, window_time => 100,
%% 2s %% 2s
ban_time => 2000 ban_time => 2000,
clean_when_banned => false
} }
), ),
Config. Config.

View File

@ -95,4 +95,16 @@ emqx_mgmt_api_banned {
zh: """封禁结束时间""" zh: """封禁结束时间"""
} }
} }
clean {
desc {
en: """Clean retained/delayed messages when client is banned."""
"""Note: This may be expensive and only supports users banned by clientid."""
zh: """当客户端被禁时删除其保留、延迟消息"""
"""注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。"""
}
label {
en: """Clean when banned"""
zh: """被禁时清理消息"""
}
}
} }

View File

@ -150,6 +150,13 @@ fields(ban) ->
desc => ?DESC(until), desc => ?DESC(until),
required => false, required => false,
example => <<"2021-10-25T21:53:47+08:00">> example => <<"2021-10-25T21:53:47+08:00">>
})},
{clean,
hoconsc:mk(boolean(), #{
desc => ?DESC(clean),
required => false,
default => false,
example => false
})} })}
]. ].
@ -161,7 +168,8 @@ banned(post, #{body := Body}) ->
{error, Reason} -> {error, Reason} ->
{400, 'BAD_REQUEST', list_to_binary(Reason)}; {400, 'BAD_REQUEST', list_to_binary(Reason)};
Ban -> Ban ->
case emqx_banned:create(Ban) of Opts = emqx_banned:parse_opts(Body),
case emqx_banned:create(Ban, Opts) of
{ok, Banned} -> {ok, Banned} ->
{200, format(Banned)}; {200, format(Banned)};
{error, {already_exist, Old}} -> {error, {already_exist, Old}} ->

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia bootstrap %% Mnesia bootstrap
-export([mnesia/1]). -export([mnesia/1]).
@ -31,7 +32,8 @@
-export([ -export([
start_link/0, start_link/0,
on_message_publish/1 on_message_publish/1,
on_client_banned/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -44,7 +46,7 @@
code_change/3 code_change/3
]). ]).
%% gen_server callbacks %% API
-export([ -export([
load/0, load/0,
unload/0, unload/0,
@ -57,7 +59,9 @@
delete_delayed_message/1, delete_delayed_message/1,
delete_delayed_message/2, delete_delayed_message/2,
cluster_list/1, cluster_list/1,
cluster_query/4 cluster_query/4,
clean_by_clientid/1,
do_clean_by_clientid/1
]). ]).
-export([ -export([
@ -138,6 +142,11 @@ on_message_publish(
on_message_publish(Msg) -> on_message_publish(Msg) ->
{ok, Msg}. {ok, Msg}.
on_client_banned(#banned{who = {clientid, ClientId}}) ->
clean_by_clientid(ClientId);
on_client_banned(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start delayed publish server %% Start delayed publish server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -228,7 +237,7 @@ get_delayed_message(Id) ->
get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Node, Id) when Node =:= node() ->
get_delayed_message(Id); get_delayed_message(Id);
get_delayed_message(Node, Id) -> get_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:get_delayed_message(Node, Id). emqx_delayed_proto_v2:get_delayed_message(Node, Id).
-spec delete_delayed_message(binary()) -> with_id_return(). -spec delete_delayed_message(binary()) -> with_id_return().
delete_delayed_message(Id) -> delete_delayed_message(Id) ->
@ -243,7 +252,7 @@ delete_delayed_message(Id) ->
delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Node, Id) when Node =:= node() ->
delete_delayed_message(Id); delete_delayed_message(Id);
delete_delayed_message(Node, Id) -> delete_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:delete_delayed_message(Node, Id). emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
update_config(Config) -> update_config(Config) ->
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@ -252,6 +261,15 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) ->
Enable = maps:get(enable, NewConf, undefined), Enable = maps:get(enable, NewConf, undefined),
load_or_unload(Enable). load_or_unload(Enable).
clean_by_clientid(ClientId) ->
Nodes = mria_mnesia:running_nodes(),
emqx_delayed_proto_v2:clean_by_clientid(Nodes, ClientId).
do_clean_by_clientid(ClientId) ->
ets:select_delete(
?TAB, ets:fun2ms(fun(#delayed_message{msg = Msg}) -> Msg#message.from =:= ClientId end)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callback %% gen_server callback
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -383,9 +401,11 @@ delayed_count() -> mnesia:table_info(?TAB, size).
do_load_or_unload(true, State) -> do_load_or_unload(true, State) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
State; State;
do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
emqx_misc:cancel_timer(PubTimer), emqx_misc:cancel_timer(PubTimer),
ets:delete_all_objects(?TAB), ets:delete_all_objects(?TAB),
State#{publish_timer := undefined, publish_at := 0}; State#{publish_timer := undefined, publish_at := 0};

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_delayed_message/2,
delete_delayed_message/2,
clean_by_clientid/2
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.0.9".
-spec get_delayed_message(node(), binary()) ->
emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
get_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
delete_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
-spec clean_by_clientid(list(node()), emqx_types:clientid()) ->
emqx_rpc:erpc_multicall().
clean_by_clientid(Nodes, ClientID) ->
erpc:multicall(Nodes, emqx_delayed, do_clean_by_clientid, [ClientID], ?TIMEOUT).

View File

@ -212,6 +212,67 @@ t_delayed_precision(_) ->
_ = on_message_publish(DelayedMsg0), _ = on_message_publish(DelayedMsg0),
?assert(FutureDiff() =< MaxSpan). ?assert(FutureDiff() =< MaxSpan).
t_banned_clean(_) ->
emqx:update_config([delayed, max_delayed_messages], 10000),
ClientId1 = <<"bc1">>,
ClientId2 = <<"bc2">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C2),
[
begin
emqtt:publish(
Conn,
<<"$delayed/60/0/", ClientId/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
),
emqtt:publish(
Conn,
<<"$delayed/60/1/", ClientId/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
)
end
|| {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
],
emqtt:publish(
C2,
<<"$delayed/60/2/", ClientId2/binary>>,
<<"">>,
[{qos, 0}, {retain, false}]
),
timer:sleep(500),
?assertMatch(#{meta := #{count := 5}}, emqx_delayed:list(#{page => 1, limit => 10})),
Now = erlang:system_time(second),
Who = {clientid, ClientId2},
try
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120,
clean => true
}),
timer:sleep(500),
?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10}))
after
emqx_banned:delete(Who),
emqx_delayed:clean_by_clientid(ClientId1)
end,
timer:sleep(500),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
subscribe_proc() -> subscribe_proc() ->
Self = self(), Self = self(),
Ref = erlang:make_ref(), Ref = erlang:make_ref(),

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.5"}, {vsn, "5.0.6"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx]}, {applications, [kernel, stdlib, emqx]},

View File

@ -19,6 +19,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
@ -26,7 +27,8 @@
-export([ -export([
on_session_subscribed/4, on_session_subscribed/4,
on_message_publish/2 on_message_publish/2,
on_client_banned/1
]). ]).
-export([ -export([
@ -39,6 +41,7 @@
get_expiry_time/1, get_expiry_time/1,
update_config/1, update_config/1,
clean/0, clean/0,
clean_by_clientid/1,
delete/1, delete/1,
page_read/3, page_read/3,
post_config_update/5, post_config_update/5,
@ -80,6 +83,7 @@
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
-callback clear_expired(context()) -> ok. -callback clear_expired(context()) -> ok.
-callback clean(context()) -> ok. -callback clean(context()) -> ok.
-callback clean_by_clientid(context(), emqx_types:clientid()) -> ok.
-callback size(context()) -> non_neg_integer(). -callback size(context()) -> non_neg_integer().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -118,6 +122,11 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
on_message_publish(Msg, _) -> on_message_publish(Msg, _) ->
{ok, Msg}. {ok, Msg}.
on_client_banned(#banned{who = {clientid, ClientId}}) ->
clean_by_clientid(ClientId);
on_client_banned(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -151,6 +160,9 @@ update_config(Conf) ->
clean() -> clean() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
clean_by_clientid(ClientId) ->
call({?FUNCTION_NAME, ClientId}).
delete(Topic) -> delete(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
@ -207,6 +219,9 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), clean(Context),
{reply, ok, State}; {reply, ok, State};
handle_call({clean_by_clientid, ClientId}, _, #{context := Context} = State) ->
clean_by_clientid(Context, ClientId),
{reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) -> handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
@ -298,6 +313,11 @@ clean(Context) ->
Mod = get_backend_module(), Mod = get_backend_module(),
Mod:clean(Context). Mod:clean(Context).
-spec clean_by_clientid(context(), emqx_types:clientid()) -> ok.
clean_by_clientid(Context, ClientId) ->
Mod = get_backend_module(),
Mod:clean_by_clientid(Context, ClientId).
-spec update_config(state(), hocons:config(), hocons:config()) -> state(). -spec update_config(state(), hocons:config(), hocons:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, Conf, OldConf) ->
update_config( update_config(
@ -433,11 +453,13 @@ load(Context) ->
'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER 'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER
), ),
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER), ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER),
ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST),
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
ok. ok.
unload() -> unload() ->
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}),
emqx_stats:cancel_update(emqx_retainer_stats), emqx_stats:cancel_update(emqx_retainer_stats),
ok. ok.

View File

@ -33,13 +33,14 @@
match_messages/3, match_messages/3,
clear_expired/1, clear_expired/1,
clean/1, clean/1,
clean_by_clientid/2,
size/1 size/1
]). ]).
%% Internal exports (RPC) %% Internal exports (RPC)
-export([ -export([
do_store_retained/1, do_store_retained/1,
do_clear_expired/0, do_clear/1,
do_delete_message/1, do_delete_message/1,
do_populate_index_meta/1, do_populate_index_meta/1,
do_reindex_batch/2 do_reindex_batch/2
@ -61,6 +62,8 @@
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}). -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
-type retained_message() :: #retained_message{}.
-define(META_KEY, index_meta). -define(META_KEY, index_meta).
-define(CLEAR_BATCH_SIZE, 1000). -define(CLEAR_BATCH_SIZE, 1000).
@ -164,18 +167,22 @@ do_store_retained(#message{topic = Topic} = Msg) ->
end. end.
clear_expired(_) -> clear_expired(_) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), NowMs = erlang:system_time(millisecond),
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
fun(
#retained_message{expiry_time = ExpiryTime}
) ->
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
end
]),
ok. ok.
do_clear_expired() -> -spec do_clear(fun((retained_message()) -> boolean())) -> ok.
NowMs = erlang:system_time(millisecond), do_clear(Pred) ->
QH = qlc:q([ QH = qlc:q([
TopicTokens TopicTokens
|| #retained_message{ || #retained_message{topic = TopicTokens} = Data <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
topic = TopicTokens, Pred(Data)
expiry_time = ExpiryTime
} <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
]), ]),
QC = qlc:cursor(QH), QC = qlc:cursor(QH),
clear_batch(db_indices(write), QC). clear_batch(db_indices(write), QC).
@ -263,6 +270,14 @@ clean(_) ->
_ = mria:clear_table(?TAB_INDEX), _ = mria:clear_table(?TAB_INDEX),
ok. ok.
clean_by_clientid(_, ClientId) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [
fun(Msg) ->
Msg#retained_message.msg#message.from =:= ClientId
end
]),
ok.
size(_) -> size(_) ->
table_size(). table_size().

View File

@ -626,6 +626,66 @@ t_get_basic_usage_info(_Config) ->
?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
ok. ok.
t_banned_clean(_) ->
ClientId1 = <<"bc1">>,
ClientId2 = <<"bc2">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C2),
[
begin
emqtt:publish(
Conn,
<<"bc/0/", ClientId/binary>>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
Conn,
<<"bc/1/", ClientId/binary>>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]
)
end
|| {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2])
],
emqtt:publish(
C2,
<<"bc/2/", ClientId2/binary>>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]
),
timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"bc/+/+">>, 1, 10),
?assertEqual(5, length(List)),
Now = erlang:system_time(second),
Who = {clientid, ClientId2},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120,
clean => true
}),
timer:sleep(500),
{ok, List2} = emqx_retainer:page_read(<<"bc/#">>, 1, 10),
?assertEqual(2, length(List2)),
emqx_banned:delete(Who),
emqx_retainer:clean(),
timer:sleep(500),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
%% test whether the app can start normally after disabling emqx_retainer %% test whether the app can start normally after disabling emqx_retainer
%% fix: https://github.com/emqx/emqx/pull/8911 %% fix: https://github.com/emqx/emqx/pull/8911
test_disable_then_start(_Config) -> test_disable_then_start(_Config) ->