From 1d0b479d8ee7d3fdeae0333aa4c0c417cd1bc645 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 27 May 2022 17:36:07 +0800 Subject: [PATCH] feat(rewrite): Support %u and %c placeholders in topic rewrite rules fix #7438 --- apps/emqx_modules/src/emqx_rewrite.erl | 44 ++++++++---- apps/emqx_modules/test/emqx_rewrite_SUITE.erl | 67 ++++++++++--------- 2 files changed, 68 insertions(+), 43 deletions(-) diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 2da1a9f70..edad6dfc4 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -23,7 +23,7 @@ -ifdef(TEST). -export([ compile/1, - match_and_rewrite/2 + match_and_rewrite/3 ]). -endif. @@ -92,14 +92,17 @@ unregister_hook() -> emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). -rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. -rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> - {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}. + Binds = fill_client_binds(Message), + {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. %%-------------------------------------------------------------------- %% Telemetry @@ -135,15 +138,15 @@ compile(Rules) -> Rules ). -match_and_rewrite(Topic, []) -> +match_and_rewrite(Topic, [], _) -> Topic; -match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) -> +match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest); - false -> match_and_rewrite(Topic, Rules) + true -> rewrite(Topic, MP, Dest, Binds); + false -> match_and_rewrite(Topic, Rules, Binds) end. -rewrite(Topic, MP, Dest) -> +rewrite(Topic, MP, Dest, Binds) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> Vars = lists:zip( @@ -159,9 +162,26 @@ rewrite(Topic, MP, Dest) -> re:replace(Acc, Var, Val, [global]) end, Dest, - Vars + Binds ++ Vars ) ); nomatch -> Topic end. + +fill_client_binds(#{clientid := ClientId, username := Username}) -> + filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]); +fill_client_binds(#message{from = ClientId, headers = Headers}) -> + Username = maps:get(username, Headers, undefined), + filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]). + +filter_client_binds(Binds) -> + lists:filter( + fun + ({_, undefined}) -> false; + ({_, <<"">>}) -> false; + ({_, ""}) -> false; + (_) -> true + end, + Binds + ). diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl index 892c25a09..7a6f096d5 100644 --- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -30,12 +30,36 @@ <<"re">> => <<"^x/y/(.+)$">>, <<"source_topic">> => <<"x/#">> }, + #{ + <<"action">> => <<"publish">>, + <<"dest_topic">> => <<"pub/${username}/$1">>, + <<"re">> => <<"^name/(.+)$">>, + <<"source_topic">> => <<"name/#">> + }, + #{ + <<"action">> => <<"publish">>, + <<"dest_topic">> => <<"pub/${clientid}/$1">>, + <<"re">> => <<"^c/(.+)$">>, + <<"source_topic">> => <<"c/#">> + }, #{ <<"action">> => <<"subscribe">>, <<"dest_topic">> => <<"y/z/$2">>, <<"re">> => <<"^y/(.+)/z/(.+)$">>, <<"source_topic">> => <<"y/+/z/#">> }, + #{ + <<"action">> => <<"subscribe">>, + <<"dest_topic">> => <<"sub/${username}/$1">>, + <<"re">> => <<"^name/(.+)$">>, + <<"source_topic">> => <<"name/#">> + }, + #{ + <<"action">> => <<"subscribe">>, + <<"dest_topic">> => <<"sub/${clientid}/$1">>, + <<"re">> => <<"^c/(.+)$">>, + <<"source_topic">> => <<"c/#">> + }, #{ <<"action">> => <<"all">>, <<"dest_topic">> => <<"all/x/$2">>, @@ -69,11 +93,11 @@ end_per_testcase(_TestCase, _Config) -> t_subscribe_rewrite(_Config) -> {ok, Conn} = init(), - SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], - SubDestTopics = [<<"y/z/b">>, <<"y/def">>], + SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>], + SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>], {ok, _Props1, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), timer:sleep(150), - Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), + Subscriptions = emqx_broker:subscriptions(<<"c1">>), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), RecvTopics = [ begin @@ -86,14 +110,14 @@ t_subscribe_rewrite(_Config) -> ?assertEqual(SubDestTopics, RecvTopics), {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics), timer:sleep(100), - ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), + ?assertEqual([], emqx_broker:subscriptions(<<"c1">>)), terminate(Conn). t_publish_rewrite(_Config) -> {ok, Conn} = init(), - PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], - PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], + PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>], + PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>], {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), RecvTopics = [ begin @@ -109,10 +133,10 @@ t_publish_rewrite(_Config) -> t_rewrite_rule(_Config) -> {PubRules, SubRules, []} = emqx_rewrite:compile(emqx:get_config([rewrite])), - ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), - ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), - ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), - ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). + ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])), + ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])), + ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])), + ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])). t_rewrite_re_error(_Config) -> Rules = [ @@ -134,26 +158,7 @@ t_rewrite_re_error(_Config) -> t_list(_Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), - Expect = [ - #{ - <<"action">> => <<"publish">>, - <<"dest_topic">> => <<"z/y/$1">>, - <<"re">> => <<"^x/y/(.+)$">>, - <<"source_topic">> => <<"x/#">> - }, - #{ - <<"action">> => <<"subscribe">>, - <<"dest_topic">> => <<"y/z/$2">>, - <<"re">> => <<"^y/(.+)/z/(.+)$">>, - <<"source_topic">> => <<"y/+/z/#">> - }, - #{ - <<"action">> => <<"all">>, - <<"dest_topic">> => <<"all/x/$2">>, - <<"re">> => <<"^all/(.+)/x/(.+)$">>, - <<"source_topic">> => <<"all/+/x/#">> - } - ], + Expect = maps:get(<<"rewrite">>, ?REWRITE), ?assertEqual(Expect, emqx_rewrite:list()), ok. @@ -246,7 +251,7 @@ receive_publish(Timeout) -> init() -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), ok = emqx_rewrite:enable(), - {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), + {ok, C} = emqtt:start_link([{clientid, <<"c1">>}, {username, <<"u1">>}]), {ok, _} = emqtt:connect(C), {ok, C}.