diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 4de58fbd7..cdbe0857c 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -32,6 +32,7 @@ open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, + load_all_mods/0, loaded_plugins/0, is_running/1]). @@ -202,6 +203,14 @@ unload_app(App) -> lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} end. +load_all_mods() -> + Mods = application:get_env(emqttd, modules, []), + lists:foreach(fun({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + Mod:load(Opts), + lager:info("load module ~s successfully", [Name]) + end, Mods). + %%------------------------------------------------------------------------------ %% @doc Is running? %% @end diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 95018f5eb..5a8daf60f 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -51,8 +51,9 @@ start(_StartType, _StartArgs) -> emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), - {ok, Listeners} = application:get_env(listeners), + emqttd:load_all_mods(), emqttd:load_all_plugins(), + {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), print_vsn(), @@ -78,6 +79,7 @@ start_servers(Sup) -> {"emqttd metrics", emqttd_metrics}, %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", emqttd_sysmon}], diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 4a9b55ea6..a2f1c6ba1 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -42,6 +42,9 @@ %% Event API -export([subscribe/1, notify/2]). +%% Hook API +-export([hook/2, unhook/2, run_hooks/2]). + %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -127,6 +130,22 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). +hook(Name, MFArgs) -> + gen_server:call(?MODULE, {hook, Name, MFArgs}). + +unhook(Name, MF) -> + gen_server:call(?MODULE, {unhook, Name, MF}). + +run_hooks(Name, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Name}) of + [{_, Hooks}] -> + lists:foreach(fun({M, F, A}) -> + apply(M, F, Args++A) + end, Hooks); + [] -> + ok + end. + %%------------------------------------------------------------------------------ %% @doc Start a tick timer %% @end @@ -163,6 +182,31 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; +handle_call({hook, Name, MFArgs}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + case lists:member(MFArgs, Hooks) of + true -> + {error, existed}; + false -> + ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]}) + end; + [] -> + ets:insert(?BROKER_TAB, {Key, [MFArgs]}) + end, + {reply, Reply, State}; + +handle_call({unhook, Name, MFArgs}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])}); + [] -> + {error, not_found} + end, + {reply, Reply, State}; + handle_call(_Request, _From, State) -> {reply, error, State}. @@ -189,6 +233,15 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +remove_hook(_Hook, [], Acc) -> + lists:reverse(Acc); +remove_hook(Hook, [Hook|Hooks], Acc) -> + remove_hook(Hook, Hooks, Acc); +remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) -> + remove_hook(Hook, Hooks, Acc); +remove_hook(Hook, [H|Hooks], Acc) -> + remove_hook(Hook, Hooks, [H|Acc]). + create_topic(Topic) -> emqttd_pubsub:create(emqtt_topic:systop(Topic)). diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 6c5469090..433f81393 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; -handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> +handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; diff --git a/apps/emqttd/src/emqttd_gen_mod.erl b/apps/emqttd/src/emqttd_gen_mod.erl new file mode 100644 index 000000000..190971d0e --- /dev/null +++ b/apps/emqttd/src/emqttd_gen_mod.erl @@ -0,0 +1,49 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd gen_mod behaviour +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_gen_mod). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-ifdef(use_specs). + +-callback load(Opts :: any()) -> {ok, State :: any()}. + +-callback unload(State :: any()) -> any(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{load, 1}, {unload, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. + diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl new file mode 100644 index 000000000..039066c3e --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -0,0 +1,50 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd auto subscribe module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_autosub). + +-author("Feng Lee "). + +-behaviour(emqttd_gen_mod). + +-export([load/1, subscribe/2, unload/1]). + +-record(state, {topics}). + +load(Opts) -> + Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}), + {ok, #state{topics = Topics}}. + +subscribe({Client, ClientId}, Topics) -> + F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, + [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. + +unload(_Opts) -> + emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). + + diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl new file mode 100644 index 000000000..fa499443c --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -0,0 +1,48 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd rewrite module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_rewrite). + +-author("Feng Lee "). + +-behaviour(emqttd_gen_mod). + +-export([load/1, rewrite/1, unload/1]). + +load(Opts) -> + ok. + +rewrite(Topic) -> + Topic. + +reload(Opts) -> + ok. + +unload(_Opts) -> + ok. + + diff --git a/apps/emqttd/src/emqttd_mod_sup.erl b/apps/emqttd/src/emqttd_mod_sup.erl new file mode 100644 index 000000000..8fa3ced59 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_sup.erl @@ -0,0 +1,67 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd module supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_mod_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_child/1, start_child/2]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) when is_tuple(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +%% +%% start_child(Mod::atom(), Type::type()) -> {ok, pid()} +%% @type type() = worker | supervisor +%% +start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + +%%%============================================================================= +%%% Supervisor callbacks +%%%============================================================================= + +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 03845c482..54b4cb1b7 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = emqttd_cm:register(ClientId1), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), - %% Force subscriptions - force_subscribe(ClientId1), %% Start keepalive start_keepalive(KeepAlive), + %% Run hooks + emqttd_broker:run_hooks(client_connected, [{self(), ClientId1}]), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, session = Session, will_msg = willmsg(Var)}}; @@ -298,18 +298,6 @@ send_willmsg(_ClientId, undefined) -> send_willmsg(ClientId, WillMsg) -> emqttd_pubsub:publish(ClientId, WillMsg). -%%TODO: will be fixed in 0.8 -force_subscribe(ClientId) -> - [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- - proplists:get_value(forced_subscriptions, emqttd:env(mqtt, client), [])]. - -force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> - force_subscribe(ClientId, {list_to_binary(Topic), Qos}); - -force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> - Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic), - self() ! {force_subscribe, Topic1, Qos}. - start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_sup.erl b/apps/emqttd/src/emqttd_sup.erl index bb460d144..6e57818b7 100644 --- a/apps/emqttd/src/emqttd_sup.erl +++ b/apps/emqttd/src/emqttd_sup.erl @@ -39,7 +39,7 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). %%%============================================================================= %%% API @@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %%%============================================================================= init([]) -> - {ok, {{one_for_all, 10, 100}, []}}. + {ok, {{one_for_all, 10, 3600}, []}}. diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 2c84c02ee..68c22400c 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -67,8 +67,6 @@ ]}, %% Client {client, [ - %% Subscribe topics automatically when client connected - {forced_subscriptions, [{"$Q/client/$c", 0}]} %TODO: Network ingoing limit %{ingoing_rate_limit, '64KB/s'} %TODO: Reconnet control @@ -108,6 +106,11 @@ {ping_down_interval, 1} %seconds ]} ]}, + %% Modules + {modules, [ + %% Subscribe topics automatically when client connected + {autosub, [{"$Q/client/$c", 0}]} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [