diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 3bafc6a1d..84e49c5be 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[Retainer]"). @@ -74,11 +75,12 @@ on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) -> %% @private dispatch(Pid, Topic) -> - Msgs = case emqx_topic:wildcard(Topic) of - false -> read_messages(Topic); - true -> match_messages(Topic) - end, + MsgsT = case emqx_topic:wildcard(Topic) of + false -> read_messages(Topic); + true -> match_messages(Topic) + end, Now = erlang:system_time(millisecond), + Msgs = drop_banned_messages(MsgsT), [Pid ! {deliver, Topic, refresh_timestamp_expiry(Msg, Now)} || Msg <- sort_retained(Msgs)]. %% RETAIN flag set to 1 and payload containing zero bytes @@ -332,3 +334,21 @@ refresh_timestamp_expiry(Msg = #message{headers = refresh_timestamp_expiry(Msg, Now) -> Msg#message{timestamp = Now}. + +drop_banned_messages(Msgs) -> + lists:filter(fun(Msg) -> + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + true; + _ -> + ?tp( + notice, + ignore_retained_message_deliver, + #{ + reason => "client is banned", + clientid => Msg#message.from + } + ), + false + end + end, Msgs). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 787c49bcb..6e2f7b1ca 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_ct:all(?MODULE). @@ -189,6 +190,53 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). +t_deliver_when_banned(_) -> + Client1 = <<"c1">>, + Client2 = <<"c2">>, + + {ok, C1} = emqtt:start_link([{clientid, Client1}, {clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + + lists:foreach( + fun(I) -> + Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])), + Msg = emqx_message:make(Client2, 0, Topic, <<"this is a retained message">>), + Msg2 = emqx_message:set_flag(retain, Msg), + emqx:publish(Msg2) + end, + lists:seq(1, 3) + ), + + Now = erlang:system_time(second), + Who = {clientid, Client2}, + + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120 + }), + timer:sleep(100), + + snabbkaffe:start_trace(), + + {ok, SubRef} = + snabbkaffe_collector:subscribe(?match_event(#{?snk_kind := ignore_retained_message_deliver}), + _NEvents = 3, + _Timeout = 10000, + 0), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]), + {ok, Trace} = snabbkaffe_collector:receive_events(SubRef), + ?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))), + + snabbkaffe:stop(), + + emqx_banned:delete(Who), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------