Rewrite http api with hocon schema (#5980)

* feat: rewrite http api with hocon

* fix: crash when default_username is empty

* chore: udpate rewrite api with emqx_conf's cluster_rpc

* fix: spec wrong
This commit is contained in:
zhongwencool 2021-10-22 17:01:29 +08:00 committed by GitHub
parent f817ba0075
commit 90795a6f42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 203 additions and 89 deletions

View File

@ -18,7 +18,7 @@
-compile({no_auto_import, [get/1, get/2]}).
-export([add_handler/2, remove_handler/1]).
-export([get/1, get/2, get_all/1]).
-export([get/1, get/2, get_raw/2, get_all/1]).
-export([get_by_node/2, get_by_node/3]).
-export([update/3, update/4]).
-export([remove/2, remove/3]).
@ -46,6 +46,10 @@ get(KeyPath) ->
get(KeyPath, Default) ->
emqx:get_config(KeyPath, Default).
-spec get_raw(emqx_map_lib:config_key_path(), term()) -> term().
get_raw(KeyPath, Default) ->
emqx_config:get_raw(KeyPath, Default).
%% @doc Returns all values in the cluster.
-spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}.
get_all(KeyPath) ->
@ -72,7 +76,7 @@ get_node_and_config(KeyPath) ->
{node(), emqx:get_config(KeyPath, config_not_found)}.
%% @doc Update all value of key path in cluster-override.conf or local-override.conf.
-spec update(emqx_map_lib:config_key_path(), emqx_config:update_args(),
-spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) ->
@ -81,7 +85,7 @@ update(KeyPath, UpdateReq, Opts0) ->
Res.
%% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_args(),
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() ->

View File

@ -208,7 +208,7 @@ binenv(Key) ->
iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")).
add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) ->
igonre;
ok;
add_default_user(Username, Password) ->
case lookup_user(Username) of

View File

@ -36,7 +36,7 @@ maybe_enable_modules() ->
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
emqx_event_message:enable(),
emqx_rewrite:enable(),
ok = emqx_rewrite:enable(),
emqx_topic_metrics:enable().
maybe_disable_modules() ->

View File

@ -43,12 +43,13 @@ fields("delayed") ->
];
fields("rewrite") ->
[ {action, hoconsc:enum([publish, subscribe, all])}
, {source_topic, sc(binary(), #{})}
, {re, sc(binary(), #{})}
, {dest_topic, sc(binary(), #{})}
[ {action, sc(hoconsc:enum([subscribe, publish, all]), #{desc => "Action", example => publish})}
, {source_topic, sc(binary(), #{desc => "Origin Topic", example => "x/#"})}
, {dest_topic, sc(binary(), #{desc => "Destination Topic", example => "z/y/$1"})}
, {re, fun regular_expression/1 }
];
fields("event_message") ->
[ {"$event/client_connected", sc(boolean(), #{default => false})}
, {"$event/client_disconnected", sc(boolean(), #{default => false})}
@ -62,6 +63,18 @@ fields("event_message") ->
fields("topic_metrics") ->
[{topic, sc(binary(), #{})}].
regular_expression(type) -> binary();
regular_expression(desc) -> "Regular expressions";
regular_expression(example) -> "^x/y/(.+)$";
regular_expression(validator) -> fun is_re/1;
regular_expression(_) -> undefined.
is_re(Bin) ->
case re:compile(Bin) of
{ok, _} -> ok;
{error, Reason} -> {error, {Bin, Reason}}
end.
array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -17,6 +17,7 @@
-module(emqx_rewrite).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-ifdef(TEST).
@ -49,28 +50,27 @@ enable() ->
disable() ->
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}),
ok.
list() ->
emqx:get_raw_config([<<"rewrite">>], []).
emqx_conf:get_raw([<<"rewrite">>], []).
update(Rules0) ->
{ok, #{config := Rules}} = emqx:update_config([rewrite], Rules0),
case Rules of
[] ->
disable();
_ ->
register_hook(Rules)
end.
{ok, #{config := Rules}} = emqx_conf:update([rewrite], Rules0, #{override_to => cluster}),
register_hook(Rules).
register_hook([]) -> disable();
register_hook(Rules) ->
case Rules =:= [] of
true -> ok;
false ->
{PubRules, SubRules} = compile(Rules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]})
{PubRules, SubRules, ErrRules} = compile(Rules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}),
case ErrRules of
[] -> ok;
_ ->
?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}),
{error, ErrRules}
end.
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
@ -86,20 +86,21 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
%% Internal functions
%%--------------------------------------------------------------------
compile(Rules) ->
lists:foldl(fun(#{source_topic := Topic,
re := Re,
dest_topic := Dest,
action := Action}, {Acc1, Acc2}) ->
{ok, MP} = re:compile(Re),
case Action of
publish ->
{[{Topic, MP, Dest} | Acc1], Acc2};
subscribe ->
{Acc1, [{Topic, MP, Dest} | Acc2]};
all ->
{[{Topic, MP, Dest} | Acc1], [{Topic, MP, Dest} | Acc2]}
end
end, {[], []}, Rules).
lists:foldl(fun(Rule, {Publish, Subscribe, Error}) ->
#{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule,
case re:compile(Re) of
{ok, MP} ->
case Action of
publish ->
{[{Topic, MP, Dest} | Publish], Subscribe, Error};
subscribe ->
{Publish, [{Topic, MP, Dest} | Subscribe], Error};
all ->
{[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error}
end;
{error, ErrSpec} ->
{Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]}
end end, {[], [], []}, Rules).
match_and_rewrite(Topic, []) ->
Topic;

View File

@ -16,8 +16,9 @@
-module(emqx_rewrite_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-export([api_spec/0]).
-export([api_spec/0, paths/0, schema/1]).
-export([topic_rewrite/2]).
@ -32,33 +33,32 @@
]).
api_spec() ->
{[rewrite_api()], []}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
properties() ->
properties([{action, string, <<"Action">>, [subscribe, publish, all]},
{source_topic, string, <<"Topic">>},
{re, string, <<"Regular expressions">>},
{dest_topic, string, <<"Destination topic">>}]).
paths() ->
["/mqtt/topic_rewrite"].
rewrite_api() ->
Path = "/mqtt/topic_rewrite",
Metadata = #{
schema("/mqtt/topic_rewrite") ->
#{
operationId => topic_rewrite,
get => #{
description => <<"List topic rewrite">>,
tags => [mqtt],
description => <<"List rewrite topic.">>,
responses => #{
<<"200">> => object_array_schema(properties(), <<"List all rewrite rules">>)
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
#{desc => <<"List all rewrite rules">>})
}
},
put => #{
description => <<"Update topic rewrite">>,
'requestBody' => object_array_schema(properties()),
description => <<"Update rewrite topic">>,
requestBody => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),#{}),
responses => #{
<<"200">> =>object_array_schema(properties(), <<"Update topic rewrite success">>),
<<"413">> => error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
#{desc => <<"Update rewrite topic success.">>}),
413 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Rules count exceed max limit">>)
}
}
},
{Path, Metadata, topic_rewrite}.
}.
topic_rewrite(get, _Params) ->
{200, emqx_rewrite:list()};

View File

@ -35,6 +35,12 @@ rewrite: [
source_topic : \"y/+/z/#\"
re : \"^y/(.+)/z/(.+)$\"
dest_topic : \"y/z/$2\"
},
{
action : all
source_topic : \"all/+/x/#\"
re : \"^all/(.+)/x/(.+)$\"
dest_topic : \"all/x/$2\"
}
]""">>).
@ -42,56 +48,135 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([emqx_modules]),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_modules]).
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]).
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
ok = emqx_rewrite:enable(),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
t_subscribe_rewrite(_Config) ->
{ok, Conn} = init(),
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
SubDestTopics = [<<"y/z/b">>, <<"y/def">>],
%% Sub Rules
{ok, _Props1, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
timer:sleep(100),
{ok, _Props1, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
timer:sleep(150),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
RecvTopics1 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- SubDestTopics],
?assertEqual(SubDestTopics, RecvTopics1),
{ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics),
RecvTopics = [begin
ok = emqtt:publish(Conn, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- SubDestTopics],
?assertEqual(SubDestTopics, RecvTopics),
{ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics),
timer:sleep(100),
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
%% Pub Rules
{ok, _Props2, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
RecvTopics2 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- PubOrigTopics],
?assertEqual(PubDestTopics, RecvTopics2),
{ok, _, _} = emqtt:unsubscribe(C, PubDestTopics),
ok = emqtt:disconnect(C),
ok = emqx_rewrite:disable().
terminate(Conn).
t_publish_rewrite(_Config) ->
{ok, Conn} = init(),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
{ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
RecvTopics = [begin
ok = emqtt:publish(Conn, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- PubOrigTopics],
?assertEqual(PubDestTopics, RecvTopics),
{ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics),
terminate(Conn).
t_rewrite_rule(_Config) ->
{PubRules, SubRules} = emqx_rewrite:compile(emqx:get_config([rewrite])),
{PubRules, SubRules, []} = emqx_rewrite:compile(emqx:get_config([rewrite])),
?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
t_rewrite_re_error(_Config) ->
Rules = [#{
action => subscribe,
source_topic => "y/+/z/#",
re => "{^y/(.+)/z/(.+)$*",
dest_topic => "\"y/z/$2"
}],
Error = {
"y/+/z/#",
"{^y/(.+)/z/(.+)$*",
"\"y/z/$2",
{"nothing to repeat",16}
},
?assertEqual({[], [], [Error]}, emqx_rewrite:compile(Rules)),
ok.
t_list(_Config) ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
Expect = [
#{<<"action">> => <<"publish">>,
<<"dest_topic">> => <<"z/y/$1">>,
<<"re">> => <<"^x/y/(.+)$">>,
<<"source_topic">> => <<"x/#">>},
#{<<"action">> => <<"subscribe">>,
<<"dest_topic">> => <<"y/z/$2">>,
<<"re">> => <<"^y/(.+)/z/(.+)$">>,
<<"source_topic">> => <<"y/+/z/#">>},
#{<<"action">> => <<"all">>,
<<"dest_topic">> => <<"all/x/$2">>,
<<"re">> => <<"^all/(.+)/x/(.+)$">>,
<<"source_topic">> => <<"all/+/x/#">>}],
?assertEqual(Expect, emqx_rewrite:list()),
ok.
t_update(_Config) ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
Init = emqx_rewrite:list(),
Rules = [#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"test/*">>,
<<"dest_topic">> => <<"test1/$2">>,
<<"action">> => <<"publish">>
}],
ok = emqx_rewrite:update(Rules),
?assertEqual(Rules, emqx_rewrite:list()),
ok = emqx_rewrite:update(Init),
ok.
t_update_disable(_Config) ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
?assertEqual(ok, emqx_rewrite:update([])),
timer:sleep(150),
Subs = emqx_hooks:lookup('client.subscribe'),
UnSubs = emqx_hooks:lookup('client.unsubscribe'),
MessagePub = emqx_hooks:lookup('message.publish'),
Filter = fun({_, {Mod, _, _}, _, _}) -> Mod =:= emqx_rewrite end,
?assertEqual([], lists:filter(Filter, Subs)),
?assertEqual([], lists:filter(Filter, UnSubs)),
?assertEqual([], lists:filter(Filter, MessagePub)),
ok.
t_update_re_failed(_Config) ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
Rules = [#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"*^test/*">>,
<<"dest_topic">> => <<"test1/$2">>,
<<"action">> => <<"publish">>
}],
Error = {badmatch,
{error,
{error,
{emqx_modules_schema,
[{validation_error,
#{array_index => 1,path => "rewrite.re",
reason => {<<"*^test/*">>,{"nothing to repeat",0}},
value => <<"*^test/*">>}}]}}}},
?assertError(Error, emqx_rewrite:update(Rules)),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -102,3 +187,14 @@ receive_publish(Timeout) ->
after
Timeout -> {error, timeout}
end.
init() ->
ok = emqx_config:init_load(emqx_modules_schema, ?REWRITE),
ok = emqx_rewrite:enable(),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
{ok, C}.
terminate(Conn) ->
ok = emqtt:disconnect(Conn),
ok = emqx_rewrite:disable().