187 lines
6.6 KiB
Erlang
187 lines
6.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mod_rewrite).
|
|
|
|
-behaviour(emqx_gen_mod).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-ifdef(TEST).
|
|
-export([ compile_rules/1
|
|
, match_and_rewrite/3
|
|
]).
|
|
-endif.
|
|
|
|
%% APIs
|
|
-export([ rewrite_subscribe/4
|
|
, rewrite_unsubscribe/4
|
|
, rewrite_publish/2
|
|
]).
|
|
|
|
%% emqx_gen_mod callbacks
|
|
-export([ load/1
|
|
, unload/1
|
|
, description/0
|
|
]).
|
|
|
|
-type(topic() :: binary()).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Load/Unload
|
|
%%--------------------------------------------------------------------
|
|
|
|
load(RawRules) ->
|
|
{PubRules, SubRules} = compile_rules(RawRules),
|
|
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
|
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
|
|
|
rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
|
|
Binds = fill_client_binds(ClientInfo),
|
|
{ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
|
|
|
|
rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
|
|
Binds = fill_client_binds(ClientInfo),
|
|
{ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
|
|
|
|
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
|
Binds = fill_client_binds(Message),
|
|
{ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
|
|
|
|
unload(_) ->
|
|
?LOG(info, "[Rewrite] Unload"),
|
|
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
|
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
|
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
|
|
|
description() ->
|
|
"EMQX Topic Rewrite Module".
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
compile_rules(RawRules) ->
|
|
compile(validate_rules(RawRules)).
|
|
|
|
compile({PubRules, SubRules}) ->
|
|
CompileRE =
|
|
fun({rewrite, RewriteFrom, Re, RewriteTo}) ->
|
|
{ok, MP} = re:compile(Re),
|
|
{rewrite, RewriteFrom, MP, RewriteTo}
|
|
end,
|
|
{lists:map(CompileRE, PubRules), lists:map(CompileRE, SubRules)}.
|
|
|
|
validate_rules(Rules) ->
|
|
PubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
|
{rewrite, pub, RewriteFrom, Re, RewriteTo} <- Rules,
|
|
validate_rule(pub, RewriteFrom, RewriteTo)
|
|
],
|
|
SubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
|
{rewrite, sub, RewriteFrom, Re, RewriteTo} <- Rules,
|
|
validate_rule(sub, RewriteFrom, RewriteTo)
|
|
],
|
|
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
|
|
[erlang:length(PubRules), erlang:length(SubRules)]),
|
|
log_rules(pub, PubRules),
|
|
log_rules(sub, SubRules),
|
|
{PubRules, SubRules}.
|
|
|
|
validate_rule(Type, RewriteFrom, RewriteTo) ->
|
|
case validate_topic(filter, RewriteFrom) of
|
|
ok ->
|
|
case validate_topic(dest_topic_type(Type), RewriteTo) of
|
|
ok ->
|
|
true;
|
|
{error, Reason} ->
|
|
log_invalid_rule(to, Type, RewriteTo, Reason),
|
|
false
|
|
end;
|
|
{error, Reason} ->
|
|
log_invalid_rule(from, Type, RewriteFrom, Reason),
|
|
false
|
|
end.
|
|
|
|
log_invalid_rule(Direction, Type, Topic, Reason) ->
|
|
?LOG(warning, "Invalid rewrite ~p rule for rewrite ~p topic '~ts' discarded. Reason: ~p",
|
|
[type_to_name(Type), Direction, Topic, Reason]).
|
|
|
|
log_rules(Type, Rules) ->
|
|
do_log_rules(Type, Rules, 1).
|
|
|
|
do_log_rules(_Type, [], _Index) -> ok;
|
|
do_log_rules(Type, [{_, Topic, Re, Dest} | Rules], Index) ->
|
|
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
|
|
[Type, Index, Topic, Re, Dest]),
|
|
do_log_rules(Type, Rules, Index + 1).
|
|
|
|
match_and_rewrite(Topic, [], _) ->
|
|
Topic;
|
|
|
|
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) ->
|
|
case emqx_topic:match(Topic, Filter) of
|
|
true -> rewrite(Topic, MP, Dest, Binds);
|
|
false -> match_and_rewrite(Topic, Rules, Binds)
|
|
end.
|
|
|
|
rewrite(Topic, MP, Dest, Binds) ->
|
|
NewTopic =
|
|
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
|
{match, Captured} ->
|
|
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
|
|| I <- lists:seq(1, length(Captured))], Captured),
|
|
iolist_to_binary(lists:foldl(
|
|
fun({Var, Val}, Acc) ->
|
|
re:replace(Acc, Var, Val, [global])
|
|
end, Dest, Binds ++ Vars));
|
|
nomatch -> Topic
|
|
end,
|
|
?LOG(debug, "[Rewrite] topic ~0p, params: ~0p dest topic: ~p", [Topic, Binds, NewTopic]),
|
|
NewTopic.
|
|
|
|
fill_client_binds(#{clientid := ClientId, username := Username}) ->
|
|
filter_client_binds([{"%c", ClientId}, {"%u", Username}]);
|
|
|
|
fill_client_binds(#message{from = ClientId, headers = Headers}) ->
|
|
Username = maps:get(username, Headers, undefined),
|
|
filter_client_binds([{"%c", ClientId}, {"%u", Username}]).
|
|
|
|
filter_client_binds(Binds) ->
|
|
lists:filter(fun({_, undefined}) -> false;
|
|
({_, <<"">>}) -> false;
|
|
({_, ""}) -> false;
|
|
(_) -> true
|
|
end,
|
|
Binds).
|
|
|
|
type_to_name(pub) -> 'PUBLISH';
|
|
type_to_name(sub) -> 'SUBSCRIBE'.
|
|
|
|
dest_topic_type(pub) -> name;
|
|
dest_topic_type(sub) -> filter.
|
|
|
|
-spec(validate_topic(name | filter, topic()) -> ok | {error, term()}).
|
|
validate_topic(Type, Topic) ->
|
|
try
|
|
true = emqx_topic:validate(Type, Topic),
|
|
ok
|
|
catch
|
|
error:Reason ->
|
|
{error, Reason}
|
|
end.
|