From 69701ff578adf2085f32cd4af484e725ea704123 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 12 Oct 2022 14:13:11 +0800 Subject: [PATCH 1/4] feat(banned): clean retained/delayed data when client is banned --- apps/emqx/i18n/emqx_schema_i18n.conf | 13 ++++ apps/emqx/priv/bpapi.versions | 2 +- apps/emqx/src/emqx_banned.erl | 54 +++++++++++++---- apps/emqx/src/emqx_flapping.erl | 4 +- apps/emqx/src/emqx_schema.erl | 8 +++ apps/emqx/test/emqx_flapping_SUITE.erl | 3 +- .../i18n/emqx_mgmt_api_banned_i18n.conf | 12 ++++ .../src/emqx_mgmt_api_banned.erl | 9 ++- apps/emqx_modules/src/emqx_delayed.erl | 31 ++++++++-- .../src/proto/emqx_delayed_proto_v2.erl | 45 ++++++++++++++ apps/emqx_modules/test/emqx_delayed_SUITE.erl | 59 ++++++++++++++++++ apps/emqx_retainer/src/emqx_retainer.erl | 24 +++++++- .../src/emqx_retainer_mnesia.erl | 33 +++++++--- .../test/emqx_retainer_SUITE.erl | 60 +++++++++++++++++++ 14 files changed, 325 insertions(+), 32 deletions(-) create mode 100644 apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 17e59a4dd..8f30d3c41 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -495,6 +495,19 @@ emqx_schema { } } + flapping_detect_clean_when_banned { + desc { + en: "Clean retained/delayed messages when client is banned.\n" + "Note: This may be expensive and only supports users banned by clientid." + zh: "当客户端被禁时删除其保留、延迟消息" + "注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。" + } + label: { + en: "Clean when banned" + zh: "被禁时清理消息" + } + } + persistent_session_store_enabled { desc { en: "Use the database to store information about persistent sessions.\n" diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 062fe084d..f5c515cce 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -8,7 +8,7 @@ {emqx_conf,1}. {emqx_conf,2}. {emqx_dashboard,1}. -{emqx_delayed,1}. +{emqx_delayed,2}. {emqx_exhook,1}. {emqx_gateway_api_listeners,1}. {emqx_gateway_cm,1}. diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index cf81c735b..d7443543f 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -32,11 +32,13 @@ -export([ check/1, create/1, + create/2, look_up/1, delete/1, info/1, format/1, - parse/1 + parse/1, + parse_opts/1 ]). %% gen_server callbacks @@ -63,6 +65,13 @@ -compile(nowarn_export_all). -endif. +-type banned_opts() :: #{ + clean => boolean(), + atom() => term() +}. + +-export_type([banned_opts/0]). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -141,6 +150,11 @@ parse(Params) -> {error, ErrorReason} end end. + +parse_opts(Params) -> + Clean = maps:get(<<"clean">>, Params, false), + #{clean => Clean}. + pares_who(#{as := As, who := Who}) -> pares_who(#{<<"as">> => As, <<"who">> => Who}); pares_who(#{<<"as">> := peerhost, <<"who">> := Peerhost0}) -> @@ -162,13 +176,15 @@ to_rfc3339(Timestamp) -> -spec create(emqx_types:banned() | map()) -> {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. -create(#{ - who := Who, - by := By, - reason := Reason, - at := At, - until := Until -}) -> +create( + #{ + who := Who, + by := By, + reason := Reason, + at := At, + until := Until + } = Data +) -> Banned = #banned{ who = Who, by = By, @@ -176,11 +192,16 @@ create(#{ at = At, until = Until }, - create(Banned); -create(Banned = #banned{who = Who}) -> + create(Banned, Data); +create(Banned = #banned{}) -> + create(Banned, #{clean => false}). + +-spec create(emqx_types:banned(), banned_opts()) -> + {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}. +create(Banned = #banned{who = Who}, Opts) -> case look_up(Who) of [] -> - mria:dirty_write(?BANNED_TAB, Banned), + insert_banned(Banned, Opts), {ok, Banned}; [OldBanned = #banned{until = Until}] -> %% Don't support shorten or extend the until time by overwrite. @@ -190,7 +211,7 @@ create(Banned = #banned{who = Who}) -> {error, {already_exist, OldBanned}}; %% overwrite expired one is ok. false -> - mria:dirty_write(?BANNED_TAB, Banned), + insert_banned(Banned, Opts), {ok, Banned} end end. @@ -266,3 +287,12 @@ expire_banned_items(Now) -> ok, ?BANNED_TAB ). + +insert_banned(Banned, Opts) -> + mria:dirty_write(?BANNED_TAB, Banned), + run_hooks(Banned, Opts). + +run_hooks(Banned, #{clean := true}) -> + emqx_hooks:run('client.banned', [Banned]); +run_hooks(_Banned, _Opts) -> + ok. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 7e72c488f..f0492cbf9 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -121,7 +121,7 @@ handle_cast( started_at = StartedAt, detect_cnt = DetectCnt }, - #{window_time := WindTime, ban_time := Interval}}, + #{window_time := WindTime, ban_time := Interval, clean_when_banned := Clean}}, State ) -> case now_diff(StartedAt) < WindTime of @@ -145,7 +145,7 @@ handle_cast( at = Now, until = Now + (Interval div 1000) }, - {ok, _} = emqx_banned:create(Banned), + {ok, _} = emqx_banned:create(Banned, #{clean => Clean}), ok; false -> ?SLOG( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 135992965..17bd03088 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -640,6 +640,14 @@ fields("flapping_detect") -> default => "5m", desc => ?DESC(flapping_detect_ban_time) } + )}, + {"clean_when_banned", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(flapping_detect_clean_when_banned) + } )} ]; fields("force_shutdown") -> diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index f37e20fdc..774ccaae2 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -34,7 +34,8 @@ init_per_suite(Config) -> % 0.1s window_time => 100, %% 2s - ban_time => 2000 + ban_time => 2000, + clean_when_banned => false } ), Config. diff --git a/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf b/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf index 3045cb293..04b96891c 100644 --- a/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf +++ b/apps/emqx_management/i18n/emqx_mgmt_api_banned_i18n.conf @@ -95,4 +95,16 @@ emqx_mgmt_api_banned { zh: """封禁结束时间""" } } + clean { + desc { + en: """Clean retained/delayed messages when client is banned.""" + """Note: This may be expensive and only supports users banned by clientid.""" + zh: """当客户端被禁时删除其保留、延迟消息""" + """注意: 这个操作开销可能较大,且只支持通过 clientid 封禁的用户数据。""" + } + label { + en: """Clean when banned""" + zh: """被禁时清理消息""" + } + } } diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 2eb8908c6..e3a99038c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -150,6 +150,12 @@ fields(ban) -> desc => ?DESC(until), required => false, example => <<"2021-10-25T21:53:47+08:00">> + })}, + {clean, + hoconsc:mk(boolean(), #{ + desc => ?DESC(clean), + required => false, + example => false })} ]. @@ -161,7 +167,8 @@ banned(post, #{body := Body}) -> {error, Reason} -> {400, 'BAD_REQUEST', list_to_binary(Reason)}; Ban -> - case emqx_banned:create(Ban) of + Opts = emqx_banned:parse_opts(Body), + case emqx_banned:create(Ban, Opts) of {ok, Banned} -> {200, format(Banned)}; {error, {already_exist, Old}} -> diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 76646bc64..f703c9dce 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -31,7 +32,8 @@ -export([ start_link/0, - on_message_publish/1 + on_message_publish/1, + on_client_banned/1 ]). %% gen_server callbacks @@ -44,7 +46,7 @@ code_change/3 ]). -%% gen_server callbacks +%% API -export([ load/0, unload/0, @@ -57,7 +59,9 @@ delete_delayed_message/1, delete_delayed_message/2, cluster_list/1, - cluster_query/4 + cluster_query/4, + clean_by_clientid/1, + do_clean_by_clientid/1 ]). -export([ @@ -138,6 +142,11 @@ on_message_publish( on_message_publish(Msg) -> {ok, Msg}. +on_client_banned(#banned{who = {clientid, ClientId}}) -> + clean_by_clientid(ClientId); +on_client_banned(_) -> + ok. + %%-------------------------------------------------------------------- %% Start delayed publish server %%-------------------------------------------------------------------- @@ -228,7 +237,7 @@ get_delayed_message(Id) -> get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Id); get_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:get_delayed_message(Node, Id). + emqx_delayed_proto_v2:get_delayed_message(Node, Id). -spec delete_delayed_message(binary()) -> with_id_return(). delete_delayed_message(Id) -> @@ -243,7 +252,7 @@ delete_delayed_message(Id) -> delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Id); delete_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:delete_delayed_message(Node, Id). + emqx_delayed_proto_v2:delete_delayed_message(Node, Id). update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). @@ -252,6 +261,16 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) -> Enable = maps:get(enable, NewConf, undefined), load_or_unload(Enable). +clean_by_clientid(ClientId) -> + Nodes = mria_mnesia:running_nodes(), + _ = [emqx_delayed_proto_v2:clean_by_clientid(Node, ClientId) || Node <- Nodes], + ok. + +do_clean_by_clientid(ClientId) -> + ets:select_delete( + ?TAB, ets:fun2ms(fun(#delayed_message{msg = Msg}) -> Msg#message.from =:= ClientId end) + ). + %%-------------------------------------------------------------------- %% gen_server callback %%-------------------------------------------------------------------- @@ -383,9 +402,11 @@ delayed_count() -> mnesia:table_info(?TAB, size). do_load_or_unload(true, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), + ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST), State; do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), + ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}), emqx_misc:cancel_timer(PubTimer), ets:delete_all_objects(?TAB), State#{publish_timer := undefined, publish_at := 0}; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl new file mode 100644 index 000000000..bb09e47eb --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl @@ -0,0 +1,45 @@ +%%-------------------------------------------------------------------- +%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_delayed_message/2, + delete_delayed_message/2, + clean_by_clientid/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_delayed_message(node(), binary()) -> + emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). + +-spec clean_by_clientid(node(), emqx_types:clientid()) -> + emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +clean_by_clientid(Node, ClientID) -> + rpc:call(Node, emqx_delayed, do_clean_by_clientid, [ClientID]). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index e5e3db98c..f19d9388c 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -212,6 +212,65 @@ t_delayed_precision(_) -> _ = on_message_publish(DelayedMsg0), ?assert(FutureDiff() =< MaxSpan). +t_banned_clean(_) -> + emqx:update_config([delayed, max_delayed_messages], 10000), + ClientId1 = <<"bc1">>, + ClientId2 = <<"bc2">>, + {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C2), + + [ + begin + emqtt:publish( + Conn, + <<"$delayed/60/0/", ClientId/binary>>, + <<"">>, + [{qos, 0}, {retain, false}] + ), + emqtt:publish( + Conn, + <<"$delayed/60/1/", ClientId/binary>>, + <<"">>, + [{qos, 0}, {retain, false}] + ) + end + || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2]) + ], + + emqtt:publish( + C2, + <<"$delayed/60/2/", ClientId2/binary>>, + <<"">>, + [{qos, 0}, {retain, false}] + ), + + timer:sleep(500), + ?assertMatch(#{meta := #{count := 5}}, emqx_delayed:list(#{page => 1, limit => 10})), + + Now = erlang:system_time(second), + Who = {clientid, ClientId2}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120, + clean => true + }), + + timer:sleep(500), + + ?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10})), + + emqx_banned:delete(Who), + emqx_delayed:clean_by_clientid(ClientId1), + timer:sleep(500), + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + subscribe_proc() -> Self = self(), Ref = erlang:make_ref(), diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 5d911b5f4..d6a025a41 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("emqx_retainer.hrl"). +-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -26,7 +27,8 @@ -export([ on_session_subscribed/4, - on_message_publish/2 + on_message_publish/2, + on_client_banned/1 ]). -export([ @@ -39,6 +41,7 @@ get_expiry_time/1, update_config/1, clean/0, + clean_by_clientid/1, delete/1, page_read/3, post_config_update/5, @@ -80,6 +83,7 @@ -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. +-callback clean_by_clientid(context(), emqx_types:clientid()) -> ok. -callback size(context()) -> non_neg_integer(). %%-------------------------------------------------------------------- @@ -118,6 +122,11 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg, _) -> {ok, Msg}. +on_client_banned(#banned{who = {clientid, ClientId}}) -> + clean_by_clientid(ClientId); +on_client_banned(_) -> + ok. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -151,6 +160,9 @@ update_config(Conf) -> clean() -> call(?FUNCTION_NAME). +clean_by_clientid(ClientId) -> + call({?FUNCTION_NAME, ClientId}). + delete(Topic) -> call({?FUNCTION_NAME, Topic}). @@ -207,6 +219,9 @@ handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call(clean, _, #{context := Context} = State) -> clean(Context), {reply, ok, State}; +handle_call({clean_by_clientid, ClientId}, _, #{context := Context} = State) -> + clean_by_clientid(Context, ClientId), + {reply, ok, State}; handle_call({delete, Topic}, _, #{context := Context} = State) -> delete_message(Context, Topic), {reply, ok, State}; @@ -298,6 +313,11 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). +-spec clean_by_clientid(context(), emqx_types:clientid()) -> ok. +clean_by_clientid(Context, ClientId) -> + Mod = get_backend_module(), + Mod:clean_by_clientid(Context, ClientId). + -spec update_config(state(), hocons:config(), hocons:config()) -> state(). update_config(State, Conf, OldConf) -> update_config( @@ -433,11 +453,13 @@ load(Context) -> 'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER ), ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER), + ok = emqx_hooks:put('client.banned', {?MODULE, on_client_banned, []}, ?HP_LOWEST), emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), + ok = emqx_hooks:del('client.banned', {?MODULE, on_client_banned}), emqx_stats:cancel_update(emqx_retainer_stats), ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index c236b9c28..281d9c3ce 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -33,13 +33,14 @@ match_messages/3, clear_expired/1, clean/1, + clean_by_clientid/2, size/1 ]). %% Internal exports (RPC) -export([ do_store_retained/1, - do_clear_expired/0, + do_clear/1, do_delete_message/1, do_populate_index_meta/1, do_reindex_batch/2 @@ -61,6 +62,8 @@ -record(retained_index, {key, expiry_time}). -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}). +-type retained_message() :: #retained_message{}. + -define(META_KEY, index_meta). -define(CLEAR_BATCH_SIZE, 1000). @@ -164,18 +167,22 @@ do_store_retained(#message{topic = Topic} = Msg) -> end. clear_expired(_) -> - {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), + NowMs = erlang:system_time(millisecond), + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [ + fun( + #retained_message{expiry_time = ExpiryTime} + ) -> + (ExpiryTime =/= 0) and (ExpiryTime < NowMs) + end + ]), ok. -do_clear_expired() -> - NowMs = erlang:system_time(millisecond), +-spec do_clear(fun((retained_message()) -> boolean())) -> ok. +do_clear(Pred) -> QH = qlc:q([ TopicTokens - || #retained_message{ - topic = TopicTokens, - expiry_time = ExpiryTime - } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), - (ExpiryTime =/= 0) and (ExpiryTime < NowMs) + || #retained_message{topic = TopicTokens} = Data <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), + Pred(Data) ]), QC = qlc:cursor(QH), clear_batch(db_indices(write), QC). @@ -263,6 +270,14 @@ clean(_) -> _ = mria:clear_table(?TAB_INDEX), ok. +clean_by_clientid(_, ClientId) -> + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear/1, [ + fun(Msg) -> + Msg#retained_message.msg#message.from =:= ClientId + end + ]), + ok. + size(_) -> table_size(). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 09e6c4bb4..23d4aee98 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -626,6 +626,66 @@ t_get_basic_usage_info(_Config) -> ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), ok. +t_banned_clean(_) -> + ClientId1 = <<"bc1">>, + ClientId2 = <<"bc2">>, + {ok, C1} = emqtt:start_link([{clientid, ClientId1}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + {ok, C2} = emqtt:start_link([{clientid, ClientId2}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C2), + + [ + begin + emqtt:publish( + Conn, + <<"bc/0/", ClientId/binary>>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + Conn, + <<"bc/1/", ClientId/binary>>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ) + end + || {ClientId, Conn} <- lists:zip([ClientId1, ClientId2], [C1, C2]) + ], + + emqtt:publish( + C2, + <<"bc/2/", ClientId2/binary>>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}] + ), + + timer:sleep(500), + {ok, List} = emqx_retainer:page_read(<<"bc/+/+">>, 1, 10), + ?assertEqual(5, length(List)), + + Now = erlang:system_time(second), + Who = {clientid, ClientId2}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120, + clean => true + }), + + timer:sleep(500), + + {ok, List2} = emqx_retainer:page_read(<<"bc/#">>, 1, 10), + ?assertEqual(2, length(List2)), + + emqx_banned:delete(Who), + emqx_retainer:clean(), + timer:sleep(500), + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + %% test whether the app can start normally after disabling emqx_retainer %% fix: https://github.com/emqx/emqx/pull/8911 test_disable_then_start(_Config) -> From b2eed47976dd60b008cec6107437dc4b6a53be74 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 18 Oct 2022 13:36:08 +0800 Subject: [PATCH 2/4] chore: bump retainer version && update CHANGES --- CHANGES-5.0.md | 1 + apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 0f617678d..6e78d3532 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -9,6 +9,7 @@ Prior to this fix, the message publish APIs (`api/v5/publish` and `api/v5/publish/bulk`) echos the message back to the client in HTTP body. This change fixed it to only send back the message ID. * Add /trace/:name/log_detail HTTP API to return trace file's size and mtime [#9152](https://github.com/emqx/emqx/pull/9152) +* Allow clear retained/delayed data when client is banned.[#9139](https://github.com/emqx/emqx/pull/9139) ## Bug fixes diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 888335ab4..c49794cfe 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.5"}, + {vsn, "5.0.6"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx]}, From 3f1f2443108c7ceb4e9bcf29d6416fa7d141454f Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 12 Oct 2022 19:27:57 +0800 Subject: [PATCH 3/4] fix: fix bpapi versions --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index f5c515cce..da3e5d023 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -8,6 +8,7 @@ {emqx_conf,1}. {emqx_conf,2}. {emqx_dashboard,1}. +{emqx_delayed,1}. {emqx_delayed,2}. {emqx_exhook,1}. {emqx_gateway_api_listeners,1}. diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl index bb09e47eb..e08ab785a 100644 --- a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl @@ -28,7 +28,7 @@ -include_lib("emqx/include/bpapi.hrl"). introduced_in() -> - "5.0.0". + "5.0.9". -spec get_delayed_message(node(), binary()) -> emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). From bdaadda479ce27b640e4fe3238e12cca7dea5ece Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 18 Oct 2022 11:34:43 +0800 Subject: [PATCH 4/4] fix(banned): fix nit code --- .../src/emqx_mgmt_api_banned.erl | 1 + apps/emqx_modules/src/emqx_delayed.erl | 3 +- .../src/proto/emqx_delayed_proto_v2.erl | 10 ++++--- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 28 ++++++++++--------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index e3a99038c..1eacc67c2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -155,6 +155,7 @@ fields(ban) -> hoconsc:mk(boolean(), #{ desc => ?DESC(clean), required => false, + default => false, example => false })} ]. diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index f703c9dce..6a8b8de5b 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -263,8 +263,7 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) -> clean_by_clientid(ClientId) -> Nodes = mria_mnesia:running_nodes(), - _ = [emqx_delayed_proto_v2:clean_by_clientid(Node, ClientId) || Node <- Nodes], - ok. + emqx_delayed_proto_v2:clean_by_clientid(Nodes, ClientId). do_clean_by_clientid(ClientId) -> ets:select_delete( diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl index e08ab785a..866b2cd40 100644 --- a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl @@ -27,6 +27,8 @@ -include_lib("emqx/include/bpapi.hrl"). +-define(TIMEOUT, 15000). + introduced_in() -> "5.0.9". @@ -39,7 +41,7 @@ get_delayed_message(Node, Id) -> delete_delayed_message(Node, Id) -> rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). --spec clean_by_clientid(node(), emqx_types:clientid()) -> - emqx_delayed:with_id_return() | emqx_rpc:badrpc(). -clean_by_clientid(Node, ClientID) -> - rpc:call(Node, emqx_delayed, do_clean_by_clientid, [ClientID]). +-spec clean_by_clientid(list(node()), emqx_types:clientid()) -> + emqx_rpc:erpc_multicall(). +clean_by_clientid(Nodes, ClientID) -> + erpc:multicall(Nodes, emqx_delayed, do_clean_by_clientid, [ClientID], ?TIMEOUT). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index f19d9388c..bd1de3849 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -252,21 +252,23 @@ t_banned_clean(_) -> Now = erlang:system_time(second), Who = {clientid, ClientId2}, - emqx_banned:create(#{ - who => Who, - by => <<"test">>, - reason => <<"test">>, - at => Now, - until => Now + 120, - clean => true - }), + try + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120, + clean => true + }), - timer:sleep(500), + timer:sleep(500), - ?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10})), - - emqx_banned:delete(Who), - emqx_delayed:clean_by_clientid(ClientId1), + ?assertMatch(#{meta := #{count := 2}}, emqx_delayed:list(#{page => 1, limit => 10})) + after + emqx_banned:delete(Who), + emqx_delayed:clean_by_clientid(ClientId1) + end, timer:sleep(500), ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C2).