diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index a2f1c6ba1..ebff43718 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -43,7 +43,7 @@ -export([subscribe/1, notify/2]). %% Hook API --export([hook/2, unhook/2, run_hooks/2]). +-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]). %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -130,18 +130,48 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). -hook(Name, MFArgs) -> - gen_server:call(?MODULE, {hook, Name, MFArgs}). +%%------------------------------------------------------------------------------ +%% @doc Hook +%% @end +%%------------------------------------------------------------------------------ +-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. +hook(Hook, Name, MFA) -> + gen_server:call(?MODULE, {hook, Hook, Name, MFA}). -unhook(Name, MF) -> - gen_server:call(?MODULE, {unhook, Name, MF}). +%%------------------------------------------------------------------------------ +%% @doc Unhook +%% @end +%%------------------------------------------------------------------------------ +-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. +unhook(Hook, Name) -> + gen_server:call(?MODULE, {unhook, Hook, Name}). -run_hooks(Name, Args) -> - case ets:lookup(?BROKER_TAB, {hook, Name}) of - [{_, Hooks}] -> - lists:foreach(fun({M, F, A}) -> +%%------------------------------------------------------------------------------ +%% @doc Foreach hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any(). +foreach_hooks(Hook, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foreach(fun({_Name, {M, F, A}}) -> apply(M, F, Args++A) end, Hooks); + [] -> + ok + end. + +%%------------------------------------------------------------------------------ +%% @doc Foldl hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any(). +foldl_hooks(Hook, Args, Acc0) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foldl(fun({_Name, {M, F, A}}, Acc) -> + apply(M, F, [Acc, Args++A]) + end, Acc0, Hooks); [] -> ok end. @@ -182,33 +212,33 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; -handle_call({hook, Name, MFArgs}, _From, State) -> - Key = {hook, Name}, Reply = +handle_call({hook, Hook, Name, MFArgs}, _From, State) -> + Key = {hook, Hook}, Reply = case ets:lookup(?BROKER_TAB, Key) of [{Key, Hooks}] -> - case lists:member(MFArgs, Hooks) of - true -> + case lists:keyfind(Name, 1, Hooks) of + {Name, _MFArgs} -> {error, existed}; false -> - ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]}) + ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) end; [] -> - ets:insert(?BROKER_TAB, {Key, [MFArgs]}) + ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) end, {reply, Reply, State}; -handle_call({unhook, Name, MFArgs}, _From, State) -> +handle_call({unhook, Name}, _From, State) -> Key = {hook, Name}, Reply = case ets:lookup(?BROKER_TAB, Key) of [{Key, Hooks}] -> - ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])}); + ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); [] -> {error, not_found} end, {reply, Reply, State}; handle_call(_Request, _From, State) -> - {reply, error, State}. + {reply, {error, unsupport_request}, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -233,15 +263,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -remove_hook(_Hook, [], Acc) -> - lists:reverse(Acc); -remove_hook(Hook, [Hook|Hooks], Acc) -> - remove_hook(Hook, Hooks, Acc); -remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) -> - remove_hook(Hook, Hooks, Acc); -remove_hook(Hook, [H|Hooks], Acc) -> - remove_hook(Hook, Hooks, [H|Acc]). - create_topic(Topic) -> emqttd_pubsub:create(emqtt_topic:systop(Topic)). diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index 039066c3e..0ed1be5e1 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -37,7 +37,8 @@ load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}), + emqttd_broker:hook(client_connected, {?MODULE, subscribe}, + {?MODULE, subscribe, [Topics]}), {ok, #state{topics = Topics}}. subscribe({Client, ClientId}, Topics) -> @@ -47,4 +48,3 @@ subscribe({Client, ClientId}, Topics) -> unload(_Opts) -> emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). - diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index fa499443c..bc7e4aa73 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -31,18 +31,52 @@ -behaviour(emqttd_gen_mod). --export([load/1, rewrite/1, unload/1]). +-export([load/1, reload/1, unload/1]). + +-export([rewrite/2]). + +%%%============================================================================= +%%% API +%%%============================================================================= load(Opts) -> - ok. + File = proplists:get_value(file, Opts), + Sections = compile(file:consult(File)), + emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + {?MODULE, rewrite, [subscribe, Sections]}), + emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + {?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}), + emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + {?MODULE, rewrite_publish, [publish, Sections]}). -rewrite(Topic) -> - Topic. +rewrite(TopicTable, [subscribe, _Sections]) -> + lager:info("Rewrite Subscribe: ~p", [TopicTable]), + TopicTable; -reload(Opts) -> - ok. +rewrite(Topics, [unsubscribe, _Sections]) -> + lager:info("Rewrite Unsubscribe: ~p", [Topics]), + Topics; + +rewrite(Message, [publish, _Sections]) -> + Message. + +reload(File) -> + %%TODO: The unload api is not right... + unload(state), load([{file, File}]). -unload(_Opts) -> - ok. +unload(_) -> + emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +compile(Sections) -> + C = fun({rewrite, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, MP, Dest} + end, + [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 54b4cb1b7..0c9bf62a5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -212,8 +212,9 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> + TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; @@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), + Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), + {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); handle(?PACKET(?PINGREQ), State) -> diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 68c22400c..f2ba8109d 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -46,14 +46,14 @@ %% Authentication with username, password %{username, []}, %% Authentication with clientid - %{clientid, [{password, no}, {file, "etc/clients.config"}]}, + %{clientid, [{password, no}, {file, "etc/conf/clients.config"}]}, %% Allow all {anonymous, []} ]}, %% ACL config {acl, [ %% Internal ACL module - {internal, [{file, "etc/acl.config"}, {nomatch, allow}]} + {internal, [{file, "etc/conf/acl.config"}, {nomatch, allow}]} ]} ]}, %% MQTT Protocol Options @@ -109,7 +109,10 @@ %% Modules {modules, [ %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + {autosub, [{"$Q/client/$c", 0}]}, + %% Rewrite rules + {rewrite, [{file, "etc/conf/rewrite.config"}]} + ]}, %% Listeners {listeners, [ @@ -137,8 +140,8 @@ %% Socket Access Control {access, [{allow, all}]}, %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl.crt"}, - {keyfile, "etc/ssl.key"}]}, + {ssl, [{certfile, "etc/ssl/ssl.crt"}, + {keyfile, "etc/ssl/ssl.key"}]}, %% Socket Options {sockopts, [ {backlog, 1024} diff --git a/rel/reltool.config b/rel/reltool.config index 933300fd3..6e122a77d 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,8 @@ {overlay, [ {mkdir, "log/"}, {mkdir, "etc/"}, + {mkdir, "etc/ssl/"}, + {mkdir, "etc/conf/"}, {mkdir, "data/"}, {mkdir, "plugins/"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, @@ -72,11 +74,12 @@ {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, - {copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, - {copy, "files/ssl/ssl.key", "etc/ssl.key"}, + {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, + {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {template, "files/emqttd.config", "etc/emqttd.config"}, - {template, "files/acl.config", "etc/acl.config"}, - {template, "files/clients.config", "etc/clients.config"}, + {template, "files/acl.config", "etc/conf/acl.config"}, + {template, "files/rewrite.config", "etc/conf/rewrite.config"}, + {template, "files/clients.config", "etc/conf/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"} ]}.