diff --git a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl index 14d91f4b3..8f2a816ba 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[Delayed]"). @@ -228,7 +229,20 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> case mnesia:dirty_read(?TAB, Key) of [] -> ok; [#delayed_message{msg = Msg}] -> - emqx_pool:async_submit(fun emqx:publish/1, [Msg]) + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + emqx_pool:async_submit(fun emqx:publish/1, [Msg]); + _ -> + ?tp( + notice, + ignore_delayed_message_publish, + #{ + reason => "client is banned", + clienid => Msg#message.from + } + ), + ok + end end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]). diff --git a/lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl index 306a1a03d..715e541f0 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl @@ -26,6 +26,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %%-------------------------------------------------------------------- %% Setups @@ -78,3 +79,46 @@ t_delayed_message(_) -> EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), ?assertEqual([], EmptyKey), ok = emqx_mod_delayed:unload([]). + +t_banned_delayed(_) -> + ok = emqx_mod_delayed:load([]), + ClientId1 = <<"bc1">>, + ClientId2 = <<"bc2">>, + + Now = erlang:system_time(second), + Who = {clientid, ClientId2}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120 + }), + + snabbkaffe:start_trace(), + + {ok, SubRef} = + snabbkaffe_collector:subscribe(?match_event(#{?snk_kind := ignore_delayed_message_publish}), + _NEvents = 2, + _Timeout = 10000, + 0), + + lists:foreach( + fun(ClientId) -> + Msg = emqx_message:make(ClientId, <<"$delayed/1/bc">>, <<"payload">>), + emqx_mod_delayed:on_message_publish(Msg) + end, + [ClientId1, ClientId1, ClientId1, ClientId2, ClientId2] + ), + + ?assertMatch({ok, [#{?snk_kind := ignore_delayed_message_publish}, + #{?snk_kind := ignore_delayed_message_publish} + ]}, + snabbkaffe_collector:receive_events(SubRef)), + + snabbkaffe:stop(), + + emqx_banned:delete(Who), + EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), + ?assertEqual([], EmptyKey), + ok = emqx_mod_delayed:unload([]). diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 6cc1580be..ba5259c52 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -36,6 +36,7 @@ , create/1 , delete/1 , info/1 + , look_up/1 ]). %% gen_server callbacks @@ -111,6 +112,9 @@ delete(Who) -> info(InfoKey) -> mnesia:table_info(?BANNED_TAB, InfoKey). +look_up(Who) -> + mnesia:dirty_read(?BANNED_TAB, Who). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -158,4 +162,3 @@ expire_banned_items(Now) -> mnesia:delete_object(?BANNED_TAB, B, sticky_write); (_, _Acc) -> ok end, ok, ?BANNED_TAB). -