From ebaba0c2b179a4a8005ede762faa1a1d1d91d782 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 8 Nov 2022 11:36:11 +0800 Subject: [PATCH] fix: Revert "feat(banned): clean retained/delayed data when client is banned" This reverts commit 69701ff578adf2085f32cd4af484e725ea704123. --- apps/emqx/i18n/emqx_schema_i18n.conf | 13 ---- apps/emqx/priv/bpapi.versions | 1 - apps/emqx/src/emqx_banned.erl | 54 ++++------------ apps/emqx/src/emqx_flapping.erl | 4 +- apps/emqx/src/emqx_schema.erl | 8 --- apps/emqx/test/emqx_flapping_SUITE.erl | 3 +- .../i18n/emqx_mgmt_api_banned_i18n.conf | 12 ---- .../src/emqx_mgmt_api_banned.erl | 10 +-- apps/emqx_modules/src/emqx_delayed.erl | 30 ++------- .../src/proto/emqx_delayed_proto_v2.erl | 47 -------------- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 61 ------------------- apps/emqx_retainer/src/emqx_retainer.erl | 24 +------- .../src/emqx_retainer_mnesia.erl | 33 +++------- .../test/emqx_retainer_SUITE.erl | 60 ------------------ 14 files changed, 31 insertions(+), 329 deletions(-) delete mode 100644 apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 23784785a..714a08704 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -494,19 +494,6 @@ emqx_schema { } } - flapping_detect_clean_when_banned { - desc { - en: "Clean retained/delayed messages when client is banned.\n" - "Note: This may be expensive and only supports users banned by clientid." - zh: "当客户端被禁时删除其保留、延迟消息" - "注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。" - } - label: { - en: "Clean when banned" - zh: "被禁时清理消息" - } - } - persistent_session_store_enabled { desc { en: "Use the database to store information about persistent sessions.\n" diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 29872f143..9997055dc 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -9,7 +9,6 @@ {emqx_conf,2}. {emqx_dashboard,1}. {emqx_delayed,1}. -{emqx_delayed,2}. {emqx_exhook,1}. {emqx_gateway_api_listeners,1}. {emqx_gateway_cm,1}. diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index d7443543f..cf81c735b 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -32,13 +32,11 @@ -export([ check/1, create/1, - create/2, look_up/1, delete/1, info/1, format/1, - parse/1, - parse_opts/1 + parse/1 ]). %% gen_server callbacks @@ -65,13 +63,6 @@ -compile(nowarn_export_all). -endif. --type banned_opts() :: #{ - clean => boolean(), - atom() => term() -}. - --export_type([banned_opts/0]). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -150,11 +141,6 @@ parse(Params) -> {error, ErrorReason} end end. - -parse_opts(Params) -> - Clean = maps:get(<<"clean">>, Params, false), - #{clean => Clean}. - pares_who(#{as := As, who := Who}) -> pares_who(#{<<"as">> => As, <<"who">> => Who}); pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) -> @@ -176,15 +162,13 @@ to_rfc3339(Timestamp) -> -spec create(emqx_types:banned() | map()) -> {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. -create( - #{ - who := Who, - by := By, - reason := Reason, - at := At, - until := Until - } = Data -) -> +create(#{ + who := Who, + by := By, + reason := Reason, + at := At, + until := Until +}) -> Banned = #banned{ who = Who, by = By, @@ -192,16 +176,11 @@ create( at = At, until = Until }, - create(Banned, Data); -create(Banned = #banned{}) -> - create(Banned, #{clean => false}). - --spec create(emqx_types:banned(), banned_opts()) -> - {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. -create(Banned = #banned{who = Who}, Opts) -> + create(Banned); +create(Banned = #banned{who = Who}) -> case look_up(Who) of [] -> - insert_banned(Banned, Opts), + mria:dirty_write(?BANNED_TAB, Banned), {ok, Banned}; [OldBanned = #banned{until = Until}] -> %% Don't support shorten or extend the until time by overwrite. @@ -211,7 +190,7 @@ create(Banned = #banned{who = Who}, Opts) -> {error, {already_exist, OldBanned}}; %% overwrite expired one is ok. false -> - insert_banned(Banned, Opts), + mria:dirty_write(?BANNED_TAB, Banned), {ok, Banned} end end. @@ -287,12 +266,3 @@ expire_banned_items(Now) -> ok, ?BANNED_TAB ). - -insert_banned(Banned, Opts) -> - mria:dirty_write(?BANNED_TAB, Banned), - run_hooks(Banned, Opts). - -run_hooks(Banned, #{clean := true}) -> - emqx_hooks:run('client.banned', [Banned]); -run_hooks(_Banned, _Opts) -> - ok. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index f0492cbf9..7e72c488f 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -121,7 +121,7 @@ handle_cast( started_at = StartedAt, detect_cnt = DetectCnt }, - #{window_time := WindTime, ban_time := Interval, clean_when_banned := Clean}}, + #{window_time := WindTime, ban_time := Interval}}, State ) -> case now_diff(StartedAt) < WindTime of @@ -145,7 +145,7 @@ handle_cast( at = Now, until = Now + (Interval div 1000) }, - {ok, _} = emqx_banned:create(Banned, #{clean => Clean}), + {ok, _} = emqx_banned:create(Banned), ok; false -> ?SLOG( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 74ed51a38..4c26f86f9 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -640,14 +640,6 @@ fields("flapping_detect") -> default => "5m", desc => ?DESC(flapping_detect_ban_time) } - )}, - {"clean_when_banned", - sc( - boolean(), - #{ - default => false, - desc => ?DESC(flapping_detect_clean_when_banned) - } )} ]; fields("force_shutdown") -> diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index 774ccaae2..f37e20fdc 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -34,8 +34,7 @@ init_per_suite(Config) -> % 0.1s window_time => 100, %% 2s - ban_time => 2000, - clean_when_banned => false + ban_time => 2000 } ), Config. diff --git a/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf b/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf index 04b96891c..3045cb293 100644 --- a/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf +++ b/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf @@ -95,16 +95,4 @@ emqx_mgmt_api_banned { zh: """封禁结束时间""" } } - clean { - desc { - en: """Clean retained/delayed messages when client is banned.""" - """Note: This may be expensive and only supports users banned by clientid.""" - zh: """当客户端被禁时删除其保留、延迟消息""" - """注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。""" - } - label { - en: """Clean when banned""" - zh: """被禁时清理消息""" - } - } } diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 1eacc67c2..2eb8908c6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -150,13 +150,6 @@ fields(ban) -> desc => ?DESC(until), required => false, example => <<"2021-10-25T21:53:47+08:00">> - })}, - {clean, - hoconsc:mk(boolean(), #{ - desc => ?DESC(clean), - required => false, - default => false, - example => false })} ]. @@ -168,8 +161,7 @@ banned(post, #{body := Body}) -> {error, Reason} -> {400, 'BAD_REQUEST', list_to_binary(Reason)}; Ban -> - Opts = emqx_banned:parse_opts(Body), - case emqx_banned:create(Ban, Opts) of + case emqx_banned:create(Ban) of {ok, Banned} -> {200, format(Banned)}; {error, {already_exist, Old}} -> diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 6a8b8de5b..76646bc64 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -23,7 +23,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -32,8 +31,7 @@ -export([ start_link/0, - on_message_publish/1, - on_client_banned/1 + on_message_publish/1 ]). %% gen_server callbacks @@ -46,7 +44,7 @@ code_change/3 ]). -%% API +%% gen_server callbacks -export([ load/0, unload/0, @@ -59,9 +57,7 @@ delete_delayed_message/1, delete_delayed_message/2, cluster_list/1, - cluster_query/4, - clean_by_clientid/1, - do_clean_by_clientid/1 + cluster_query/4 ]). -export([ @@ -142,11 +138,6 @@ on_message_publish( on_message_publish(Msg) -> {ok, Msg}. -on_client_banned(#banned{who = {clientid, ClientId}}) -> - clean_by_clientid(ClientId); -on_client_banned(_) -> - ok. - %%-------------------------------------------------------------------- %% Start delayed publish server %%-------------------------------------------------------------------- @@ -237,7 +228,7 @@ get_delayed_message(Id) -> get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Id); get_delayed_message(Node, Id) -> - emqx_delayed_proto_v2:get_delayed_message(Node, Id). + emqx_delayed_proto_v1:get_delayed_message(Node, Id). -spec delete_delayed_message(binary()) -> with_id_return(). delete_delayed_message(Id) -> @@ -252,7 +243,7 @@ delete_delayed_message(Id) -> delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Id); delete_delayed_message(Node, Id) -> - emqx_delayed_proto_v2:delete_delayed_message(Node, Id). + emqx_delayed_proto_v1:delete_delayed_message(Node, Id). update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). @@ -261,15 +252,6 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) -> Enable = maps:get(enable, NewConf, undefined), load_or_unload(Enable). -clean_by_clientid(ClientId) -> - Nodes = mria_mnesia:running_nodes(), - emqx_delayed_proto_v2:clean_by_clientid(Nodes, ClientId). - -do_clean_by_clientid(ClientId) -> - ets:select_delete( - ?TAB, ets:fun2ms(fun(#delayed_message{msg = Msg}) -> Msg#message.from =:= ClientId end) - ). - %%-------------------------------------------------------------------- %% gen_server callback %%-------------------------------------------------------------------- @@ -401,11 +383,9 @@ delayed_count() -> mnesia:table_info(?TAB, size). do_load_or_unload(true, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), - ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST), State; do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), - ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}), emqx_misc:cancel_timer(PubTimer), ets:delete_all_objects(?TAB), State#{publish_timer := undefined, publish_at := 0}; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl deleted file mode 100644 index 9bbf35720..000000000 --- a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl +++ /dev/null @@ -1,47 +0,0 @@ -%%-------------------------------------------------------------------- -%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_delayed_proto_v2). - --behaviour(emqx_bpapi). - --export([ - introduced_in/0, - get_delayed_message/2, - delete_delayed_message/2, - clean_by_clientid/2 -]). - --include_lib("emqx/include/bpapi.hrl"). - --define(TIMEOUT, 15000). - -introduced_in() -> - "5.0.10". - --spec get_delayed_message(node(), binary()) -> - emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). -get_delayed_message(Node, Id) -> - rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). - --spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). -delete_delayed_message(Node, Id) -> - rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). - --spec clean_by_clientid(list(node()), emqx_types:clientid()) -> - emqx_rpc:erpc_multicall(). -clean_by_clientid(Nodes, ClientID) -> - erpc:multicall(Nodes, emqx_delayed, do_clean_by_clientid, [ClientID], ?TIMEOUT). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index bd1de3849..e5e3db98c 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -212,67 +212,6 @@ t_delayed_precision(_) -> _ = on_message_publish(DelayedMsg0), ?assert(FutureDiff() =< MaxSpan). -t_banned_clean(_) -> - emqx:update_config([delayed, max_delayed_messages], 10000), - ClientId1 = <<"bc1">>, - ClientId2 = <<"bc2">>, - {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - - {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C2), - - [ - begin - emqtt:publish( - Conn, - <<"$delayed/60/0/", ClientId/binary>>, - <<"">>, - [{qos, 0}, {retain, false}] - ), - emqtt:publish( - Conn, - <<"$delayed/60/1/", ClientId/binary>>, - <<"">>, - [{qos, 0}, {retain, false}] - ) - end - || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2]) - ], - - emqtt:publish( - C2, - <<"$delayed/60/2/", ClientId2/binary>>, - <<"">>, - [{qos, 0}, {retain, false}] - ), - - timer:sleep(500), - ?assertMatch(#{meta := #{count := 5}}, emqx_delayed:list(#{page => 1, limit => 10})), - - Now = erlang:system_time(second), - Who = {clientid, ClientId2}, - try - emqx_banned:create(#{ - who => Who, - by => <<"test">>, - reason => <<"test">>, - at => Now, - until => Now + 120, - clean => true - }), - - timer:sleep(500), - - ?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10})) - after - emqx_banned:delete(Who), - emqx_delayed:clean_by_clientid(ClientId1) - end, - timer:sleep(500), - ok = emqtt:disconnect(C1), - ok = emqtt:disconnect(C2). - subscribe_proc() -> Self = self(), Ref = erlang:make_ref(), diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index d6a025a41..5d911b5f4 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -19,7 +19,6 @@ -behaviour(gen_server). -include("emqx_retainer.hrl"). --include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -27,8 +26,7 @@ -export([ on_session_subscribed/4, - on_message_publish/2, - on_client_banned/1 + on_message_publish/2 ]). -export([ @@ -41,7 +39,6 @@ get_expiry_time/1, update_config/1, clean/0, - clean_by_clientid/1, delete/1, page_read/3, post_config_update/5, @@ -83,7 +80,6 @@ -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. --callback clean_by_clientid(context(), emqx_types:clientid()) -> ok. -callback size(context()) -> non_neg_integer(). %%-------------------------------------------------------------------- @@ -122,11 +118,6 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg, _) -> {ok, Msg}. -on_client_banned(#banned{who = {clientid, ClientId}}) -> - clean_by_clientid(ClientId); -on_client_banned(_) -> - ok. - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -160,9 +151,6 @@ update_config(Conf) -> clean() -> call(?FUNCTION_NAME). -clean_by_clientid(ClientId) -> - call({?FUNCTION_NAME, ClientId}). - delete(Topic) -> call({?FUNCTION_NAME, Topic}). @@ -219,9 +207,6 @@ handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call(clean, _, #{context := Context} = State) -> clean(Context), {reply, ok, State}; -handle_call({clean_by_clientid, ClientId}, _, #{context := Context} = State) -> - clean_by_clientid(Context, ClientId), - {reply, ok, State}; handle_call({delete, Topic}, _, #{context := Context} = State) -> delete_message(Context, Topic), {reply, ok, State}; @@ -313,11 +298,6 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). --spec clean_by_clientid(context(), emqx_types:clientid()) -> ok. -clean_by_clientid(Context, ClientId) -> - Mod = get_backend_module(), - Mod:clean_by_clientid(Context, ClientId). - -spec update_config(state(), hocons:config(), hocons:config()) -> state(). update_config(State, Conf, OldConf) -> update_config( @@ -453,13 +433,11 @@ load(Context) -> 'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER ), ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER), - ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST), emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), - ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}), emqx_stats:cancel_update(emqx_retainer_stats), ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 281d9c3ce..c236b9c28 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -33,14 +33,13 @@ match_messages/3, clear_expired/1, clean/1, - clean_by_clientid/2, size/1 ]). %% Internal exports (RPC) -export([ do_store_retained/1, - do_clear/1, + do_clear_expired/0, do_delete_message/1, do_populate_index_meta/1, do_reindex_batch/2 @@ -62,8 +61,6 @@ -record(retained_index, {key, expiry_time}). -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}). --type retained_message() :: #retained_message{}. - -define(META_KEY, index_meta). -define(CLEAR_BATCH_SIZE, 1000). @@ -167,22 +164,18 @@ do_store_retained(#message{topic = Topic} = Msg) -> end. clear_expired(_) -> - NowMs = erlang:system_time(millisecond), - {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [ - fun( - #retained_message{expiry_time = ExpiryTime} - ) -> - (ExpiryTime =/= 0) and (ExpiryTime < NowMs) - end - ]), + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), ok. --spec do_clear(fun((retained_message()) -> boolean())) -> ok. -do_clear(Pred) -> +do_clear_expired() -> + NowMs = erlang:system_time(millisecond), QH = qlc:q([ TopicTokens - || #retained_message{topic = TopicTokens} = Data <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), - Pred(Data) + || #retained_message{ + topic = TopicTokens, + expiry_time = ExpiryTime + } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), + (ExpiryTime =/= 0) and (ExpiryTime < NowMs) ]), QC = qlc:cursor(QH), clear_batch(db_indices(write), QC). @@ -270,14 +263,6 @@ clean(_) -> _ = mria:clear_table(?TAB_INDEX), ok. -clean_by_clientid(_, ClientId) -> - {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [ - fun(Msg) -> - Msg#retained_message.msg#message.from =:= ClientId - end - ]), - ok. - size(_) -> table_size(). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 23d4aee98..09e6c4bb4 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -626,66 +626,6 @@ t_get_basic_usage_info(_Config) -> ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), ok. -t_banned_clean(_) -> - ClientId1 = <<"bc1">>, - ClientId2 = <<"bc2">>, - {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - - {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C2), - - [ - begin - emqtt:publish( - Conn, - <<"bc/0/", ClientId/binary>>, - <<"this is a retained message 0">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - Conn, - <<"bc/1/", ClientId/binary>>, - <<"this is a retained message 1">>, - [{qos, 0}, {retain, true}] - ) - end - || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2]) - ], - - emqtt:publish( - C2, - <<"bc/2/", ClientId2/binary>>, - <<"this is a retained message 2">>, - [{qos, 0}, {retain, true}] - ), - - timer:sleep(500), - {ok, List} = emqx_retainer:page_read(<<"bc/+/+">>, 1, 10), - ?assertEqual(5, length(List)), - - Now = erlang:system_time(second), - Who = {clientid, ClientId2}, - emqx_banned:create(#{ - who => Who, - by => <<"test">>, - reason => <<"test">>, - at => Now, - until => Now + 120, - clean => true - }), - - timer:sleep(500), - - {ok, List2} = emqx_retainer:page_read(<<"bc/#">>, 1, 10), - ?assertEqual(2, length(List2)), - - emqx_banned:delete(Who), - emqx_retainer:clean(), - timer:sleep(500), - ok = emqtt:disconnect(C1), - ok = emqtt:disconnect(C2). - %% test whether the app can start normally after disabling emqx_retainer %% fix: https://github.com/emqx/emqx/pull/8911 test_disable_then_start(_Config) ->