emqx/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl

104 lines
3.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_rewrite).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-ifdef(TEST).
-export([ compile/1
, match_and_rewrite/2
]).
-endif.
%% APIs
-export([ rewrite_subscribe/4
, rewrite_unsubscribe/4
, rewrite_publish/2
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
unload(_) ->
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
description() ->
"EMQ X Topic Rewrite Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
compile(Rules) ->
PubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
SubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
{PubRules, SubRules}.
match_and_rewrite(Topic, []) ->
Topic;
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
case emqx_topic:match(Topic, Filter) of
true -> rewrite(Topic, MP, Dest);
false -> match_and_rewrite(Topic, Rules)
end.
rewrite(Topic, MP, Dest) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} ->
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|| I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch -> Topic
end.