Merge pull request #8067 from lafirest/fix/topic_rewarite
feat(rewrite): Support %u and %c placeholders in topic rewrite rules
This commit is contained in:
commit
2aa164e36a
|
@ -23,7 +23,7 @@
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ compile/1
|
-export([ compile/1
|
||||||
, match_and_rewrite/2
|
, match_and_rewrite/3
|
||||||
]).
|
]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
@ -49,14 +49,17 @@ load(RawRules) ->
|
||||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
|
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
|
||||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
|
||||||
|
|
||||||
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
|
||||||
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
Binds = fill_client_binds(ClientInfo),
|
||||||
|
{ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||||
|
|
||||||
rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
|
||||||
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
Binds = fill_client_binds(ClientInfo),
|
||||||
|
{ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||||
|
|
||||||
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
||||||
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
|
Binds = fill_client_binds(Message),
|
||||||
|
{ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
|
||||||
|
|
||||||
unload(_) ->
|
unload(_) ->
|
||||||
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
||||||
|
@ -80,16 +83,16 @@ compile(Rules) ->
|
||||||
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
|
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
|
||||||
{PubRules, SubRules}.
|
{PubRules, SubRules}.
|
||||||
|
|
||||||
match_and_rewrite(Topic, []) ->
|
match_and_rewrite(Topic, [], _) ->
|
||||||
Topic;
|
Topic;
|
||||||
|
|
||||||
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
|
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) ->
|
||||||
case emqx_topic:match(Topic, Filter) of
|
case emqx_topic:match(Topic, Filter) of
|
||||||
true -> rewrite(Topic, MP, Dest);
|
true -> rewrite(Topic, MP, Dest, Binds);
|
||||||
false -> match_and_rewrite(Topic, Rules)
|
false -> match_and_rewrite(Topic, Rules, Binds)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
rewrite(Topic, MP, Dest) ->
|
rewrite(Topic, MP, Dest, Binds) ->
|
||||||
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
||||||
{match, Captured} ->
|
{match, Captured} ->
|
||||||
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
||||||
|
@ -97,7 +100,21 @@ rewrite(Topic, MP, Dest) ->
|
||||||
iolist_to_binary(lists:foldl(
|
iolist_to_binary(lists:foldl(
|
||||||
fun({Var, Val}, Acc) ->
|
fun({Var, Val}, Acc) ->
|
||||||
re:replace(Acc, Var, Val, [global])
|
re:replace(Acc, Var, Val, [global])
|
||||||
end, Dest, Vars));
|
end, Dest, Binds ++ Vars));
|
||||||
nomatch -> Topic
|
nomatch -> Topic
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
fill_client_binds(#{clientid := ClientId, username := Username}) ->
|
||||||
|
filter_client_binds([{"%c", ClientId}, {"%u", Username}]);
|
||||||
|
|
||||||
|
fill_client_binds(#message{from = ClientId, headers = Headers}) ->
|
||||||
|
Username = maps:get(username, Headers, undefined),
|
||||||
|
filter_client_binds([{"%c", ClientId}, {"%u", Username}]).
|
||||||
|
|
||||||
|
filter_client_binds(Binds) ->
|
||||||
|
lists:filter(fun({_, undefined}) -> false;
|
||||||
|
({_, <<"">>}) -> false;
|
||||||
|
({_, ""}) -> false;
|
||||||
|
(_) -> true
|
||||||
|
end,
|
||||||
|
Binds).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_modules,
|
{application, emqx_modules,
|
||||||
[{description, "EMQ X Module Management"},
|
[{description, "EMQ X Module Management"},
|
||||||
{vsn, "4.3.6"},
|
{vsn, "4.3.7"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -1,49 +1,63 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.5",[{load_module,emqx_modules,brutal_purge,soft_purge,[]}]},
|
[{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.5",
|
||||||
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[2-3]">>,
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_mod_delayed,{advanced,[]}},
|
{update,emqx_mod_delayed,{advanced,[]}},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.5",[{load_module,emqx_modules,brutal_purge,soft_purge,[]}]},
|
[{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.5",
|
||||||
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[2-3]">>,
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_mod_delayed,{advanced,[]}},
|
{update,emqx_mod_delayed,{advanced,[]}},
|
||||||
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -23,7 +23,11 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
|
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
|
||||||
{rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
|
{rewrite, pub, <<"name/#">>,<<"^name/(.+)$">>,<<"pub/%u/$1">>},
|
||||||
|
{rewrite, pub, <<"c/#">>,<<"^c/(.+)$">>,<<"pub/%c/$1">>},
|
||||||
|
{rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>},
|
||||||
|
{rewrite, sub, <<"name/#">>,<<"^name/(.+)$">>,<<"sub/%u/$1">>},
|
||||||
|
{rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
@ -41,16 +45,18 @@ end_per_suite(_Config) ->
|
||||||
%% Test case for emqx_mod_write
|
%% Test case for emqx_mod_write
|
||||||
t_mod_rewrite(_Config) ->
|
t_mod_rewrite(_Config) ->
|
||||||
ok = emqx_mod_rewrite:load(?RULES),
|
ok = emqx_mod_rewrite:load(?RULES),
|
||||||
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
|
{ok, C} = emqtt:start_link([{clientid, <<"c1">>}, {username , <<"u1">>}]),
|
||||||
{ok, _} = emqtt:connect(C),
|
{ok, _} = emqtt:connect(C),
|
||||||
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
|
|
||||||
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
|
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>],
|
||||||
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
|
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>],
|
||||||
SubDestTopics = [<<"y/z/b">>, <<"y/def">>],
|
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>],
|
||||||
|
SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>],
|
||||||
|
|
||||||
%% Sub Rules
|
%% Sub Rules
|
||||||
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
|
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
|
Subscriptions = emqx_broker:subscriptions(<<"c1">>),
|
||||||
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
|
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
|
||||||
RecvTopics1 = [begin
|
RecvTopics1 = [begin
|
||||||
ok = emqtt:publish(C, Topic, <<"payload">>),
|
ok = emqtt:publish(C, Topic, <<"payload">>),
|
||||||
|
@ -60,7 +66,8 @@ t_mod_rewrite(_Config) ->
|
||||||
?assertEqual(SubDestTopics, RecvTopics1),
|
?assertEqual(SubDestTopics, RecvTopics1),
|
||||||
{ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics),
|
{ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
|
?assertEqual([], emqx_broker:subscriptions(<<"c1">>)),
|
||||||
|
|
||||||
%% Pub Rules
|
%% Pub Rules
|
||||||
{ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
|
{ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
|
||||||
RecvTopics2 = [begin
|
RecvTopics2 = [begin
|
||||||
|
@ -76,10 +83,10 @@ t_mod_rewrite(_Config) ->
|
||||||
|
|
||||||
t_rewrite_rule(_Config) ->
|
t_rewrite_rule(_Config) ->
|
||||||
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
|
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
|
||||||
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
|
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
|
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
|
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
|
||||||
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
|
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
|
Loading…
Reference in New Issue