diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 5864646ad..3d1576b0b 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -37,8 +37,7 @@ }). all() -> - [t_banned_delayed]. -%% emqx_common_test_helpers:all(?MODULE). + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{ diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index f52fd982c..c4df41ca4 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -20,6 +20,7 @@ -include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API -export([ @@ -286,7 +287,20 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> end. do_deliver([Msg | T], Pid, Topic) -> - Pid ! {deliver, Topic, Msg}, + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + Pid ! {deliver, Topic, Msg}, + ok; + _ -> + ?tp( + notice, + ignore_retained_message_deliver, + #{ + reason => "client is banned", + clienid => Msg#message.from + } + ) + end, do_deliver(T, Pid, Topic); do_deliver([], _, _) -> ok. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 09e6c4bb4..86eaa4255 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -639,6 +639,47 @@ test_disable_then_start(_Config) -> ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)), ok. +t_deliver_when_banned(_) -> + ClientId = <<"c1">>, + + {ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + lists:foreach( + fun(I) -> + Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])), + emqtt:publish( + C1, + Topic, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 3) + ), + + Now = erlang:system_time(second), + Who = {clientid, ClientId}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120 + }), + + timer:sleep(100), + snabbkaffe:start_trace(), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]), + timer:sleep(500), + + Trace = snabbkaffe:collect_trace(), + ?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))), + snabbkaffe:stop(), + emqx_banned:delete(Who), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------