Merge pull request #9359 from JimMoen/fix-topic-rewrite-wildcard
Fix topic rewrite wildcard
This commit is contained in:
commit
e9490654ae
|
@ -0,0 +1,7 @@
|
||||||
|
# v4.3.23
|
||||||
|
|
||||||
|
## Enhancements
|
||||||
|
|
||||||
|
- Added topic validation for `emqx_mod_rewrite`. The dest topics contains wildcards are not allowed to publish [#9359](https://github.com/emqx/emqx/issues/9359).
|
||||||
|
|
||||||
|
## Bug fixes
|
|
@ -0,0 +1,7 @@
|
||||||
|
# v4.3.23
|
||||||
|
|
||||||
|
## 增强
|
||||||
|
|
||||||
|
- 为主题重写模块增加主题合法性检查,带有通配符的目标主题不允许被发布 [#9359](https://github.com/emqx/emqx/issues/9359)。
|
||||||
|
|
||||||
|
## 修复
|
|
@ -23,7 +23,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ compile/1
|
-export([ compile_rules/1
|
||||||
, match_and_rewrite/3
|
, match_and_rewrite/3
|
||||||
]).
|
]).
|
||||||
-endif.
|
-endif.
|
||||||
|
@ -40,13 +40,14 @@
|
||||||
, description/0
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-type(topic() :: binary()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
load(RawRules) ->
|
load(RawRules) ->
|
||||||
{PubRules, SubRules} = compile(RawRules),
|
{PubRules, SubRules} = compile_rules(RawRules),
|
||||||
log_start(RawRules),
|
|
||||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
||||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
||||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
||||||
|
@ -75,30 +76,59 @@ description() ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
log_start(Rules) ->
|
compile_rules(RawRules) ->
|
||||||
PubRules = [{pub, Topic, Re, Dest} || {rewrite, pub, Topic, Re, Dest} <- Rules],
|
compile(validate_rules(RawRules)).
|
||||||
SubRules = [{sub, Topic, Re, Dest} || {rewrite, sub, Topic, Re, Dest} <- Rules],
|
|
||||||
|
compile({PubRules, SubRules}) ->
|
||||||
|
CompileRE =
|
||||||
|
fun({rewrite, RewriteFrom, Re, RewriteTo}) ->
|
||||||
|
{ok, MP} = re:compile(Re),
|
||||||
|
{rewrite, RewriteFrom, MP, RewriteTo}
|
||||||
|
end,
|
||||||
|
{lists:map(CompileRE, PubRules), lists:map(CompileRE, SubRules)}.
|
||||||
|
|
||||||
|
validate_rules(Rules) ->
|
||||||
|
PubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
||||||
|
{rewrite, pub, RewriteFrom, Re, RewriteTo} <- Rules,
|
||||||
|
validate_rule(pub, RewriteFrom, RewriteTo)
|
||||||
|
],
|
||||||
|
SubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
||||||
|
{rewrite, sub, RewriteFrom, Re, RewriteTo} <- Rules,
|
||||||
|
validate_rule(sub, RewriteFrom, RewriteTo)
|
||||||
|
],
|
||||||
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
|
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
|
||||||
[erlang:length(PubRules), erlang:length(SubRules)]),
|
[erlang:length(PubRules), erlang:length(SubRules)]),
|
||||||
log_rule(PubRules, 1),
|
log_rules(pub, PubRules),
|
||||||
log_rule(SubRules, 1).
|
log_rules(sub, SubRules),
|
||||||
|
{PubRules, SubRules}.
|
||||||
|
|
||||||
log_rule([], _Index) -> ok;
|
validate_rule(Type, RewriteFrom, RewriteTo) ->
|
||||||
log_rule([{Type, Topic, Re, Dest} | Rules], Index) ->
|
case validate_topic(filter, RewriteFrom) of
|
||||||
|
ok ->
|
||||||
|
case validate_topic(dest_topic_type(Type), RewriteTo) of
|
||||||
|
ok ->
|
||||||
|
true;
|
||||||
|
{error, Reason} ->
|
||||||
|
log_invalid_rule(to, Type, RewriteTo, Reason),
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
log_invalid_rule(from, Type, RewriteFrom, Reason),
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
log_invalid_rule(Direction, Type, Topic, Reason) ->
|
||||||
|
?LOG(warning, "Invalid rewrite ~p rule for rewrite ~p topic '~ts' discarded. Reason: ~p",
|
||||||
|
[type_to_name(Type), Direction, Topic, Reason]).
|
||||||
|
|
||||||
|
log_rules(Type, Rules) ->
|
||||||
|
do_log_rules(Type, Rules, 1).
|
||||||
|
|
||||||
|
do_log_rules(_Type, [], _Index) -> ok;
|
||||||
|
do_log_rules(Type, [{_, Topic, Re, Dest} | Rules], Index) ->
|
||||||
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
|
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
|
||||||
[Type, Index, Topic, Re, Dest]),
|
[Type, Index, Topic, Re, Dest]),
|
||||||
log_rule(Rules, Index + 1).
|
do_log_rules(Type, Rules, Index + 1).
|
||||||
|
|
||||||
compile(Rules) ->
|
|
||||||
PubRules = [ begin
|
|
||||||
{ok, MP} = re:compile(Re),
|
|
||||||
{rewrite, Topic, MP, Dest}
|
|
||||||
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
|
|
||||||
SubRules = [ begin
|
|
||||||
{ok, MP} = re:compile(Re),
|
|
||||||
{rewrite, Topic, MP, Dest}
|
|
||||||
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
|
|
||||||
{PubRules, SubRules}.
|
|
||||||
|
|
||||||
match_and_rewrite(Topic, [], _) ->
|
match_and_rewrite(Topic, [], _) ->
|
||||||
Topic;
|
Topic;
|
||||||
|
@ -138,3 +168,19 @@ filter_client_binds(Binds) ->
|
||||||
(_) -> true
|
(_) -> true
|
||||||
end,
|
end,
|
||||||
Binds).
|
Binds).
|
||||||
|
|
||||||
|
type_to_name(pub) -> 'PUBLISH';
|
||||||
|
type_to_name(sub) -> 'SUBSCRIBE'.
|
||||||
|
|
||||||
|
dest_topic_type(pub) -> name;
|
||||||
|
dest_topic_type(sub) -> filter.
|
||||||
|
|
||||||
|
-spec(validate_topic(name | filter, topic()) -> ok | {error, term()}).
|
||||||
|
validate_topic(Type, Topic) ->
|
||||||
|
try
|
||||||
|
true = emqx_topic:validate(Type, Topic),
|
||||||
|
ok
|
||||||
|
catch
|
||||||
|
error:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_modules,
|
{application, emqx_modules,
|
||||||
[{description, "EMQ X Module Management"},
|
[{description, "EMQ X Module Management"},
|
||||||
{vsn, "4.3.10"},
|
{vsn, "4.3.11"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.9",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
[{"4.3.10",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.9",
|
||||||
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[5-7]">>,
|
{<<"4\\.3\\.[5-7]">>,
|
||||||
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
@ -35,9 +39,13 @@
|
||||||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.9",[{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
[{"4.3.10",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.9",
|
||||||
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[5-7]">>,
|
{<<"4\\.3\\.[5-7]">>,
|
||||||
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -30,6 +30,13 @@
|
||||||
{rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>}
|
{rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(BAD_RULES_1, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/+">>}]).
|
||||||
|
|
||||||
|
%% empty topic filter/name won't be ranched cased `emqx.conf` will be checked before emqx started
|
||||||
|
%% but we need this check for emqx-ee modules api
|
||||||
|
-define(BAD_RULES_2, [{rewrite, pub, <<"">>,<<"^x/y/(.+)$">>,<<"z/y/+">>}]).
|
||||||
|
-define(BAD_RULES_3, [{rewrite, pub, <<"name/#">>,<<"^name/(.+)$">>,<<"">>}]).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -82,12 +89,30 @@ t_mod_rewrite(_Config) ->
|
||||||
ok = emqx_mod_rewrite:unload(?RULES).
|
ok = emqx_mod_rewrite:unload(?RULES).
|
||||||
|
|
||||||
t_rewrite_rule(_Config) ->
|
t_rewrite_rule(_Config) ->
|
||||||
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
|
{PubRules, SubRules} = emqx_mod_rewrite:compile_rules(?RULES),
|
||||||
|
%% assert ordering
|
||||||
|
?assertMatch([{rewrite, <<"x/#">>, _, <<"z/y/$1">>},
|
||||||
|
{rewrite, <<"name/#">>, _, <<"pub/%u/$1">>},
|
||||||
|
{rewrite, <<"c/#">>, _, <<"pub/%c/$1">>}],
|
||||||
|
PubRules),
|
||||||
|
?assertMatch([{rewrite, <<"y/+/z/#">>, _, <<"y/z/$2">>},
|
||||||
|
{rewrite, <<"name/#">>, _, <<"sub/%u/$1">>},
|
||||||
|
{rewrite, <<"c/#">>, _, <<"sub/%c/$1">>}],
|
||||||
|
SubRules),
|
||||||
|
|
||||||
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
|
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
|
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
|
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
|
||||||
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
|
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_1(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_1)).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_2(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_2)).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_3(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_3)).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue