feat(rewrite): update rewrite conf to array

This commit is contained in:
Turtle 2021-08-26 20:16:56 +08:00 committed by turtleDeng
parent f02a554502
commit 307eaa7f1e
4 changed files with 24 additions and 41 deletions

View File

@ -23,18 +23,16 @@ event_message {
# "$event/message_dropped": false # "$event/message_dropped": false
} }
topic_metrics { topic_metrics: [
topics = [] #{topic: "test/1"}
} ]
rewrite { rewrite: [
rules = [ {
{ action = publish
action = publish source_topic = "x/#"
source_topic = "x/#" re = "^x/y/(.+)$"
re = "^x/y/(.+)$" dest_topic = "z/y/$1"
dest_topic = "z/y/$1" }
} ]
]
}

View File

@ -28,8 +28,8 @@ structs() ->
"recon", "recon",
"telemetry", "telemetry",
"event_message", "event_message",
"rewrite", {array, "rewrite"},
"topic_metrics"]. {array, "topic_metrics"}].
fields(Name) when Name =:= "recon"; fields(Name) when Name =:= "recon";
Name =:= "telemetry" -> Name =:= "telemetry" ->
@ -42,10 +42,12 @@ fields("delayed") ->
]; ];
fields("rewrite") -> fields("rewrite") ->
[ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} [ {action, hoconsc:enum([publish, subscribe])}
, {source_topic, emqx_schema:t(binary())}
, {re, emqx_schema:t(binary())}
, {dest_topic, emqx_schema:t(binary())}
]; ];
fields("event_message") -> fields("event_message") ->
[ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)} [ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)} , {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)}
@ -57,13 +59,5 @@ fields("event_message") ->
]; ];
fields("topic_metrics") -> fields("topic_metrics") ->
[ {topics, hoconsc:array(binary())} [{topic, emqx_schema:t(binary())}].
];
fields("rules") ->
[ {action, hoconsc:enum([publish, subscribe])}
, {source_topic, emqx_schema:t(binary())}
, {re, emqx_schema:t(binary())}
, {dest_topic, emqx_schema:t(binary())}
].

View File

@ -43,7 +43,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enable() -> enable() ->
Rules = emqx:get_config([rewrite, rules], []), Rules = emqx:get_config([rewrite], []),
register_hook(Rules). register_hook(Rules).
disable() -> disable() ->
@ -52,13 +52,10 @@ disable() ->
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
list() -> list() ->
maps:get(<<"rules">>, emqx:get_raw_config([<<"rewrite">>], #{}), []). maps:get(<<"rules">>, emqx:get_raw_config([<<"rewrite">>], []), []).
update(Rules0) -> update(Rules0) ->
Rewrite = emqx:get_raw_config([<<"rewrite">>], #{}), {ok, #{config := Rules}} = emqx:update_config([rewrite], Rules0),
{ok, #{config := Config}} = emqx:update_config([rewrite], maps:put(<<"rules">>,
Rules0, Rewrite)),
Rules = maps:get(rules, maps:get(rewrite, Config, #{}), []),
case Rules of case Rules of
[] -> [] ->
disable(); disable();

View File

@ -23,8 +23,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(REWRITE, <<""" -define(REWRITE, <<"""
rewrite: { rewrite: [
rules : [
{ {
action : publish action : publish
source_topic : \"x/#\" source_topic : \"x/#\"
@ -37,7 +36,7 @@ rewrite: {
re : \"^y/(.+)/z/(.+)$\" re : \"^y/(.+)/z/(.+)$\"
dest_topic : \"y/z/$2\" dest_topic : \"y/z/$2\"
} }
]}""">>). ]""">>).
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
@ -87,12 +86,7 @@ t_mod_rewrite(_Config) ->
ok = emqx_rewrite:disable(). ok = emqx_rewrite:disable().
t_rewrite_rule(_Config) -> t_rewrite_rule(_Config) ->
{ok, Rewite} = hocon:binary(?REWRITE), {PubRules, SubRules} = emqx_rewrite:compile(emqx:get_config([rewrite])),
#{rewrite := #{rules := Rules}} =
hocon_schema:check_plain(emqx_modules_schema, Rewite,
#{atom_key => true},
["rewrite"]),
{PubRules, SubRules} = emqx_rewrite:compile(Rules),
?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),