From f47e10e08a70c61c0c8809284a74e05892f03c7b Mon Sep 17 00:00:00 2001 From: Rory Z Date: Sat, 22 Aug 2020 09:59:39 +0800 Subject: [PATCH] feature(emqx_mod_rewrite): separate rewrite rules for pub and sub (#3676) --- etc/emqx.conf | 4 +-- priv/emqx.schema | 45 +++++++++++++++++++++++++++++--- src/emqx_mod_rewrite.erl | 21 +++++++++------ test/emqx_mod_rewrite_SUITE.erl | 46 ++++++++++++++++++++------------- 4 files changed, 85 insertions(+), 31 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 50f733de9..0a41b3dbc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1966,8 +1966,8 @@ module.presence.qos = 1 ## Rewrite Module ## {rewrite, Topic, Re, Dest} -## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 -## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 +## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1 +## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 ##------------------------------------------------------------------- ## Plugins diff --git a/priv/emqx.schema b/priv/emqx.schema index 38aa55998..693c7fbef 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1945,6 +1945,14 @@ end}. {datatype, string} ]}. +{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + +{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + {translation, "emqx.modules", fun(Conf) -> Subscriptions = fun() -> List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), @@ -1957,10 +1965,16 @@ end}. end, Rewrites = fun() -> Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), - lists:map(fun({[_, "rewrite", "rule", I], Rule}) -> + PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf), + SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf), + TotalRules = lists:append( + [ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules, + [ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules + ), + lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) -> [Topic, Re, Dest] = string:tokens(Rule, " "), - {rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} - end, Rules) + {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} + end, TotalRules) end, lists:append([ [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], @@ -2165,3 +2179,28 @@ end}. {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] end}. + +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- +{mapping, "telemetry.enabled", "emqx.telemetry", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "telemetry.url", "emqx.telemetry", [ + {default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, + {datatype, string} +]}. + +{mapping, "telemetry.report_interval", "emqx.telemetry", [ + {default, "7d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqx.telemetry", fun(Conf) -> + [ {enabled, cuttlefish:conf_get("telemetry.enabled", Conf)} + , {url, cuttlefish:conf_get("telemetry.url", Conf)} + , {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)} + ] +end}. diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 9702b7350..f5e343eed 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -44,10 +44,10 @@ %%-------------------------------------------------------------------- load(RawRules) -> - Rules = compile(RawRules), - emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [Rules]}), - emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [Rules]}), - emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [Rules]}). + {PubRules, SubRules} = compile(RawRules), + emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), + emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), + emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [PubRules]}). rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. @@ -70,10 +70,15 @@ description() -> %%-------------------------------------------------------------------- compile(Rules) -> - lists:map(fun({rewrite, Topic, Re, Dest}) -> - {ok, MP} = re:compile(Re), - {rewrite, Topic, MP, Dest} - end, Rules). + PubRules = [ begin + {ok, MP} = re:compile(Re), + {rewrite, Topic, MP, Dest} + end || {rewrite, pub, Topic, Re, Dest}<- Rules ], + SubRules = [ begin + {ok, MP} = re:compile(Re), + {rewrite, Topic, MP, Dest} + end || {rewrite, sub, Topic, Re, Dest}<- Rules ], + {PubRules, SubRules}. match_and_rewrite(Topic, []) -> Topic; diff --git a/test/emqx_mod_rewrite_SUITE.erl b/test/emqx_mod_rewrite_SUITE.erl index 74397b843..e3c19200e 100644 --- a/test/emqx_mod_rewrite_SUITE.erl +++ b/test/emqx_mod_rewrite_SUITE.erl @@ -22,8 +22,8 @@ -include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(RULES, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, - {rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>} +-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, + {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>} ]). all() -> emqx_ct:all(?MODULE). @@ -43,33 +43,43 @@ t_mod_rewrite(_Config) -> ok = emqx_mod_rewrite:load(?RULES), {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), {ok, _} = emqtt:connect(C), - OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>], - DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>], - %% Subscribe - {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- OrigTopics]), + PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], + PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], + SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], + SubDestTopics = [<<"y/z/b">>, <<"y/def">>], + %% Sub Rules + {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), timer:sleep(100), Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), - ?assertEqual(DestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), - %% Publish - RecvTopics = [begin + ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), + RecvTopics1 = [begin ok = emqtt:publish(C, Topic, <<"payload">>), {ok, #{topic := RecvTopic}} = receive_publish(100), RecvTopic - end || Topic <- OrigTopics], - ?assertEqual(DestTopics, RecvTopics), - %% Unsubscribe - {ok, _, _} = emqtt:unsubscribe(C, OrigTopics), + end || Topic <- SubDestTopics], + ?assertEqual(SubDestTopics, RecvTopics1), + {ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics), timer:sleep(100), ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), + %% Pub Rules + {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), + RecvTopics2 = [begin + ok = emqtt:publish(C, Topic, <<"payload">>), + {ok, #{topic := RecvTopic}} = receive_publish(100), + RecvTopic + end || Topic <- PubOrigTopics], + ?assertEqual(PubDestTopics, RecvTopics2), + {ok, _, _} = emqtt:unsubscribe(C, PubDestTopics), + ok = emqtt:disconnect(C), ok = emqx_mod_rewrite:unload(?RULES). t_rewrite_rule(_Config) -> - Rules = emqx_mod_rewrite:compile(?RULES), - ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, Rules)), - ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, Rules)), - ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, Rules)), - ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, Rules)). + {PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES), + ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), + ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), + ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), + ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). %%-------------------------------------------------------------------- %% Internal functions