Merge pull request #9332 from lafirest/feat/filiter_is_banned_in_retained

feat: filter out messages which the source client is banned when delivering the retained message
This commit is contained in:
lafirest 2022-11-11 18:16:53 +08:00 committed by GitHub
commit d0df9e5213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 5 deletions

View File

@ -37,8 +37,7 @@
}). }).
all() -> all() ->
[t_banned_delayed]. emqx_common_test_helpers:all(?MODULE).
%% emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{ ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.6"}, {vsn, "5.0.7"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx]}, {applications, [kernel, stdlib, emqx]},

View File

@ -20,6 +20,7 @@
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API %% API
-export([ -export([
@ -286,7 +287,20 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
end. end.
do_deliver([Msg | T], Pid, Topic) -> do_deliver([Msg | T], Pid, Topic) ->
case emqx_banned:look_up({clientid, Msg#message.from}) of
[] ->
Pid ! {deliver, Topic, Msg}, Pid ! {deliver, Topic, Msg},
ok;
_ ->
?tp(
notice,
ignore_retained_message_deliver,
#{
reason => "client is banned",
clientid => Msg#message.from
}
)
end,
do_deliver(T, Pid, Topic); do_deliver(T, Pid, Topic);
do_deliver([], _, _) -> do_deliver([], _, _) ->
ok. ok.

View File

@ -639,6 +639,47 @@ test_disable_then_start(_Config) ->
?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)), ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
ok. ok.
t_deliver_when_banned(_) ->
ClientId = <<"c1">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(
fun(I) ->
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
emqtt:publish(
C1,
Topic,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]
)
end,
lists:seq(1, 3)
),
Now = erlang:system_time(second),
Who = {clientid, ClientId},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120
}),
timer:sleep(100),
snabbkaffe:start_trace(),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
timer:sleep(500),
Trace = snabbkaffe:collect_trace(),
?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
snabbkaffe:stop(),
emqx_banned:delete(Who),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
ok = emqtt:disconnect(C1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -2,5 +2,8 @@
## Enhancements ## Enhancements
- Security enhancement for retained messages [#9326](https://github.com/emqx/emqx/pull/9326).
The retained messages will not be published if the publisher client is banned.
## Bug fixes ## Bug fixes

View File

@ -2,4 +2,8 @@
## 增强 ## 增强
## 修复 - 增强 `保留消息` 的安全性 [#9332](https://github.com/emqx/emqx/pull/9332)。
现在投递保留消息前,会先过滤掉来源客户端被封禁了的那些消息。
## Bug fixes