feat: filter out messages which the source client is banned when delivering the retained message
This commit is contained in:
parent
155d4a9818
commit
4290847b9d
|
@ -37,8 +37,7 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[t_banned_delayed].
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
%% emqx_common_test_helpers:all(?MODULE).
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
|
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-include("emqx_retainer.hrl").
|
-include("emqx_retainer.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
|
@ -286,7 +287,20 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_deliver([Msg | T], Pid, Topic) ->
|
do_deliver([Msg | T], Pid, Topic) ->
|
||||||
Pid ! {deliver, Topic, Msg},
|
case emqx_banned:look_up({clientid, Msg#message.from}) of
|
||||||
|
[] ->
|
||||||
|
Pid ! {deliver, Topic, Msg},
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
?tp(
|
||||||
|
notice,
|
||||||
|
ignore_retained_message_deliver,
|
||||||
|
#{
|
||||||
|
reason => "client is banned",
|
||||||
|
clienid => Msg#message.from
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end,
|
||||||
do_deliver(T, Pid, Topic);
|
do_deliver(T, Pid, Topic);
|
||||||
do_deliver([], _, _) ->
|
do_deliver([], _, _) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -639,6 +639,47 @@ test_disable_then_start(_Config) ->
|
||||||
?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
|
?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_deliver_when_banned(_) ->
|
||||||
|
ClientId = <<"c1">>,
|
||||||
|
|
||||||
|
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
|
lists:foreach(
|
||||||
|
fun(I) ->
|
||||||
|
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
|
||||||
|
emqtt:publish(
|
||||||
|
C1,
|
||||||
|
Topic,
|
||||||
|
<<"this is a retained message">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
lists:seq(1, 3)
|
||||||
|
),
|
||||||
|
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Who = {clientid, ClientId},
|
||||||
|
emqx_banned:create(#{
|
||||||
|
who => Who,
|
||||||
|
by => <<"test">>,
|
||||||
|
reason => <<"test">>,
|
||||||
|
at => Now,
|
||||||
|
until => Now + 120
|
||||||
|
}),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
snabbkaffe:start_trace(),
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
timer:sleep(500),
|
||||||
|
|
||||||
|
Trace = snabbkaffe:collect_trace(),
|
||||||
|
?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
|
||||||
|
snabbkaffe:stop(),
|
||||||
|
emqx_banned:delete(Who),
|
||||||
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue