feat: add rewrite api (#5502)
This commit is contained in:
parent
7fcd925eff
commit
50ee840220
|
@ -35,12 +35,37 @@
|
||||||
, disable/0
|
, disable/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ list/0
|
||||||
|
, update/1]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
enable() ->
|
enable() ->
|
||||||
Rules = emqx_config:get([rewrite, rules], []),
|
Rules = emqx_config:get([rewrite, rules], []),
|
||||||
|
register_hook(Rules).
|
||||||
|
|
||||||
|
disable() ->
|
||||||
|
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
||||||
|
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
||||||
|
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
||||||
|
|
||||||
|
list() ->
|
||||||
|
maps:get(<<"rules">>, emqx_config:get_raw([<<"rewrite">>], #{}), []).
|
||||||
|
|
||||||
|
update(Rules0) ->
|
||||||
|
Rewrite = emqx_config:get_raw([<<"rewrite">>], #{}),
|
||||||
|
{ok, Config, _} = emqx_config:update([rewrite], maps:put(<<"rules">>, Rules0, Rewrite)),
|
||||||
|
Rules = maps:get(rules, maps:get(rewrite, Config, #{}), []),
|
||||||
|
case Rules of
|
||||||
|
[] ->
|
||||||
|
disable();
|
||||||
|
_ ->
|
||||||
|
register_hook(Rules)
|
||||||
|
end.
|
||||||
|
|
||||||
|
register_hook(Rules) ->
|
||||||
case Rules =:= [] of
|
case Rules =:= [] of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false ->
|
false ->
|
||||||
|
@ -50,11 +75,6 @@ enable() ->
|
||||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]})
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
disable() ->
|
|
||||||
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
|
||||||
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
|
||||||
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
|
||||||
|
|
||||||
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
|
||||||
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
|
||||||
|
|
||||||
|
@ -67,7 +87,6 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
compile(Rules) ->
|
compile(Rules) ->
|
||||||
lists:foldl(fun(#{source_topic := Topic,
|
lists:foldl(fun(#{source_topic := Topic,
|
||||||
re := Re,
|
re := Re,
|
||||||
|
@ -102,4 +121,3 @@ rewrite(Topic, MP, Dest) ->
|
||||||
end, Dest, Vars));
|
end, Dest, Vars));
|
||||||
nomatch -> Topic
|
nomatch -> Topic
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
-module(emqx_rewrite_api).
|
||||||
|
|
||||||
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
-export([api_spec/0]).
|
||||||
|
|
||||||
|
-export([topic_rewrite/2]).
|
||||||
|
|
||||||
|
-define(MAX_RULES_LIMIT, 20).
|
||||||
|
|
||||||
|
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
||||||
|
|
||||||
|
api_spec() ->
|
||||||
|
{[rewrite_api()], []}.
|
||||||
|
|
||||||
|
topic_rewrite_schema() ->
|
||||||
|
#{
|
||||||
|
type => object,
|
||||||
|
properties => #{
|
||||||
|
action => #{
|
||||||
|
type => string,
|
||||||
|
description => <<"Node">>,
|
||||||
|
enum => [subscribe, publish]},
|
||||||
|
source_topic => #{
|
||||||
|
type => string,
|
||||||
|
description => <<"Topic">>},
|
||||||
|
re => #{
|
||||||
|
type => string,
|
||||||
|
description => <<"Regular expressions">>},
|
||||||
|
dest_topic => #{
|
||||||
|
type => string,
|
||||||
|
description => <<"Destination topic">>}
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
rewrite_api() ->
|
||||||
|
Path = "/mqtt/topic_rewrite",
|
||||||
|
Metadata = #{
|
||||||
|
get => #{
|
||||||
|
description => <<"List topic rewrite">>,
|
||||||
|
responses => #{
|
||||||
|
<<"200">> =>
|
||||||
|
emqx_mgmt_util:response_array_schema(<<"List all rewrite rules">>, topic_rewrite_schema())}},
|
||||||
|
post => #{
|
||||||
|
description => <<"Update topic rewrite">>,
|
||||||
|
'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()),
|
||||||
|
response => #{
|
||||||
|
<<"200">> =>
|
||||||
|
emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()),
|
||||||
|
<<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}},
|
||||||
|
{Path, Metadata, topic_rewrite}.
|
||||||
|
|
||||||
|
topic_rewrite(get, _Request) ->
|
||||||
|
{200, emqx_rewrite:list()};
|
||||||
|
|
||||||
|
topic_rewrite(post, Request) ->
|
||||||
|
{ok, Body, _} = cowboy_req:read_body(Request),
|
||||||
|
Params = emqx_json:decode(Body, [return_maps]),
|
||||||
|
case length(Params) < ?MAX_RULES_LIMIT of
|
||||||
|
true ->
|
||||||
|
ok = emqx_rewrite:update(Params),
|
||||||
|
{200, emqx_rewrite:list()};
|
||||||
|
_ ->
|
||||||
|
Message = list_to_binary(io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])),
|
||||||
|
{413, #{code => ?EXCEED_LIMIT, message => Message}}
|
||||||
|
end.
|
Loading…
Reference in New Issue