feat: filter out messages which the source client is banned when delivering the retained message
This commit is contained in:
parent
9d6efeaea8
commit
7fd7cfc665
|
@ -22,6 +22,7 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-logger_header("[Retainer]").
|
-logger_header("[Retainer]").
|
||||||
|
|
||||||
|
@ -74,11 +75,12 @@ on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) ->
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
dispatch(Pid, Topic) ->
|
dispatch(Pid, Topic) ->
|
||||||
Msgs = case emqx_topic:wildcard(Topic) of
|
MsgsT = case emqx_topic:wildcard(Topic) of
|
||||||
false -> read_messages(Topic);
|
false -> read_messages(Topic);
|
||||||
true -> match_messages(Topic)
|
true -> match_messages(Topic)
|
||||||
end,
|
end,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
|
Msgs = drop_banned_messages(MsgsT),
|
||||||
[Pid ! {deliver, Topic, refresh_timestamp_expiry(Msg, Now)} || Msg <- sort_retained(Msgs)].
|
[Pid ! {deliver, Topic, refresh_timestamp_expiry(Msg, Now)} || Msg <- sort_retained(Msgs)].
|
||||||
|
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
|
@ -332,3 +334,21 @@ refresh_timestamp_expiry(Msg = #message{headers =
|
||||||
|
|
||||||
refresh_timestamp_expiry(Msg, Now) ->
|
refresh_timestamp_expiry(Msg, Now) ->
|
||||||
Msg#message{timestamp = Now}.
|
Msg#message{timestamp = Now}.
|
||||||
|
|
||||||
|
drop_banned_messages(Msgs) ->
|
||||||
|
lists:filter(fun(Msg) ->
|
||||||
|
case emqx_banned:look_up({clientid, Msg#message.from}) of
|
||||||
|
[] ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
?tp(
|
||||||
|
notice,
|
||||||
|
ignore_retained_message_deliver,
|
||||||
|
#{
|
||||||
|
reason => "client is banned",
|
||||||
|
clientid => Msg#message.from
|
||||||
|
}
|
||||||
|
),
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end, Msgs).
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
@ -189,6 +190,53 @@ t_stop_publish_clear_msg(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_deliver_when_banned(_) ->
|
||||||
|
Client1 = <<"c1">>,
|
||||||
|
Client2 = <<"c2">>,
|
||||||
|
|
||||||
|
{ok, C1} = emqtt:start_link([{clientid, Client1}, {clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
|
lists:foreach(
|
||||||
|
fun(I) ->
|
||||||
|
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
|
||||||
|
Msg = emqx_message:make(Client2, 0, Topic, <<"this is a retained message">>),
|
||||||
|
Msg2 = emqx_message:set_flag(retain, Msg),
|
||||||
|
emqx:publish(Msg2)
|
||||||
|
end,
|
||||||
|
lists:seq(1, 3)
|
||||||
|
),
|
||||||
|
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Who = {clientid, Client2},
|
||||||
|
|
||||||
|
emqx_banned:create(#{
|
||||||
|
who => Who,
|
||||||
|
by => <<"test">>,
|
||||||
|
reason => <<"test">>,
|
||||||
|
at => Now,
|
||||||
|
until => Now + 120
|
||||||
|
}),
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
snabbkaffe:start_trace(),
|
||||||
|
|
||||||
|
{ok, SubRef} =
|
||||||
|
snabbkaffe_collector:subscribe(?match_event(#{?snk_kind := ignore_retained_message_deliver}),
|
||||||
|
_NEvents = 3,
|
||||||
|
_Timeout = 10000,
|
||||||
|
0),
|
||||||
|
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
{ok, Trace} = snabbkaffe_collector:receive_events(SubRef),
|
||||||
|
?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
|
||||||
|
|
||||||
|
snabbkaffe:stop(),
|
||||||
|
|
||||||
|
emqx_banned:delete(Who),
|
||||||
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue