rewrite
This commit is contained in:
parent
c2d4a60dec
commit
46545be9d0
|
@ -29,6 +29,8 @@
|
|||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include_lib("emqtt/include/emqtt.hrl").
|
||||
|
||||
-behaviour(emqttd_gen_mod).
|
||||
|
||||
-export([load/1, reload/1, unload/1]).
|
||||
|
@ -50,20 +52,33 @@ load(Opts) ->
|
|||
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
|
||||
{?MODULE, rewrite_publish, [publish, Sections]}).
|
||||
|
||||
rewrite(TopicTable, [subscribe, _Sections]) ->
|
||||
lager:info("Rewrite Subscribe: ~p", [TopicTable]),
|
||||
TopicTable;
|
||||
rewrite(TopicTable, [subscribe, Sections]) ->
|
||||
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
|
||||
|
||||
rewrite(Topics, [unsubscribe, _Sections]) ->
|
||||
lager:info("Rewrite Unsubscribe: ~p", [Topics]),
|
||||
Topics;
|
||||
rewrite(Topics, [unsubscribe, Sections]) ->
|
||||
[match_topic(Topic, Sections) || Topic <- Topics];
|
||||
|
||||
rewrite(Message, [publish, _Sections]) ->
|
||||
Message.
|
||||
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
|
||||
%%TODO: this will not work if the client is always online.
|
||||
RewriteTopic =
|
||||
case get({rewrite, Topic}) of
|
||||
undefined ->
|
||||
DestTopic = match_topic(Topic, Sections),
|
||||
put({rewrite, Topic}, DestTopic), DestTopic;
|
||||
DestTopic ->
|
||||
DestTopic
|
||||
end,
|
||||
Message#mqtt_message{topic = RewriteTopic}.
|
||||
|
||||
reload(File) ->
|
||||
%%TODO: The unload api is not right...
|
||||
unload(state), load([{file, File}]).
|
||||
case emqttd:is_mod_enabled(rewrite) of
|
||||
true ->
|
||||
unload(state),
|
||||
load([{file, File}]);
|
||||
false ->
|
||||
{error, module_unloaded}
|
||||
end.
|
||||
|
||||
unload(_) ->
|
||||
emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
|
||||
|
@ -81,3 +96,27 @@ compile(Sections) ->
|
|||
end,
|
||||
[{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections].
|
||||
|
||||
match_topic(Topic, []) ->
|
||||
Topic;
|
||||
match_topic(Topic, [{topic, Filter, Rules}|Sections]) ->
|
||||
case emqtt_topic:match(Topic, Filter) of
|
||||
true ->
|
||||
match_rule(Topic, Rules);
|
||||
false ->
|
||||
match_topic(Topic, Sections)
|
||||
end.
|
||||
|
||||
match_rule(Topic, []) ->
|
||||
Topic;
|
||||
match_rule(Topic, [{rewrite, MP, Dest}|Rules]) ->
|
||||
case re:run(Topic, MP, [{captrue, all_but_first, list}]) of
|
||||
{match, Captured} ->
|
||||
%%TODO: stupid??? how to replace $1, $2?
|
||||
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
|
||||
iolist_to_binary(lists:foldl(
|
||||
fun({Var, Val}, Acc) ->
|
||||
re:replace(Acc, Var, Val, [global])
|
||||
end, Dest, Vars));
|
||||
nomatch ->
|
||||
match_rule(Topic, Rules)
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue