mod rewrite

This commit is contained in:
Feng Lee 2015-05-24 18:33:53 +08:00
parent 74024acd01
commit d19805b68c
6 changed files with 111 additions and 48 deletions

View File

@ -43,7 +43,7 @@
-export([subscribe/1, notify/2]). -export([subscribe/1, notify/2]).
%% Hook API %% Hook API
-export([hook/2, unhook/2, run_hooks/2]). -export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
%% Broker API %% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
@ -130,18 +130,48 @@ datetime() ->
io_lib:format( io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
hook(Name, MFArgs) -> %%------------------------------------------------------------------------------
gen_server:call(?MODULE, {hook, Name, MFArgs}). %% @doc Hook
%% @end
%%------------------------------------------------------------------------------
-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
hook(Hook, Name, MFA) ->
gen_server:call(?MODULE, {hook, Hook, Name, MFA}).
unhook(Name, MF) -> %%------------------------------------------------------------------------------
gen_server:call(?MODULE, {unhook, Name, MF}). %% @doc Unhook
%% @end
%%------------------------------------------------------------------------------
-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
unhook(Hook, Name) ->
gen_server:call(?MODULE, {unhook, Hook, Name}).
run_hooks(Name, Args) -> %%------------------------------------------------------------------------------
case ets:lookup(?BROKER_TAB, {hook, Name}) of %% @doc Foreach hooks
[{_, Hooks}] -> %% @end
lists:foreach(fun({M, F, A}) -> %%------------------------------------------------------------------------------
-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
foreach_hooks(Hook, Args) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foreach(fun({_Name, {M, F, A}}) ->
apply(M, F, Args++A) apply(M, F, Args++A)
end, Hooks); end, Hooks);
[] ->
ok
end.
%%------------------------------------------------------------------------------
%% @doc Foldl hooks
%% @end
%%------------------------------------------------------------------------------
-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
foldl_hooks(Hook, Args, Acc0) ->
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
[{_, Hooks}] ->
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
apply(M, F, [Acc, Args++A])
end, Acc0, Hooks);
[] -> [] ->
ok ok
end. end.
@ -182,33 +212,33 @@ init([]) ->
handle_call(uptime, _From, State) -> handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
handle_call({hook, Name, MFArgs}, _From, State) -> handle_call({hook, Hook, Name, MFArgs}, _From, State) ->
Key = {hook, Name}, Reply = Key = {hook, Hook}, Reply =
case ets:lookup(?BROKER_TAB, Key) of case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] -> [{Key, Hooks}] ->
case lists:member(MFArgs, Hooks) of case lists:keyfind(Name, 1, Hooks) of
true -> {Name, _MFArgs} ->
{error, existed}; {error, existed};
false -> false ->
ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]}) ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]})
end; end;
[] -> [] ->
ets:insert(?BROKER_TAB, {Key, [MFArgs]}) ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]})
end, end,
{reply, Reply, State}; {reply, Reply, State};
handle_call({unhook, Name, MFArgs}, _From, State) -> handle_call({unhook, Name}, _From, State) ->
Key = {hook, Name}, Reply = Key = {hook, Name}, Reply =
case ets:lookup(?BROKER_TAB, Key) of case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] -> [{Key, Hooks}] ->
ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])}); ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)});
[] -> [] ->
{error, not_found} {error, not_found}
end, end,
{reply, Reply, State}; {reply, Reply, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, error, State}. {reply, {error, unsupport_request}, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
@ -233,15 +263,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
remove_hook(_Hook, [], Acc) ->
lists:reverse(Acc);
remove_hook(Hook, [Hook|Hooks], Acc) ->
remove_hook(Hook, Hooks, Acc);
remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) ->
remove_hook(Hook, Hooks, Acc);
remove_hook(Hook, [H|Hooks], Acc) ->
remove_hook(Hook, Hooks, [H|Acc]).
create_topic(Topic) -> create_topic(Topic) ->
emqttd_pubsub:create(emqtt_topic:systop(Topic)). emqttd_pubsub:create(emqtt_topic:systop(Topic)).

View File

@ -37,7 +37,8 @@
load(Opts) -> load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}), emqttd_broker:hook(client_connected, {?MODULE, subscribe},
{?MODULE, subscribe, [Topics]}),
{ok, #state{topics = Topics}}. {ok, #state{topics = Topics}}.
subscribe({Client, ClientId}, Topics) -> subscribe({Client, ClientId}, Topics) ->
@ -47,4 +48,3 @@ subscribe({Client, ClientId}, Topics) ->
unload(_Opts) -> unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). emqttd_broker:unhook(client_connected, {?MODULE, subscribe}).

View File

@ -31,18 +31,52 @@
-behaviour(emqttd_gen_mod). -behaviour(emqttd_gen_mod).
-export([load/1, rewrite/1, unload/1]). -export([load/1, reload/1, unload/1]).
-export([rewrite/2]).
%%%=============================================================================
%%% API
%%%=============================================================================
load(Opts) -> load(Opts) ->
ok. File = proplists:get_value(file, Opts),
Sections = compile(file:consult(File)),
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}),
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
{?MODULE, rewrite_publish, [publish, Sections]}).
rewrite(Topic) -> rewrite(TopicTable, [subscribe, _Sections]) ->
Topic. lager:info("Rewrite Subscribe: ~p", [TopicTable]),
TopicTable;
reload(Opts) -> rewrite(Topics, [unsubscribe, _Sections]) ->
ok. lager:info("Rewrite Unsubscribe: ~p", [Topics]),
Topics;
rewrite(Message, [publish, _Sections]) ->
Message.
reload(File) ->
%%TODO: The unload api is not right...
unload(state), load([{file, File}]).
unload(_Opts) -> unload(_) ->
ok. emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}).
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
compile(Sections) ->
C = fun({rewrite, Re, Dest}) ->
{ok, MP} = re:compile(Re),
{rewrite, MP, Dest}
end,
[{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections].

View File

@ -212,8 +212,9 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State}; {ok, State};
false -> false ->
TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
%%TODO: GrantedQos should be renamed. %%TODO: GrantedQos should be renamed.
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
end; end;
@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State); send(?UNSUBACK_PACKET(PacketId), State);
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1),
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
handle(?PACKET(?PINGREQ), State) -> handle(?PACKET(?PINGREQ), State) ->

View File

@ -46,14 +46,14 @@
%% Authentication with username, password %% Authentication with username, password
%{username, []}, %{username, []},
%% Authentication with clientid %% Authentication with clientid
%{clientid, [{password, no}, {file, "etc/clients.config"}]}, %{clientid, [{password, no}, {file, "etc/conf/clients.config"}]},
%% Allow all %% Allow all
{anonymous, []} {anonymous, []}
]}, ]},
%% ACL config %% ACL config
{acl, [ {acl, [
%% Internal ACL module %% Internal ACL module
{internal, [{file, "etc/acl.config"}, {nomatch, allow}]} {internal, [{file, "etc/conf/acl.config"}, {nomatch, allow}]}
]} ]}
]}, ]},
%% MQTT Protocol Options %% MQTT Protocol Options
@ -109,7 +109,10 @@
%% Modules %% Modules
{modules, [ {modules, [
%% Subscribe topics automatically when client connected %% Subscribe topics automatically when client connected
{autosub, [{"$Q/client/$c", 0}]} {autosub, [{"$Q/client/$c", 0}]},
%% Rewrite rules
{rewrite, [{file, "etc/conf/rewrite.config"}]}
]}, ]},
%% Listeners %% Listeners
{listeners, [ {listeners, [
@ -137,8 +140,8 @@
%% Socket Access Control %% Socket Access Control
{access, [{allow, all}]}, {access, [{allow, all}]},
%% SSL certificate and key files %% SSL certificate and key files
{ssl, [{certfile, "etc/ssl.crt"}, {ssl, [{certfile, "etc/ssl/ssl.crt"},
{keyfile, "etc/ssl.key"}]}, {keyfile, "etc/ssl/ssl.key"}]},
%% Socket Options %% Socket Options
{sockopts, [ {sockopts, [
{backlog, 1024} {backlog, 1024}

View File

@ -63,6 +63,8 @@
{overlay, [ {overlay, [
{mkdir, "log/"}, {mkdir, "log/"},
{mkdir, "etc/"}, {mkdir, "etc/"},
{mkdir, "etc/ssl/"},
{mkdir, "etc/conf/"},
{mkdir, "data/"}, {mkdir, "data/"},
{mkdir, "plugins/"}, {mkdir, "plugins/"},
{copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"},
@ -72,11 +74,12 @@
{template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {template, "files/emqttd.cmd", "bin/emqttd.cmd"},
{copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"},
{copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"},
{copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"},
{copy, "files/ssl/ssl.key", "etc/ssl.key"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"},
{template, "files/emqttd.config", "etc/emqttd.config"}, {template, "files/emqttd.config", "etc/emqttd.config"},
{template, "files/acl.config", "etc/acl.config"}, {template, "files/acl.config", "etc/conf/acl.config"},
{template, "files/clients.config", "etc/clients.config"}, {template, "files/rewrite.config", "etc/conf/rewrite.config"},
{template, "files/clients.config", "etc/conf/clients.config"},
{template, "files/plugins.config", "etc/plugins.config"}, {template, "files/plugins.config", "etc/plugins.config"},
{template, "files/vm.args", "etc/vm.args"} {template, "files/vm.args", "etc/vm.args"}
]}. ]}.