diff --git a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl index b7068f4df..b51d29291 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl @@ -23,7 +23,7 @@ -ifdef(TEST). -export([ compile/1 - , match_and_rewrite/2 + , match_and_rewrite/3 ]). -endif. @@ -49,14 +49,17 @@ load(RawRules) -> emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}). -rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. -rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> - {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}. + Binds = fill_client_binds(Message), + {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. unload(_) -> emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), @@ -80,16 +83,16 @@ compile(Rules) -> end || {rewrite, sub, Topic, Re, Dest}<- Rules ], {PubRules, SubRules}. -match_and_rewrite(Topic, []) -> +match_and_rewrite(Topic, [], _) -> Topic; -match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) -> +match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest); - false -> match_and_rewrite(Topic, Rules) + true -> rewrite(Topic, MP, Dest, Binds); + false -> match_and_rewrite(Topic, Rules, Binds) end. -rewrite(Topic, MP, Dest) -> +rewrite(Topic, MP, Dest, Binds) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> Vars = lists:zip(["\\$" ++ integer_to_list(I) @@ -97,7 +100,21 @@ rewrite(Topic, MP, Dest) -> iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) - end, Dest, Vars)); + end, Dest, Binds ++ Vars)); nomatch -> Topic end. +fill_client_binds(#{clientid := ClientId, username := Username}) -> + filter_client_binds([{"%c", ClientId}, {"%u", Username}]); + +fill_client_binds(#message{from = ClientId, headers = Headers}) -> + Username = maps:get(username, Headers, undefined), + filter_client_binds([{"%c", ClientId}, {"%u", Username}]). + +filter_client_binds(Binds) -> + lists:filter(fun({_, undefined}) -> false; + ({_, <<"">>}) -> false; + ({_, ""}) -> false; + (_) -> true + end, + Binds). diff --git a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl index 466f4a3f8..44574344b 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl @@ -23,7 +23,11 @@ -include_lib("eunit/include/eunit.hrl"). -define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, - {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>} + {rewrite, pub, <<"name/#">>,<<"^name/(.+)$">>,<<"pub/%u/$1">>}, + {rewrite, pub, <<"c/#">>,<<"^c/(.+)$">>,<<"pub/%c/$1">>}, + {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}, + {rewrite, sub, <<"name/#">>,<<"^name/(.+)$">>,<<"sub/%u/$1">>}, + {rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>} ]). all() -> emqx_ct:all(?MODULE). @@ -41,16 +45,18 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> ok = emqx_mod_rewrite:load(?RULES), - {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), + {ok, C} = emqtt:start_link([{clientid, <<"c1">>}, {username , <<"u1">>}]), {ok, _} = emqtt:connect(C), - PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], - PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], - SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], - SubDestTopics = [<<"y/z/b">>, <<"y/def">>], + + PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>], + PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>], + SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>], + SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>], + %% Sub Rules {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), timer:sleep(100), - Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), + Subscriptions = emqx_broker:subscriptions(<<"c1">>), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), RecvTopics1 = [begin ok = emqtt:publish(C, Topic, <<"payload">>), @@ -60,7 +66,8 @@ t_mod_rewrite(_Config) -> ?assertEqual(SubDestTopics, RecvTopics1), {ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics), timer:sleep(100), - ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), + ?assertEqual([], emqx_broker:subscriptions(<<"c1">>)), + %% Pub Rules {ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), RecvTopics2 = [begin @@ -76,10 +83,10 @@ t_mod_rewrite(_Config) -> t_rewrite_rule(_Config) -> {PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES), - ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), - ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), - ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), - ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). + ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])), + ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])), + ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])), + ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])). %%-------------------------------------------------------------------- %% Internal functions