diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 4de58fbd7..30faa3e2d 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -32,6 +32,7 @@ open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, + load_all_mods/0, is_mod_enabled/1, loaded_plugins/0, is_running/1]). @@ -202,6 +203,17 @@ unload_app(App) -> lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} end. +load_all_mods() -> + Mods = application:get_env(emqttd, modules, []), + lists:foreach(fun({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + Mod:load(Opts), + lager:info("load module ~s successfully", [Name]) + end, Mods). + +is_mod_enabled(Name) -> + env(modules, Name) =/= undefined. + %%------------------------------------------------------------------------------ %% @doc Is running? %% @end diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 80bbd968e..5a8daf60f 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -51,8 +51,9 @@ start(_StartType, _StartArgs) -> emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), - {ok, Listeners} = application:get_env(listeners), + emqttd:load_all_mods(), emqttd:load_all_plugins(), + {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), print_vsn(), @@ -71,13 +72,14 @@ start_servers(Sup) -> {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, - {"emqttd session manager", emqttd_sm}, + {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", emqttd_sysmon}], diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 4a9b55ea6..ebff43718 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -42,6 +42,9 @@ %% Event API -export([subscribe/1, notify/2]). +%% Hook API +-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]). + %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -127,6 +130,52 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). +%%------------------------------------------------------------------------------ +%% @doc Hook +%% @end +%%------------------------------------------------------------------------------ +-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. +hook(Hook, Name, MFA) -> + gen_server:call(?MODULE, {hook, Hook, Name, MFA}). + +%%------------------------------------------------------------------------------ +%% @doc Unhook +%% @end +%%------------------------------------------------------------------------------ +-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. +unhook(Hook, Name) -> + gen_server:call(?MODULE, {unhook, Hook, Name}). + +%%------------------------------------------------------------------------------ +%% @doc Foreach hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any(). +foreach_hooks(Hook, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foreach(fun({_Name, {M, F, A}}) -> + apply(M, F, Args++A) + end, Hooks); + [] -> + ok + end. + +%%------------------------------------------------------------------------------ +%% @doc Foldl hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any(). +foldl_hooks(Hook, Args, Acc0) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foldl(fun({_Name, {M, F, A}}, Acc) -> + apply(M, F, [Acc, Args++A]) + end, Acc0, Hooks); + [] -> + ok + end. + %%------------------------------------------------------------------------------ %% @doc Start a tick timer %% @end @@ -163,8 +212,33 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; +handle_call({hook, Hook, Name, MFArgs}, _From, State) -> + Key = {hook, Hook}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + case lists:keyfind(Name, 1, Hooks) of + {Name, _MFArgs} -> + {error, existed}; + false -> + ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) + end; + [] -> + ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) + end, + {reply, Reply, State}; + +handle_call({unhook, Name}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); + [] -> + {error, not_found} + end, + {reply, Reply, State}; + handle_call(_Request, _From, State) -> - {reply, error, State}. + {reply, {error, unsupport_request}, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 6c5469090..433f81393 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; -handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> +handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 51b230f7b..7758f8ec0 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup/1, register/1, unregister/1]). @@ -41,9 +41,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tab, statsfun}). +-record(state, {id, tab, statsfun}). --define(POOL, cm_pool). +-define(CM_POOL, cm_pool). + +-define(CLIENT_TAB, mqtt_client). %%%============================================================================= %%% API @@ -53,12 +55,15 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +pool() -> ?CM_POOL. + +table() -> ?CLIENT_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(emqttd_cm_sup:table(), ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ @@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{tab = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({register, ClientId, Pid}, _From, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; @@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); [] -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -116,11 +121,11 @@ handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> - case ets:lookup(TabId, ClientId) of +handle_cast({unregister, ClientId, Pid}, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(TabId, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> @@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> - ets:match_delete(TabId, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 666b87548..53a338404 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -33,30 +33,26 @@ -behaviour(supervisor). %% API --export([start_link/0, table/0]). +-export([start_link/0]). %% Supervisor callbacks -export([init/1]). --define(CLIENT_TAB, mqtt_client). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?CLIENT_TAB. - init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_cm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), Children = lists:map( fun(I) -> Name = {emqttd_cm, I}, - gproc_pool:add_worker(cm_pool, Name, I), - {Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} + gproc_pool:add_worker(emqttd_cm:pool(), Name, I), + {Name, {emqttd_cm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_cm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl b/apps/emqttd/src/emqttd_gen_mod.erl similarity index 77% rename from plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl rename to apps/emqttd/src/emqttd_gen_mod.erl index 3e693335c..190971d0e 100644 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl +++ b/apps/emqttd/src/emqttd_gen_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -20,23 +20,30 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd demo auth module. +%%% emqttd gen_mod behaviour %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_auth). +-module(emqttd_gen_mod). -author("Feng Lee "). --include_lib("emqttd/include/emqttd.hrl"). +-include("emqttd.hrl"). --behaviour(emqttd_auth_mod). +-ifdef(use_specs). --export([init/1, check/3, description/0]). +-callback load(Opts :: any()) -> {ok, State :: any()}. -init(Opts) -> {ok, Opts}. +-callback unload(State :: any()) -> any(). -check(_Client, _Password, _Opts) -> ignore. +-else. -description() -> "Demo authentication module". +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{load, 1}, {unload, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl b/apps/emqttd/src/emqttd_mod_autosub.erl similarity index 65% rename from plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl rename to apps/emqttd/src/emqttd_mod_autosub.erl index 424ee2c86..0ed1be5e1 100644 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -20,26 +20,31 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd demo acl module. +%%% emqttd auto subscribe module. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_acl). + +-module(emqttd_mod_autosub). -author("Feng Lee "). --include_lib("emqttd/include/emqttd.hrl"). +-behaviour(emqttd_gen_mod). --behaviour(emqttd_acl_mod). +-export([load/1, subscribe/2, unload/1]). -%% ACL callbacks --export([init/1, check_acl/2, reload_acl/1, description/0]). +-record(state, {topics}). -init(Opts) -> {ok, Opts}. +load(Opts) -> + Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + emqttd_broker:hook(client_connected, {?MODULE, subscribe}, + {?MODULE, subscribe, [Topics]}), + {ok, #state{topics = Topics}}. -check_acl({_Client, _PubSub, _Topic}, _State) -> ignore. +subscribe({Client, ClientId}, Topics) -> + F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, + [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. -reload_acl(_State) -> ok. - -description() -> "Demo ACL Module". +unload(_Opts) -> + emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl new file mode 100644 index 000000000..4131afcf6 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -0,0 +1,122 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd rewrite module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_rewrite). + +-author("Feng Lee "). + +-include_lib("emqtt/include/emqtt.hrl"). + +-behaviour(emqttd_gen_mod). + +-export([load/1, reload/1, unload/1]). + +-export([rewrite/2]). + +%%%============================================================================= +%%% API +%%%============================================================================= + +load(Opts) -> + File = proplists:get_value(file, Opts), + {ok, Terms} = file:consult(File), + Sections = compile(Terms), + emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + {?MODULE, rewrite, [subscribe, Sections]}), + emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + {?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}), + emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + {?MODULE, rewrite_publish, [publish, Sections]}). + +rewrite(TopicTable, [subscribe, Sections]) -> + [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; + +rewrite(Topics, [unsubscribe, Sections]) -> + [match_topic(Topic, Sections) || Topic <- Topics]; + +rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> + %%TODO: this will not work if the client is always online. + RewriteTopic = + case get({rewrite, Topic}) of + undefined -> + DestTopic = match_topic(Topic, Sections), + put({rewrite, Topic}, DestTopic), DestTopic; + DestTopic -> + DestTopic + end, + Message#mqtt_message{topic = RewriteTopic}. + +reload(File) -> + %%TODO: The unload api is not right... + case emqttd:is_mod_enabled(rewrite) of + true -> + unload(state), + load([{file, File}]); + false -> + {error, module_unloaded} + end. + +unload(_) -> + emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +compile(Sections) -> + C = fun({rewrite, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, MP, Dest} + end, + [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. + +match_topic(Topic, []) -> + Topic; +match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> + case emqtt_topic:match(Topic, Filter) of + true -> + match_rule(Topic, Rules); + false -> + match_topic(Topic, Sections) + end. + +match_rule(Topic, []) -> + Topic; +match_rule(Topic, [{rewrite, MP, Dest}|Rules]) -> + case re:run(Topic, MP, [{captrue, all_but_first, list}]) of + {match, Captured} -> + %%TODO: stupid??? how to replace $1, $2? + Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + iolist_to_binary(lists:foldl( + fun({Var, Val}, Acc) -> + re:replace(Acc, Var, Val, [global]) + end, Dest, Vars)); + nomatch -> + match_rule(Topic, Rules) + end. diff --git a/apps/emqttd/src/emqttd_mod_sup.erl b/apps/emqttd/src/emqttd_mod_sup.erl new file mode 100644 index 000000000..8fa3ced59 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_sup.erl @@ -0,0 +1,67 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd module supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_mod_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_child/1, start_child/2]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) when is_tuple(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +%% +%% start_child(Mod::atom(), Type::type()) -> {ok, pid()} +%% @type type() = worker | supervisor +%% +start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + +%%%============================================================================= +%%% Supervisor callbacks +%%%============================================================================= + +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index eb08b62fd..71c33bd72 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = emqttd_cm:register(ClientId1), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), - %% Force subscriptions - force_subscribe(ClientId1), %% Start keepalive start_keepalive(KeepAlive), + %% Run hooks + emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, session = Session, will_msg = willmsg(Var)}}; @@ -212,12 +212,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> + TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; -handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) -> +handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) -> {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), {ok, State#proto_state{session = NewSession}}; @@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), + Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), + {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); handle(?PACKET(?PINGREQ), State) -> @@ -298,23 +300,8 @@ send_willmsg(_ClientId, undefined) -> send_willmsg(ClientId, WillMsg) -> emqttd_pubsub:publish(ClientId, WillMsg). -%%TODO: will be fixed in 0.8 -force_subscribe(ClientId) -> - case emqttd_broker:env(forced_subscriptions) of - undefined -> - ingore; - Topics -> - [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics] - end. - -force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> - force_subscribe(ClientId, {list_to_binary(Topic), Qos}); - -force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> - Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic), - self() ! {force_subscribe, Topic1, Qos}. - start_keepalive(0) -> ignore; + start_keepalive(Sec) when Sec > 0 -> self() ! {keepalive, start, round(Sec * 1.5)}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index ef26cb44e..05d9e62b0 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -44,10 +44,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/0]). +-export([start_link/2, pool/0, table/0]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,16 +53,37 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tabid, statsfun}). +-record(state, {id, tabid, statsfun}). + +-define(SM_POOL, sm_pool). -define(SESSION_TAB, mqtt_session). %%%============================================================================= %%% API %%%============================================================================= --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% @doc Start a session manager +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + StatsFun :: fun(). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +%%------------------------------------------------------------------------------ +%% @doc Pool name. +%% @end +%%------------------------------------------------------------------------------ +pool() -> ?SM_POOL. + +%%------------------------------------------------------------------------------ +%% @doc Table name. +%% @end +%%------------------------------------------------------------------------------ +table() -> ?SESSION_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -72,7 +91,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TAB, ClientId) of + case ets:lookup(emqttd_sm_sup:table(), ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -83,7 +102,8 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session @@ -91,29 +111,27 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - gen_server:call(?SERVER, {destroy_session, ClientId}). + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - process_flag(trap_exit, true), - TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - {ok, #state{tabid = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> +handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = - case ets:lookup(Tab, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(Tab, {ClientId, SessPid, - erlang:monitor(process, SessPid)}), + ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> {error, Error} @@ -121,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({destroy_session, ClientId}, _From, State) -> + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), - ets:delete(Tab, ClientId); + ets:delete(?SESSION_TAB, ClientId); [] -> ignore end, @@ -145,8 +163,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl new file mode 100644 index 000000000..ece44dd38 --- /dev/null +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -0,0 +1,59 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_sm_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +%% API +-export([start_link/0]). + +-behaviour(supervisor). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ets:new(emqttd_sm:table(), [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + Children = lists:map( + fun(I) -> + Name = {emqttd_sm, I}, + gproc_pool:add_worker(emqttd_sm:pool(), Name, I), + {Name, {emqttd_sm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + diff --git a/apps/emqttd/src/emqttd_sup.erl b/apps/emqttd/src/emqttd_sup.erl index bb460d144..6e57818b7 100644 --- a/apps/emqttd/src/emqttd_sup.erl +++ b/apps/emqttd/src/emqttd_sup.erl @@ -39,7 +39,7 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). %%%============================================================================= %%% API @@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %%%============================================================================= init([]) -> - {ok, {{one_for_all, 10, 100}, []}}. + {ok, {{one_for_all, 10, 3600}, []}}. diff --git a/plugins/emqttd_auth_ldap/README.md b/plugins/emqttd_auth_ldap/README.md index aa86de0e8..083b318ac 100644 --- a/plugins/emqttd_auth_ldap/README.md +++ b/plugins/emqttd_auth_ldap/README.md @@ -1,4 +1,3 @@ - ## Overview Authentication with LDAP. @@ -16,7 +15,6 @@ Authentication with LDAP. {"certfile", "ssl.crt"}, {"keyfile", "ssl.key"}]} ]} - ``` ## Load Plugin diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src index eee513a0a..e699fdba3 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src @@ -1,6 +1,6 @@ {application, emqttd_auth_ldap, [ - {description, "emqttd LDA Authentication Plugin"}, + {description, "emqttd LDAP Authentication Plugin"}, {vsn, "1.0"}, {registered, []}, {applications, [ diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl index 02fb4d121..ba1bdca4d 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl @@ -30,6 +30,8 @@ -include_lib("emqttd/include/emqttd.hrl"). +-import(proplists, [get_value/2, get_value/3]). + -behaviour(emqttd_auth_mod). -export([init/1, check/3, description/0]). @@ -37,14 +39,14 @@ -record(state, {servers, user_dn, options}). init(Opts) -> - Servers = proplists:get_value(servers, Opts, ["localhost"]), - Port = proplists:get_value(port, Opts, 389), - Timeout = proplists:get_value(timeout, Opts, 30), - UserDn = proplists:get_value(user_dn, Opts), + Servers = get_value(servers, Opts, ["localhost"]), + Port = get_value(port, Opts, 389), + Timeout = get_value(timeout, Opts, 30), + UserDn = get_value(user_dn, Opts), LdapOpts = - case proplists:get_value(ssl, Opts, false) of + case get_value(ssl, Opts, false) of true -> - SslOpts = proplists:get_value(sslopts, Opts), + SslOpts = get_value(sslopts, Opts), [{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}]; false -> [{port, Port}, {timeout, Timeout}] @@ -67,8 +69,6 @@ check(#mqtt_client{username = Username}, Password, {error, Reason} end. -description() -> "LDAP Authentication Module". - ldap_bind(LDAP, UserDn, Password) -> case catch eldap:simple_bind(LDAP, UserDn, Password) of ok -> @@ -87,3 +87,6 @@ fill(Username, UserDn) -> (S) -> S end, string:tokens(UserDn, ",="))). +description() -> + "LDAP Authentication Module". + diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl index 2e0060712..1cea23075 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% LDAP Authentication APP. +%%% LDAP Authentication Plugin. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -56,4 +56,3 @@ stop(_State) -> init([]) -> {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/plugins/emqttd_auth_mysql/README.md b/plugins/emqttd_auth_mysql/README.md index 481a8f3e7..02db17e10 100644 --- a/plugins/emqttd_auth_mysql/README.md +++ b/plugins/emqttd_auth_mysql/README.md @@ -1,11 +1,11 @@ -## Overview + +## Overview Authentication with user table of MySQL database. ## etc/plugin.config -```erlang -[ +``` {emysql, [ {pool, 4}, {host, "localhost"}, @@ -24,7 +24,6 @@ Authentication with user table of MySQL database. {password, password} ]} ]} -]. ``` ## Users Table(Demo) diff --git a/plugins/emqttd_auth_mysql/etc/plugin.config b/plugins/emqttd_auth_mysql/etc/plugin.config index bb9a5817e..a5ef4bc41 100644 --- a/plugins/emqttd_auth_mysql/etc/plugin.config +++ b/plugins/emqttd_auth_mysql/etc/plugin.config @@ -1,18 +1,16 @@ -[ - {emysql, [ - {pool, 4}, - {host, "localhost"}, - {port, 3306}, - {username, "root"}, - {password, "public"}, - {database, "mqtt"}, - {encoding, utf8} - ]}, - {emqttd_auth_mysql, [ - {users_table, mqtt_users}, - {field_mapper, [ - {username, username}, - {password, password, plain} - ]} - ]} -]. +{emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, "root"}, + {password, "public"}, + {database, "mqtt"}, + {encoding, utf8} +]}, +{emqttd_auth_mysql, [ + {users_table, mqtt_users}, + {field_mapper, [ + {username, username}, + {password, password, plain} + ]} +]} diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src deleted file mode 100644 index ecc0b1114..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_plugin_demo, - [ - {description, "emqttd demo plugin"}, - {vsn, "0.1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_plugin_demo_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl deleted file mode 100644 index b3898f7f6..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(emqttd_plugin_demo_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - {ok, Sup} = emqttd_plugin_demo_sup:start_link(), - emqttd_access_control:register_mod(auth, emqttd_plugin_demo_auth, []), - emqttd_access_control:register_mod(acl, emqttd_plugin_demo_acl, []), - {ok, Sup}. - -stop(_State) -> - emqttd_access_control:unregister_mod(auth, emqttd_plugin_demo_auth), - emqttd_access_control:unregister_mod(acl, emqttd_plugin_demo_acl), - ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl deleted file mode 100644 index 65dad7a60..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_plugin_demo_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index a60107c76..716cd3421 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -86,15 +86,12 @@ %% System interval of publishing broker $SYS messages {sys_interval, 60}, - %% Subscribe these topics automatically when client connected - {forced_subscriptions, [{"$Q/client/$c", 0}]}, - %% Retained messages {retained, [ %% Max number of retained messages {max_message_num, 100000}, %% Max Payload Size of retained message - {max_playload_size, 4096} + {max_playload_size, 65536} ]}, %% PubSub {pubsub, [ @@ -109,6 +106,14 @@ {ping_down_interval, 1} %seconds ]} ]}, + %% Modules + {modules, [ + %% Subscribe topics automatically when client connected + {autosub, [{"$Q/client/$c", 0}]}, + %% Rewrite rules + {rewrite, [{file, "etc/rewrite.config"}]} + + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ @@ -135,8 +140,8 @@ %% Socket Access Control {access, [{allow, all}]}, %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl.crt"}, - {keyfile, "etc/ssl.key"}]}, + {ssl, [{certfile, "etc/ssl/ssl.crt"}, + {keyfile, "etc/ssl/ssl.key"}]}, %% Socket Options {sockopts, [ {backlog, 1024} diff --git a/rel/files/rewrite.config b/rel/files/rewrite.config new file mode 100644 index 000000000..5006c70b9 --- /dev/null +++ b/rel/files/rewrite.config @@ -0,0 +1,14 @@ +%%%----------------------------------------------------------------------------- +%% +%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) +%% +%%%----------------------------------------------------------------------------- + +{topic, "x/#", [ + {rewrite, "^x/y/(.+)$", "z/y/$1"}, + {rewrite, "^x/(.+)$", "y/$1"} +]}. + +{topic, "y/+/z/#", [ + {rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"} +]}. diff --git a/rel/reltool.config b/rel/reltool.config index 933300fd3..ac5d09e83 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,7 @@ {overlay, [ {mkdir, "log/"}, {mkdir, "etc/"}, + {mkdir, "etc/ssl/"}, {mkdir, "data/"}, {mkdir, "plugins/"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, @@ -72,10 +73,11 @@ {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, - {copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, - {copy, "files/ssl/ssl.key", "etc/ssl.key"}, + {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, + {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {template, "files/emqttd.config", "etc/emqttd.config"}, {template, "files/acl.config", "etc/acl.config"}, + {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"}