fix(rewrite): add topic rewrite validation for filter or name

This commit is contained in:
JimMoen 2022-11-14 09:38:18 +08:00
parent 60c8db5905
commit 1bb31776c9
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
4 changed files with 83 additions and 23 deletions

7
changes/v4.3.23-en.md Normal file
View File

@ -0,0 +1,7 @@
# v4.3.23
## Enhancements
- Added topic validation for `emqx_mod_rewrite`. The dest topics contains wildcards are not allowed to publish [#9359](https://github.com/emqx/emqx/issues/9359).
## Bug fixes

7
changes/v4.3.23-zh.md Normal file
View File

@ -0,0 +1,7 @@
# v4.3.23
## 增强
- 为主题重写模块增加主题合法性检查,带有通配符的目标主题不允许被发布 [#9359](https://github.com/emqx/emqx/issues/9359)。
## 修复

View File

@ -23,7 +23,7 @@
-include_lib("emqx/include/logger.hrl").
-ifdef(TEST).
-export([ compile/1
-export([ compile_rules/1
, match_and_rewrite/3
]).
-endif.
@ -40,13 +40,14 @@
, description/0
]).
-type(topic() :: binary()).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
log_start(RawRules),
{PubRules, SubRules} = compile_rules(RawRules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
@ -75,30 +76,59 @@ description() ->
%% Internal functions
%%--------------------------------------------------------------------
log_start(Rules) ->
PubRules = [{pub, Topic, Re, Dest} || {rewrite, pub, Topic, Re, Dest} <- Rules],
SubRules = [{sub, Topic, Re, Dest} || {rewrite, sub, Topic, Re, Dest} <- Rules],
compile_rules(RawRules) ->
compile(validate_rules(RawRules)).
compile({PubRules, SubRules}) ->
CompileRE =
fun({rewrite, RewriteFrom, Re, RewriteTo}) ->
{ok, MP} = re:compile(Re),
{rewrite, RewriteFrom, MP, RewriteTo}
end,
{lists:map(CompileRE, PubRules), lists:map(CompileRE, SubRules)}.
validate_rules(Rules) ->
PubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
{rewrite, pub, RewriteFrom, Re, RewriteTo} <- Rules,
validate_rule(pub, RewriteFrom, RewriteTo)
],
SubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
{rewrite, sub, RewriteFrom, Re, RewriteTo} <- Rules,
validate_rule(sub, RewriteFrom, RewriteTo)
],
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
[erlang:length(PubRules), erlang:length(SubRules)]),
log_rule(PubRules, 1),
log_rule(SubRules, 1).
log_rules(pub, PubRules),
log_rules(sub, SubRules),
{PubRules, SubRules}.
log_rule([], _Index) -> ok;
log_rule([{Type, Topic, Re, Dest} | Rules], Index) ->
validate_rule(Type, RewriteFrom, RewriteTo) ->
case validate_topic(filter, RewriteFrom) of
ok ->
case validate_topic(dest_topic_type(Type), RewriteTo) of
ok ->
true;
{error, Reason} ->
log_invalid_rule(to, Type, RewriteTo, Reason),
false
end;
{error, Reason} ->
log_invalid_rule(from, Type, RewriteFrom, Reason),
false
end.
log_invalid_rule(Direction, Type, Topic, Reason) ->
?LOG(warning, "Invalid rewrite ~p rule for rewrite ~p topic '~ts' discarded. Reason: ~p",
[type_to_name(Type), Direction, Topic, Reason]).
log_rules(Type, Rules) ->
do_log_rules(Type, Rules, 1).
do_log_rules(_Type, [], _Index) -> ok;
do_log_rules(Type, [{_, Topic, Re, Dest} | Rules], Index) ->
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
[Type, Index, Topic, Re, Dest]),
log_rule(Rules, Index + 1).
compile(Rules) ->
PubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
SubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
{PubRules, SubRules}.
do_log_rules(Type, Rules, Index + 1).
match_and_rewrite(Topic, [], _) ->
Topic;
@ -138,3 +168,19 @@ filter_client_binds(Binds) ->
(_) -> true
end,
Binds).
type_to_name(pub) -> 'PUBLISH';
type_to_name(sub) -> 'SUBSCRIBE'.
dest_topic_type(pub) -> name;
dest_topic_type(sub) -> filter.
-spec(validate_topic(name | filter, topic()) -> ok | {error, term()}).
validate_topic(Type, Topic) ->
try
true = emqx_topic:validate(Type, Topic),
ok
catch
error:Reason ->
{error, Reason}
end.

View File

@ -82,7 +82,7 @@ t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:unload(?RULES).
t_rewrite_rule(_Config) ->
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
{PubRules, SubRules} = emqx_mod_rewrite:compile_rules(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),