diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index bba664af1..4131afcf6 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -29,6 +29,8 @@ -author("Feng Lee "). +-include_lib("emqtt/include/emqtt.hrl"). + -behaviour(emqttd_gen_mod). -export([load/1, reload/1, unload/1]). @@ -50,20 +52,33 @@ load(Opts) -> emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, {?MODULE, rewrite_publish, [publish, Sections]}). -rewrite(TopicTable, [subscribe, _Sections]) -> - lager:info("Rewrite Subscribe: ~p", [TopicTable]), - TopicTable; +rewrite(TopicTable, [subscribe, Sections]) -> + [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; -rewrite(Topics, [unsubscribe, _Sections]) -> - lager:info("Rewrite Unsubscribe: ~p", [Topics]), - Topics; +rewrite(Topics, [unsubscribe, Sections]) -> + [match_topic(Topic, Sections) || Topic <- Topics]; -rewrite(Message, [publish, _Sections]) -> - Message. +rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> + %%TODO: this will not work if the client is always online. + RewriteTopic = + case get({rewrite, Topic}) of + undefined -> + DestTopic = match_topic(Topic, Sections), + put({rewrite, Topic}, DestTopic), DestTopic; + DestTopic -> + DestTopic + end, + Message#mqtt_message{topic = RewriteTopic}. reload(File) -> %%TODO: The unload api is not right... - unload(state), load([{file, File}]). + case emqttd:is_mod_enabled(rewrite) of + true -> + unload(state), + load([{file, File}]); + false -> + {error, module_unloaded} + end. unload(_) -> emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), @@ -81,3 +96,27 @@ compile(Sections) -> end, [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. +match_topic(Topic, []) -> + Topic; +match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> + case emqtt_topic:match(Topic, Filter) of + true -> + match_rule(Topic, Rules); + false -> + match_topic(Topic, Sections) + end. + +match_rule(Topic, []) -> + Topic; +match_rule(Topic, [{rewrite, MP, Dest}|Rules]) -> + case re:run(Topic, MP, [{captrue, all_but_first, list}]) of + {match, Captured} -> + %%TODO: stupid??? how to replace $1, $2? + Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + iolist_to_binary(lists:foldl( + fun({Var, Val}, Acc) -> + re:replace(Acc, Var, Val, [global]) + end, Dest, Vars)); + nomatch -> + match_rule(Topic, Rules) + end.