diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 3ce3aa52b..7c039377a 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -18,7 +18,7 @@ -compile({no_auto_import, [get/1, get/2]}). -export([add_handler/2, remove_handler/1]). --export([get/1, get/2, get_all/1]). +-export([get/1, get/2, get_raw/2, get_all/1]). -export([get_by_node/2, get_by_node/3]). -export([update/3, update/4]). -export([remove/2, remove/3]). @@ -46,6 +46,10 @@ get(KeyPath) -> get(KeyPath, Default) -> emqx:get_config(KeyPath, Default). +-spec get_raw(emqx_map_lib:config_key_path(), term()) -> term(). +get_raw(KeyPath, Default) -> + emqx_config:get_raw(KeyPath, Default). + %% @doc Returns all values in the cluster. -spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> @@ -72,7 +76,7 @@ get_node_and_config(KeyPath) -> {node(), emqx:get_config(KeyPath, config_not_found)}. %% @doc Update all value of key path in cluster-override.conf or local-override.conf. --spec update(emqx_map_lib:config_key_path(), emqx_config:update_args(), +-spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts0) -> @@ -81,7 +85,7 @@ update(KeyPath, UpdateReq, Opts0) -> Res. %% @doc Update the specified node's key path in local-override.conf. --spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_args(), +-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index e2ab0ad8f..1cde1035e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -208,7 +208,7 @@ binenv(Key) -> iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")). add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> - igonre; + ok; add_default_user(Username, Password) -> case lookup_user(Username) of diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 8804bbc4a..1605c3382 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -36,7 +36,7 @@ maybe_enable_modules() -> emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_event_message:enable(), - emqx_rewrite:enable(), + ok = emqx_rewrite:enable(), emqx_topic_metrics:enable(). maybe_disable_modules() -> diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index cec75c9b5..e960d7d0d 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -43,12 +43,13 @@ fields("delayed") -> ]; fields("rewrite") -> - [ {action, hoconsc:enum([publish, subscribe, all])} - , {source_topic, sc(binary(), #{})} - , {re, sc(binary(), #{})} - , {dest_topic, sc(binary(), #{})} + [ {action, sc(hoconsc:enum([subscribe, publish, all]), #{desc => "Action", example => publish})} + , {source_topic, sc(binary(), #{desc => "Origin Topic", example => "x/#"})} + , {dest_topic, sc(binary(), #{desc => "Destination Topic", example => "z/y/$1"})} + , {re, fun regular_expression/1 } ]; + fields("event_message") -> [ {"$event/client_connected", sc(boolean(), #{default => false})} , {"$event/client_disconnected", sc(boolean(), #{default => false})} @@ -62,6 +63,18 @@ fields("event_message") -> fields("topic_metrics") -> [{topic, sc(binary(), #{})}]. +regular_expression(type) -> binary(); +regular_expression(desc) -> "Regular expressions"; +regular_expression(example) -> "^x/y/(.+)$"; +regular_expression(validator) -> fun is_re/1; +regular_expression(_) -> undefined. + +is_re(Bin) -> + case re:compile(Bin) of + {ok, _} -> ok; + {error, Reason} -> {error, {Bin, Reason}} + end. + array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}. sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 16009d46b..7c8c8311e 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -17,6 +17,7 @@ -module(emqx_rewrite). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -ifdef(TEST). @@ -49,28 +50,27 @@ enable() -> disable() -> emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). + emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}), + ok. list() -> - emqx:get_raw_config([<<"rewrite">>], []). + emqx_conf:get_raw([<<"rewrite">>], []). update(Rules0) -> - {ok, #{config := Rules}} = emqx:update_config([rewrite], Rules0), - case Rules of - [] -> - disable(); - _ -> - register_hook(Rules) - end. + {ok, #{config := Rules}} = emqx_conf:update([rewrite], Rules0, #{override_to => cluster}), + register_hook(Rules). +register_hook([]) -> disable(); register_hook(Rules) -> - case Rules =:= [] of - true -> ok; - false -> - {PubRules, SubRules} = compile(Rules), - emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), - emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), - emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}) + {PubRules, SubRules, ErrRules} = compile(Rules), + emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), + emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), + emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}), + case ErrRules of + [] -> ok; + _ -> + ?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}), + {error, ErrRules} end. rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> @@ -86,20 +86,21 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> %% Internal functions %%-------------------------------------------------------------------- compile(Rules) -> - lists:foldl(fun(#{source_topic := Topic, - re := Re, - dest_topic := Dest, - action := Action}, {Acc1, Acc2}) -> - {ok, MP} = re:compile(Re), - case Action of - publish -> - {[{Topic, MP, Dest} | Acc1], Acc2}; - subscribe -> - {Acc1, [{Topic, MP, Dest} | Acc2]}; - all -> - {[{Topic, MP, Dest} | Acc1], [{Topic, MP, Dest} | Acc2]} - end - end, {[], []}, Rules). + lists:foldl(fun(Rule, {Publish, Subscribe, Error}) -> + #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule, + case re:compile(Re) of + {ok, MP} -> + case Action of + publish -> + {[{Topic, MP, Dest} | Publish], Subscribe, Error}; + subscribe -> + {Publish, [{Topic, MP, Dest} | Subscribe], Error}; + all -> + {[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error} + end; + {error, ErrSpec} -> + {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]} + end end, {[], [], []}, Rules). match_and_rewrite(Topic, []) -> Topic; diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl index 54defa599..1fa5e9467 100644 --- a/apps/emqx_modules/src/emqx_rewrite_api.erl +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -16,8 +16,9 @@ -module(emqx_rewrite_api). -behaviour(minirest_api). +-include_lib("typerefl/include/types.hrl"). --export([api_spec/0]). +-export([api_spec/0, paths/0, schema/1]). -export([topic_rewrite/2]). @@ -32,33 +33,32 @@ ]). api_spec() -> - {[rewrite_api()], []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -properties() -> - properties([{action, string, <<"Action">>, [subscribe, publish, all]}, - {source_topic, string, <<"Topic">>}, - {re, string, <<"Regular expressions">>}, - {dest_topic, string, <<"Destination topic">>}]). +paths() -> + ["/mqtt/topic_rewrite"]. -rewrite_api() -> - Path = "/mqtt/topic_rewrite", - Metadata = #{ +schema("/mqtt/topic_rewrite") -> + #{ + operationId => topic_rewrite, get => #{ - description => <<"List topic rewrite">>, + tags => [mqtt], + description => <<"List rewrite topic.">>, responses => #{ - <<"200">> => object_array_schema(properties(), <<"List all rewrite rules">>) + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), + #{desc => <<"List all rewrite rules">>}) } }, put => #{ - description => <<"Update topic rewrite">>, - 'requestBody' => object_array_schema(properties()), + description => <<"Update rewrite topic">>, + requestBody => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),#{}), responses => #{ - <<"200">> =>object_array_schema(properties(), <<"Update topic rewrite success">>), - <<"413">> => error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT]) + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), + #{desc => <<"Update rewrite topic success.">>}), + 413 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Rules count exceed max limit">>) } } - }, - {Path, Metadata, topic_rewrite}. + }. topic_rewrite(get, _Params) -> {200, emqx_rewrite:list()}; diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl index e5bbb6182..4d51834c2 100644 --- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -35,6 +35,12 @@ rewrite: [ source_topic : \"y/+/z/#\" re : \"^y/(.+)/z/(.+)$\" dest_topic : \"y/z/$2\" + }, + { + action : all + source_topic : \"all/+/x/#\" + re : \"^all/(.+)/x/(.+)$\" + dest_topic : \"all/x/$2\" } ]""">>). @@ -42,56 +48,135 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([emqx_modules]), + emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_modules]). + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]). -%% Test case for emqx_mod_write -t_mod_rewrite(_Config) -> - ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), - ok = emqx_rewrite:enable(), - {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), - {ok, _} = emqtt:connect(C), - PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], - PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], +t_subscribe_rewrite(_Config) -> + {ok, Conn} = init(), SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], SubDestTopics = [<<"y/z/b">>, <<"y/def">>], - %% Sub Rules - {ok, _Props1, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), - timer:sleep(100), + {ok, _Props1, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), + timer:sleep(150), Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), - RecvTopics1 = [begin - ok = emqtt:publish(C, Topic, <<"payload">>), - {ok, #{topic := RecvTopic}} = receive_publish(100), - RecvTopic - end || Topic <- SubDestTopics], - ?assertEqual(SubDestTopics, RecvTopics1), - {ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics), + RecvTopics = [begin + ok = emqtt:publish(Conn, Topic, <<"payload">>), + {ok, #{topic := RecvTopic}} = receive_publish(100), + RecvTopic + end || Topic <- SubDestTopics], + ?assertEqual(SubDestTopics, RecvTopics), + {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics), timer:sleep(100), ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), - %% Pub Rules - {ok, _Props2, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), - RecvTopics2 = [begin - ok = emqtt:publish(C, Topic, <<"payload">>), - {ok, #{topic := RecvTopic}} = receive_publish(100), - RecvTopic - end || Topic <- PubOrigTopics], - ?assertEqual(PubDestTopics, RecvTopics2), - {ok, _, _} = emqtt:unsubscribe(C, PubDestTopics), - ok = emqtt:disconnect(C), - ok = emqx_rewrite:disable(). + terminate(Conn). + +t_publish_rewrite(_Config) -> + {ok, Conn} = init(), + PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], + PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], + {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), + RecvTopics = [begin + ok = emqtt:publish(Conn, Topic, <<"payload">>), + {ok, #{topic := RecvTopic}} = receive_publish(100), + RecvTopic + end || Topic <- PubOrigTopics], + ?assertEqual(PubDestTopics, RecvTopics), + {ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics), + terminate(Conn). t_rewrite_rule(_Config) -> - {PubRules, SubRules} = emqx_rewrite:compile(emqx:get_config([rewrite])), + {PubRules, SubRules, []} = emqx_rewrite:compile(emqx:get_config([rewrite])), ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). +t_rewrite_re_error(_Config) -> + Rules = [#{ + action => subscribe, + source_topic => "y/+/z/#", + re => "{^y/(.+)/z/(.+)$*", + dest_topic => "\"y/z/$2" + }], + Error = { + "y/+/z/#", + "{^y/(.+)/z/(.+)$*", + "\"y/z/$2", + {"nothing to repeat",16} + }, + ?assertEqual({[], [], [Error]}, emqx_rewrite:compile(Rules)), + ok. + +t_list(_Config) -> + ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), + Expect = [ + #{<<"action">> => <<"publish">>, + <<"dest_topic">> => <<"z/y/$1">>, + <<"re">> => <<"^x/y/(.+)$">>, + <<"source_topic">> => <<"x/#">>}, + #{<<"action">> => <<"subscribe">>, + <<"dest_topic">> => <<"y/z/$2">>, + <<"re">> => <<"^y/(.+)/z/(.+)$">>, + <<"source_topic">> => <<"y/+/z/#">>}, + #{<<"action">> => <<"all">>, + <<"dest_topic">> => <<"all/x/$2">>, + <<"re">> => <<"^all/(.+)/x/(.+)$">>, + <<"source_topic">> => <<"all/+/x/#">>}], + ?assertEqual(Expect, emqx_rewrite:list()), + ok. + +t_update(_Config) -> + ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), + Init = emqx_rewrite:list(), + Rules = [#{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"test/*">>, + <<"dest_topic">> => <<"test1/$2">>, + <<"action">> => <<"publish">> + }], + ok = emqx_rewrite:update(Rules), + ?assertEqual(Rules, emqx_rewrite:list()), + ok = emqx_rewrite:update(Init), + ok. + +t_update_disable(_Config) -> + ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), + ?assertEqual(ok, emqx_rewrite:update([])), + timer:sleep(150), + + Subs = emqx_hooks:lookup('client.subscribe'), + UnSubs = emqx_hooks:lookup('client.unsubscribe'), + MessagePub = emqx_hooks:lookup('message.publish'), + Filter = fun({_, {Mod, _, _}, _, _}) -> Mod =:= emqx_rewrite end, + + ?assertEqual([], lists:filter(Filter, Subs)), + ?assertEqual([], lists:filter(Filter, UnSubs)), + ?assertEqual([], lists:filter(Filter, MessagePub)), + ok. + +t_update_re_failed(_Config) -> + ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), + Rules = [#{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"*^test/*">>, + <<"dest_topic">> => <<"test1/$2">>, + <<"action">> => <<"publish">> + }], + Error = {badmatch, + {error, + {error, + {emqx_modules_schema, + [{validation_error, + #{array_index => 1,path => "rewrite.re", + reason => {<<"*^test/*">>,{"nothing to repeat",0}}, + value => <<"*^test/*">>}}]}}}}, + ?assertError(Error, emqx_rewrite:update(Rules)), + ok. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -102,3 +187,14 @@ receive_publish(Timeout) -> after Timeout -> {error, timeout} end. + +init() -> + ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE), + ok = emqx_rewrite:enable(), + {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), + {ok, _} = emqtt:connect(C), + {ok, C}. + +terminate(Conn) -> + ok = emqtt:disconnect(Conn), + ok = emqx_rewrite:disable().