fix(banned): verify delayed/taken over/retained messages against clientid_re rules
This commit is contained in:
parent
e67b078775
commit
78a87ab5a6
|
@ -31,6 +31,7 @@
|
|||
|
||||
-export([
|
||||
check/1,
|
||||
check_clientid/1,
|
||||
create/1,
|
||||
look_up/1,
|
||||
delete/1,
|
||||
|
@ -114,6 +115,10 @@ check(ClientInfo) ->
|
|||
do_check({peerhost, maps:get(peerhost, ClientInfo, undefined)}) orelse
|
||||
do_check_rules(ClientInfo).
|
||||
|
||||
-spec check_clientid(emqx_types:clientid()) -> boolean().
|
||||
check_clientid(ClientId) ->
|
||||
do_check({clientid, ClientId}) orelse do_check_rules(#{clientid => ClientId}).
|
||||
|
||||
-spec format(emqx_types:banned()) -> map().
|
||||
format(#banned{
|
||||
who = Who0,
|
||||
|
|
|
@ -601,7 +601,7 @@ should_keep(MsgDeliver) ->
|
|||
not is_banned_msg(MsgDeliver).
|
||||
|
||||
is_banned_msg(#message{from = ClientId}) ->
|
||||
[] =/= emqx_banned:look_up({clientid, ClientId}).
|
||||
emqx_banned:check_clientid(ClientId).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -112,6 +112,10 @@ t_check(_) ->
|
|||
?assertNot(emqx_banned:check(ClientInfoValidFull)),
|
||||
?assertNot(emqx_banned:check(ClientInfoValidEmpty)),
|
||||
?assertNot(emqx_banned:check(ClientInfoValidOnlyClientId)),
|
||||
|
||||
?assert(emqx_banned:check_clientid(<<"BannedClient">>)),
|
||||
?assert(emqx_banned:check_clientid(<<"BannedClientRE">>)),
|
||||
|
||||
ok = emqx_banned:delete(emqx_banned:who(clientid, <<"BannedClient">>)),
|
||||
ok = emqx_banned:delete(emqx_banned:who(username, <<"BannedUser">>)),
|
||||
ok = emqx_banned:delete(emqx_banned:who(peerhost, {192, 168, 0, 1})),
|
||||
|
@ -127,6 +131,10 @@ t_check(_) ->
|
|||
?assertNot(emqx_banned:check(ClientInfoBannedUsernameRE)),
|
||||
?assertNot(emqx_banned:check(ClientInfoBannedAddrNet)),
|
||||
?assertNot(emqx_banned:check(ClientInfoValidFull)),
|
||||
|
||||
?assertNot(emqx_banned:check_clientid(<<"BannedClient">>)),
|
||||
?assertNot(emqx_banned:check_clientid(<<"BannedClientRE">>)),
|
||||
|
||||
?assertEqual(0, emqx_banned:info(size)).
|
||||
|
||||
t_unused(_) ->
|
||||
|
|
|
@ -461,10 +461,10 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
|
|||
[] ->
|
||||
ok;
|
||||
[#delayed_message{msg = Msg}] ->
|
||||
case emqx_banned:look_up({clientid, Msg#message.from}) of
|
||||
[] ->
|
||||
case emqx_banned:check_clientid(Msg#message.from) of
|
||||
false ->
|
||||
emqx_pool:async_submit(fun emqx:publish/1, [Msg]);
|
||||
_ ->
|
||||
true ->
|
||||
?tp(
|
||||
notice,
|
||||
ignore_delayed_message_publish,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_modules, [
|
||||
{description, "EMQX Modules"},
|
||||
{vsn, "5.0.26"},
|
||||
{vsn, "5.0.27"},
|
||||
{modules, []},
|
||||
{applications, [kernel, stdlib, emqx, emqx_ctl, observer_cli]},
|
||||
{mod, {emqx_modules_app, []}},
|
||||
|
|
|
@ -215,8 +215,10 @@ t_banned_delayed(_) ->
|
|||
emqx:update_config([delayed, max_delayed_messages], 10000),
|
||||
ClientId1 = <<"bc1">>,
|
||||
ClientId2 = <<"bc2">>,
|
||||
ClientId3 = <<"bc3">>,
|
||||
|
||||
Now = erlang:system_time(second),
|
||||
|
||||
Who = emqx_banned:who(clientid, ClientId2),
|
||||
emqx_banned:create(#{
|
||||
who => Who,
|
||||
|
@ -225,12 +227,20 @@ t_banned_delayed(_) ->
|
|||
at => Now,
|
||||
until => Now + 120
|
||||
}),
|
||||
WhoRE = emqx_banned:who(clientid_re, <<"c3">>),
|
||||
emqx_banned:create(#{
|
||||
who => WhoRE,
|
||||
by => <<"test">>,
|
||||
reason => <<"test">>,
|
||||
at => Now,
|
||||
until => Now + 120
|
||||
}),
|
||||
|
||||
snabbkaffe:start_trace(),
|
||||
{ok, SubRef} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := ignore_delayed_message_publish}),
|
||||
_NEvents = 2,
|
||||
_NEvents = 4,
|
||||
_Timeout = 10000,
|
||||
0
|
||||
),
|
||||
|
@ -240,15 +250,16 @@ t_banned_delayed(_) ->
|
|||
Msg = emqx_message:make(ClientId, <<"$delayed/1/bc">>, <<"payload">>),
|
||||
emqx_delayed:on_message_publish(Msg)
|
||||
end,
|
||||
[ClientId1, ClientId1, ClientId1, ClientId2, ClientId2]
|
||||
[ClientId1, ClientId1, ClientId1, ClientId2, ClientId2, ClientId3, ClientId3]
|
||||
),
|
||||
|
||||
{ok, Trace} = snabbkaffe:receive_events(SubRef),
|
||||
snabbkaffe:stop(),
|
||||
emqx_banned:delete(Who),
|
||||
emqx_banned:delete(WhoRE),
|
||||
mnesia:clear_table(emqx_delayed),
|
||||
|
||||
?assertEqual(2, length(?of_kind(ignore_delayed_message_publish, Trace))).
|
||||
?assertEqual(4, length(?of_kind(ignore_delayed_message_publish, Trace))).
|
||||
|
||||
subscribe_proc() ->
|
||||
Self = self(),
|
||||
|
|
|
@ -336,10 +336,10 @@ deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
|
|||
|
||||
deliver_to_client([Msg | T], Pid, Topic) ->
|
||||
_ =
|
||||
case emqx_banned:look_up({clientid, Msg#message.from}) of
|
||||
[] ->
|
||||
case emqx_banned:check_clientid(Msg#message.from) of
|
||||
false ->
|
||||
Pid ! {deliver, Topic, Msg};
|
||||
_ ->
|
||||
true ->
|
||||
?tp(
|
||||
notice,
|
||||
ignore_retained_message_deliver,
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
Check retained messages, delayed messages and taken over session messages against actual banned client regular expressions.
|
||||
|
||||
Previously, messages that were delayed by some reasons, could surpass actual client ID ban specified with regular expression.
|
Loading…
Reference in New Issue