feature(emqx_mod_rewrite): separate rewrite rules for pub and sub (#3676)

This commit is contained in:
Rory Z 2020-08-22 09:59:39 +08:00 committed by tigercl
parent dbeabf3de0
commit f47e10e08a
4 changed files with 85 additions and 31 deletions

View File

@ -1966,8 +1966,8 @@ module.presence.qos = 1
## Rewrite Module
## {rewrite, Topic, Re, Dest}
## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
##-------------------------------------------------------------------
## Plugins

View File

@ -1945,6 +1945,14 @@ end}.
{datatype, string}
]}.
{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{translation, "emqx.modules", fun(Conf) ->
Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
@ -1957,10 +1965,16 @@ end}.
end,
Rewrites = fun() ->
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
lists:map(fun({[_, "rewrite", "rule", I], Rule}) ->
PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf),
SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf),
TotalRules = lists:append(
[ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules,
[ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules
),
lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) ->
[Topic, Re, Dest] = string:tokens(Rule, " "),
{rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, Rules)
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, TotalRules)
end,
lists:append([
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
@ -2165,3 +2179,28 @@ end}.
{size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)},
{validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}]
end}.
%%--------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
{mapping, "telemetry.enabled", "emqx.telemetry", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "telemetry.url", "emqx.telemetry", [
{default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"},
{datatype, string}
]}.
{mapping, "telemetry.report_interval", "emqx.telemetry", [
{default, "7d"},
{datatype, {duration, s}}
]}.
{translation, "emqx.telemetry", fun(Conf) ->
[ {enabled, cuttlefish:conf_get("telemetry.enabled", Conf)}
, {url, cuttlefish:conf_get("telemetry.url", Conf)}
, {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)}
]
end}.

View File

@ -44,10 +44,10 @@
%%--------------------------------------------------------------------
load(RawRules) ->
Rules = compile(RawRules),
emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [Rules]}),
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [Rules]}),
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [Rules]}).
{PubRules, SubRules} = compile(RawRules),
emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
@ -70,10 +70,15 @@ description() ->
%%--------------------------------------------------------------------
compile(Rules) ->
lists:map(fun({rewrite, Topic, Re, Dest}) ->
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end, Rules).
PubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
SubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
{PubRules, SubRules}.
match_and_rewrite(Topic, []) ->
Topic;

View File

@ -22,8 +22,8 @@
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(RULES, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
]).
all() -> emqx_ct:all(?MODULE).
@ -43,33 +43,43 @@ t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(?RULES),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>],
DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>],
%% Subscribe
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- OrigTopics]),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
SubDestTopics = [<<"y/z/b">>, <<"y/def">>],
%% Sub Rules
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
timer:sleep(100),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(DestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
%% Publish
RecvTopics = [begin
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
RecvTopics1 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- OrigTopics],
?assertEqual(DestTopics, RecvTopics),
%% Unsubscribe
{ok, _, _} = emqtt:unsubscribe(C, OrigTopics),
end || Topic <- SubDestTopics],
?assertEqual(SubDestTopics, RecvTopics1),
{ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics),
timer:sleep(100),
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
%% Pub Rules
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
RecvTopics2 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- PubOrigTopics],
?assertEqual(PubDestTopics, RecvTopics2),
{ok, _, _} = emqtt:unsubscribe(C, PubDestTopics),
ok = emqtt:disconnect(C),
ok = emqx_mod_rewrite:unload(?RULES).
t_rewrite_rule(_Config) ->
Rules = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, Rules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, Rules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, Rules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, Rules)).
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
%%--------------------------------------------------------------------
%% Internal functions