broker hooks

This commit is contained in:
Feng Lee 2015-05-24 15:28:56 +08:00
parent 01bfb830f5
commit 74024acd01
11 changed files with 289 additions and 20 deletions

View File

@ -32,6 +32,7 @@
open_listeners/1, close_listeners/1, open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0, load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1, load_plugin/1, unload_plugin/1,
load_all_mods/0,
loaded_plugins/0, loaded_plugins/0,
is_running/1]). is_running/1]).
@ -202,6 +203,14 @@ unload_app(App) ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end. end.
load_all_mods() ->
Mods = application:get_env(emqttd, modules, []),
lists:foreach(fun({Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
Mod:load(Opts),
lager:info("load module ~s successfully", [Name])
end, Mods).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Is running? %% @doc Is running?
%% @end %% @end

View File

@ -51,8 +51,9 @@ start(_StartType, _StartArgs) ->
emqttd_mnesia:start(), emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(), {ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup), start_servers(Sup),
{ok, Listeners} = application:get_env(listeners), emqttd:load_all_mods(),
emqttd:load_all_plugins(), emqttd:load_all_plugins(),
{ok, Listeners} = application:get_env(listeners),
emqttd:open_listeners(Listeners), emqttd:open_listeners(Listeners),
register(emqttd, self()), register(emqttd, self()),
print_vsn(), print_vsn(),
@ -78,6 +79,7 @@ start_servers(Sup) ->
{"emqttd metrics", emqttd_metrics}, {"emqttd metrics", emqttd_metrics},
%{"emqttd router", emqttd_router}, %{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker}, {"emqttd broker", emqttd_broker},
{"emqttd mode supervisor", emqttd_mod_sup},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control}, {"emqttd access control", emqttd_access_control},
{"emqttd system monitor", emqttd_sysmon}], {"emqttd system monitor", emqttd_sysmon}],

View File

@ -42,6 +42,9 @@
%% Event API %% Event API
-export([subscribe/1, notify/2]). -export([subscribe/1, notify/2]).
%% Hook API
-export([hook/2, unhook/2, run_hooks/2]).
%% Broker API %% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
@ -127,6 +130,22 @@ datetime() ->
io_lib:format( io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
hook(Name, MFArgs) ->
gen_server:call(?MODULE, {hook, Name, MFArgs}).
unhook(Name, MF) ->
gen_server:call(?MODULE, {unhook, Name, MF}).
run_hooks(Name, Args) ->
case ets:lookup(?BROKER_TAB, {hook, Name}) of
[{_, Hooks}] ->
lists:foreach(fun({M, F, A}) ->
apply(M, F, Args++A)
end, Hooks);
[] ->
ok
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a tick timer %% @doc Start a tick timer
%% @end %% @end
@ -163,6 +182,31 @@ init([]) ->
handle_call(uptime, _From, State) -> handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
handle_call({hook, Name, MFArgs}, _From, State) ->
Key = {hook, Name}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
case lists:member(MFArgs, Hooks) of
true ->
{error, existed};
false ->
ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]})
end;
[] ->
ets:insert(?BROKER_TAB, {Key, [MFArgs]})
end,
{reply, Reply, State};
handle_call({unhook, Name, MFArgs}, _From, State) ->
Key = {hook, Name}, Reply =
case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] ->
ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])});
[] ->
{error, not_found}
end,
{reply, Reply, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, error, State}. {reply, error, State}.
@ -189,6 +233,15 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
remove_hook(_Hook, [], Acc) ->
lists:reverse(Acc);
remove_hook(Hook, [Hook|Hooks], Acc) ->
remove_hook(Hook, Hooks, Acc);
remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) ->
remove_hook(Hook, Hooks, Acc);
remove_hook(Hook, [H|Hooks], Acc) ->
remove_hook(Hook, Hooks, [H|Acc]).
create_topic(Topic) -> create_topic(Topic) ->
emqttd_pubsub:create(emqtt_topic:systop(Topic)). emqttd_pubsub:create(emqtt_topic:systop(Topic)).

View File

@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#state{proto_state = ProtoState1}};
handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#state{proto_state = ProtoState1}};

View File

@ -0,0 +1,49 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd gen_mod behaviour
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_gen_mod).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-ifdef(use_specs).
-callback load(Opts :: any()) -> {ok, State :: any()}.
-callback unload(State :: any()) -> any().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{load, 1}, {unload, 1}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -0,0 +1,50 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd auto subscribe module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_autosub).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(emqttd_gen_mod).
-export([load/1, subscribe/2, unload/1]).
-record(state, {topics}).
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}),
{ok, #state{topics = Topics}}.
subscribe({Client, ClientId}, Topics) ->
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
[Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics].
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, subscribe}).

View File

@ -0,0 +1,48 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd rewrite module.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_rewrite).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(emqttd_gen_mod).
-export([load/1, rewrite/1, unload/1]).
load(Opts) ->
ok.
rewrite(Topic) ->
Topic.
reload(Opts) ->
ok.
unload(_Opts) ->
ok.

View File

@ -0,0 +1,67 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd module supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0, start_child/1, start_child/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%=============================================================================
%%% API
%%%=============================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
%%
%% start_child(Mod::atom(), Type::type()) -> {ok, pid()}
%% @type type() = worker | supervisor
%%
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
%%%=============================================================================
%%% Supervisor callbacks
%%%=============================================================================
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.

View File

@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
emqttd_cm:register(ClientId1), emqttd_cm:register(ClientId1),
%%Starting session %%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
%% Force subscriptions
force_subscribe(ClientId1),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
%% Run hooks
emqttd_broker:run_hooks(client_connected, [{self(), ClientId1}]),
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
session = Session, session = Session,
will_msg = willmsg(Var)}}; will_msg = willmsg(Var)}};
@ -298,18 +298,6 @@ send_willmsg(_ClientId, undefined) ->
send_willmsg(ClientId, WillMsg) -> send_willmsg(ClientId, WillMsg) ->
emqttd_pubsub:publish(ClientId, WillMsg). emqttd_pubsub:publish(ClientId, WillMsg).
%%TODO: will be fixed in 0.8
force_subscribe(ClientId) ->
[force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <-
proplists:get_value(forced_subscriptions, emqttd:env(mqtt, client), [])].
force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) ->
force_subscribe(ClientId, {list_to_binary(Topic), Qos});
force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) ->
Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic),
self() ! {force_subscribe, Topic1, Qos}.
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->

View File

@ -39,7 +39,7 @@
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). -define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
{ok, {{one_for_all, 10, 100}, []}}. {ok, {{one_for_all, 10, 3600}, []}}.

View File

@ -67,8 +67,6 @@
]}, ]},
%% Client %% Client
{client, [ {client, [
%% Subscribe topics automatically when client connected
{forced_subscriptions, [{"$Q/client/$c", 0}]}
%TODO: Network ingoing limit %TODO: Network ingoing limit
%{ingoing_rate_limit, '64KB/s'} %{ingoing_rate_limit, '64KB/s'}
%TODO: Reconnet control %TODO: Reconnet control
@ -108,6 +106,11 @@
{ping_down_interval, 1} %seconds {ping_down_interval, 1} %seconds
]} ]}
]}, ]},
%% Modules
{modules, [
%% Subscribe topics automatically when client connected
{autosub, [{"$Q/client/$c", 0}]}
]},
%% Listeners %% Listeners
{listeners, [ {listeners, [
{mqtt, 1883, [ {mqtt, 1883, [