From 50ee840220e3d4cb03fb12d47b90d704c0f4ed49 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 17 Aug 2021 17:49:37 +0800 Subject: [PATCH] feat: add rewrite api (#5502) --- apps/emqx_modules/src/emqx_rewrite.erl | 32 ++++++++--- apps/emqx_modules/src/emqx_rewrite_api.erl | 66 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_modules/src/emqx_rewrite_api.erl diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 20e51ae16..a7e98bbea 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -35,12 +35,37 @@ , disable/0 ]). +-export([ list/0 + , update/1]). + %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- enable() -> Rules = emqx_config:get([rewrite, rules], []), + register_hook(Rules). + +disable() -> + emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), + emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), + emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). + +list() -> + maps:get(<<"rules">>, emqx_config:get_raw([<<"rewrite">>], #{}), []). + +update(Rules0) -> + Rewrite = emqx_config:get_raw([<<"rewrite">>], #{}), + {ok, Config, _} = emqx_config:update([rewrite], maps:put(<<"rules">>, Rules0, Rewrite)), + Rules = maps:get(rules, maps:get(rewrite, Config, #{}), []), + case Rules of + [] -> + disable(); + _ -> + register_hook(Rules) + end. + +register_hook(Rules) -> case Rules =:= [] of true -> ok; false -> @@ -50,11 +75,6 @@ enable() -> emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}) end. -disable() -> - emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), - emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). - rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. @@ -67,7 +87,6 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - compile(Rules) -> lists:foldl(fun(#{source_topic := Topic, re := Re, @@ -102,4 +121,3 @@ rewrite(Topic, MP, Dest) -> end, Dest, Vars)); nomatch -> Topic end. - diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl new file mode 100644 index 000000000..2be28a081 --- /dev/null +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -0,0 +1,66 @@ +-module(emqx_rewrite_api). + +-behaviour(minirest_api). + +-export([api_spec/0]). + +-export([topic_rewrite/2]). + +-define(MAX_RULES_LIMIT, 20). + +-define(EXCEED_LIMIT, 'EXCEED_LIMIT'). + +api_spec() -> + {[rewrite_api()], []}. + +topic_rewrite_schema() -> + #{ + type => object, + properties => #{ + action => #{ + type => string, + description => <<"Node">>, + enum => [subscribe, publish]}, + source_topic => #{ + type => string, + description => <<"Topic">>}, + re => #{ + type => string, + description => <<"Regular expressions">>}, + dest_topic => #{ + type => string, + description => <<"Destination topic">>} + } + }. + +rewrite_api() -> + Path = "/mqtt/topic_rewrite", + Metadata = #{ + get => #{ + description => <<"List topic rewrite">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List all rewrite rules">>, topic_rewrite_schema())}}, + post => #{ + description => <<"Update topic rewrite">>, + 'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()), + response => #{ + <<"200">> => + emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()), + <<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}}, + {Path, Metadata, topic_rewrite}. + +topic_rewrite(get, _Request) -> + {200, emqx_rewrite:list()}; + +topic_rewrite(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + case length(Params) < ?MAX_RULES_LIMIT of + true -> + ok = emqx_rewrite:update(Params), + {200, emqx_rewrite:list()}; + _ -> + Message = list_to_binary(io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])), + {413, #{code => ?EXCEED_LIMIT, message => Message}} + end.