From dd7d4224cec928aeb62b6be2d9efd2eafd3620e7 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 8 Nov 2022 16:32:24 +0800 Subject: [PATCH] feat(delayed): check if the source client is banned when publishing a delayed message --- apps/emqx_modules/src/emqx_delayed.erl | 19 ++++++++-- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 36 ++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 76646bc64..f511d74d9 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -373,8 +373,23 @@ do_publish({Ts, _Id}, Now, Acc) when Ts > Now -> Acc; do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> case mnesia:dirty_read(?TAB, Key) of - [] -> ok; - [#delayed_message{msg = Msg}] -> emqx_pool:async_submit(fun emqx:publish/1, [Msg]) + [] -> + ok; + [#delayed_message{msg = Msg}] -> + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + emqx_pool:async_submit(fun emqx:publish/1, [Msg]); + _ -> + ?tp( + notice, + ignore_delayed_message_publish, + #{ + reason => "client is banned", + clienid => Msg#message.from + } + ), + ok + end end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index e5e3db98c..5864646ad 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -26,6 +26,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %%-------------------------------------------------------------------- %% Setups @@ -36,7 +37,8 @@ }). all() -> - emqx_common_test_helpers:all(?MODULE). + [t_banned_delayed]. +%% emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{ @@ -212,6 +214,38 @@ t_delayed_precision(_) -> _ = on_message_publish(DelayedMsg0), ?assert(FutureDiff() =< MaxSpan). +t_banned_delayed(_) -> + emqx:update_config([delayed, max_delayed_messages], 10000), + ClientId1 = <<"bc1">>, + ClientId2 = <<"bc2">>, + + Now = erlang:system_time(second), + Who = {clientid, ClientId2}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120 + }), + + snabbkaffe:start_trace(), + lists:foreach( + fun(ClientId) -> + Msg = emqx_message:make(ClientId, <<"$delayed/1/bc">>, <<"payload">>), + emqx_delayed:on_message_publish(Msg) + end, + [ClientId1, ClientId1, ClientId1, ClientId2, ClientId2] + ), + + timer:sleep(2000), + Trace = snabbkaffe:collect_trace(), + snabbkaffe:stop(), + emqx_banned:delete(Who), + mnesia:clear_table(emqx_delayed), + + ?assertEqual(2, length(?of_kind(ignore_delayed_message_publish, Trace))). + subscribe_proc() -> Self = self(), Ref = erlang:make_ref(),