%%-------------------------------------------------------------------- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rewrite). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -ifdef(TEST). -export([ compile/1, match_and_rewrite/3 ]). -endif. %% APIs -export([ rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2 ]). -export([ enable/0, disable/0 ]). -export([ list/0, update/1, post_config_update/5 ]). %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). -define(update(_Rules_), emqx_conf:update([rewrite], _Rules_, #{override_to => cluster}) ). %%------------------------------------------------------------------------------ %% Load/Unload %%------------------------------------------------------------------------------ enable() -> emqx_conf:add_handler([rewrite], ?MODULE), Rules = emqx_conf:get([rewrite], []), register_hook(Rules). disable() -> emqx_conf:remove_handler([rewrite]), unregister_hook(), ok. list() -> emqx_conf:get_raw([<<"rewrite">>], []). update(Rules0) -> case ?update(Rules0) of {ok, _} -> ok; {error, Reason} -> throw(Reason) end. post_config_update(_KeyPath, _Config, Rules, _OldConf, _AppEnvs) -> register_hook(Rules). register_hook([]) -> unregister_hook(); register_hook(Rules) -> {PubRules, SubRules, ErrRules} = compile(Rules), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, ?HP_REWRITE), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, ?HP_REWRITE), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, ?HP_REWRITE), case ErrRules of [] -> ok; _ -> ?SLOG(error, #{ msg => "rewrite_rule_re_compile_failed", error_rules => ErrRules }), {error, ErrRules} end. unregister_hook() -> emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) -> Binds = fill_client_binds(ClientInfo), {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) -> Binds = fill_client_binds(ClientInfo), {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> Binds = fill_client_binds(Message), {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. %%------------------------------------------------------------------------------ %% Telemetry %%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}. get_basic_usage_info() -> RewriteRules = list(), #{topic_rewrite_rule_count => length(RewriteRules)}. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ compile(Rules) -> lists:foldl( fun(Rule, {Publish, Subscribe, Error}) -> #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule, case re:compile(Re) of {ok, MP} -> case Action of publish -> {[{Topic, MP, Dest} | Publish], Subscribe, Error}; subscribe -> {Publish, [{Topic, MP, Dest} | Subscribe], Error}; all -> {[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error} end; {error, ErrSpec} -> {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]} end end, {[], [], []}, Rules ). match_and_rewrite(Topic, [], _) -> Topic; match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of true -> rewrite(Topic, MP, Dest, Binds); false -> match_and_rewrite(Topic, Rules, Binds) end. rewrite(SharedRecord = #share{topic = Topic}, MP, Dest, Binds) -> SharedRecord#share{topic = rewrite(Topic, MP, Dest, Binds)}; rewrite(Topic, MP, Dest, Binds) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> Vars = lists:zip( [ "\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured)) ], Captured ), iolist_to_binary( lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) end, Dest, Binds ++ Vars ) ); nomatch -> Topic end. fill_client_binds(#{clientid := ClientId, username := Username}) -> filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]); fill_client_binds(#message{from = ClientId, headers = Headers}) -> Username = maps:get(username, Headers, undefined), filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]). filter_client_binds(Binds) -> lists:filter( fun ({_, undefined}) -> false; ({_, <<"">>}) -> false; ({_, ""}) -> false; (_) -> true end, Binds ).