From ebf04c86d41d80bda884519c4e468d43ffe7e474 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 12 May 2015 22:33:18 +0800 Subject: [PATCH 01/14] rm plugin demo --- .../src/emqttd_plugin_demo.app.src | 12 ----- .../src/emqttd_plugin_demo_acl.erl | 45 ------------------- .../src/emqttd_plugin_demo_app.erl | 21 --------- .../src/emqttd_plugin_demo_auth.erl | 42 ----------------- .../src/emqttd_plugin_demo_sup.erl | 27 ----------- 5 files changed, 147 deletions(-) delete mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src delete mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl delete mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl delete mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl delete mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src deleted file mode 100644 index ecc0b1114..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd_plugin_demo, - [ - {description, "emqttd demo plugin"}, - {vsn, "0.1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { emqttd_plugin_demo_app, []}}, - {env, []} - ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl deleted file mode 100644 index 424ee2c86..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_acl.erl +++ /dev/null @@ -1,45 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd demo acl module. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_acl). - --author("Feng Lee "). - --include_lib("emqttd/include/emqttd.hrl"). - --behaviour(emqttd_acl_mod). - -%% ACL callbacks --export([init/1, check_acl/2, reload_acl/1, description/0]). - -init(Opts) -> {ok, Opts}. - -check_acl({_Client, _PubSub, _Topic}, _State) -> ignore. - -reload_acl(_State) -> ok. - -description() -> "Demo ACL Module". - diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl deleted file mode 100644 index b3898f7f6..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(emqttd_plugin_demo_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - {ok, Sup} = emqttd_plugin_demo_sup:start_link(), - emqttd_access_control:register_mod(auth, emqttd_plugin_demo_auth, []), - emqttd_access_control:register_mod(acl, emqttd_plugin_demo_acl, []), - {ok, Sup}. - -stop(_State) -> - emqttd_access_control:unregister_mod(auth, emqttd_plugin_demo_auth), - emqttd_access_control:unregister_mod(acl, emqttd_plugin_demo_acl), - ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl deleted file mode 100644 index 3e693335c..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_auth.erl +++ /dev/null @@ -1,42 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd demo auth module. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_plugin_demo_auth). - --author("Feng Lee "). - --include_lib("emqttd/include/emqttd.hrl"). - --behaviour(emqttd_auth_mod). - --export([init/1, check/3, description/0]). - -init(Opts) -> {ok, Opts}. - -check(_Client, _Password, _Opts) -> ignore. - -description() -> "Demo authentication module". - diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl deleted file mode 100644 index 65dad7a60..000000000 --- a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(emqttd_plugin_demo_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. - From e93f4cf07762335f35868aeef1809e01ce95685d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 12 May 2015 22:39:27 +0800 Subject: [PATCH 02/14] misc fix --- plugins/emqttd_auth_ldap/README.md | 2 -- .../src/emqttd_auth_ldap.app.src | 2 +- .../emqttd_auth_ldap/src/emqttd_auth_ldap.erl | 19 +++++++++++-------- .../src/emqttd_auth_ldap_app.erl | 3 +-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/plugins/emqttd_auth_ldap/README.md b/plugins/emqttd_auth_ldap/README.md index aa86de0e8..083b318ac 100644 --- a/plugins/emqttd_auth_ldap/README.md +++ b/plugins/emqttd_auth_ldap/README.md @@ -1,4 +1,3 @@ - ## Overview Authentication with LDAP. @@ -16,7 +15,6 @@ Authentication with LDAP. {"certfile", "ssl.crt"}, {"keyfile", "ssl.key"}]} ]} - ``` ## Load Plugin diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src index eee513a0a..e699fdba3 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.app.src @@ -1,6 +1,6 @@ {application, emqttd_auth_ldap, [ - {description, "emqttd LDA Authentication Plugin"}, + {description, "emqttd LDAP Authentication Plugin"}, {vsn, "1.0"}, {registered, []}, {applications, [ diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl index 02fb4d121..ba1bdca4d 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap.erl @@ -30,6 +30,8 @@ -include_lib("emqttd/include/emqttd.hrl"). +-import(proplists, [get_value/2, get_value/3]). + -behaviour(emqttd_auth_mod). -export([init/1, check/3, description/0]). @@ -37,14 +39,14 @@ -record(state, {servers, user_dn, options}). init(Opts) -> - Servers = proplists:get_value(servers, Opts, ["localhost"]), - Port = proplists:get_value(port, Opts, 389), - Timeout = proplists:get_value(timeout, Opts, 30), - UserDn = proplists:get_value(user_dn, Opts), + Servers = get_value(servers, Opts, ["localhost"]), + Port = get_value(port, Opts, 389), + Timeout = get_value(timeout, Opts, 30), + UserDn = get_value(user_dn, Opts), LdapOpts = - case proplists:get_value(ssl, Opts, false) of + case get_value(ssl, Opts, false) of true -> - SslOpts = proplists:get_value(sslopts, Opts), + SslOpts = get_value(sslopts, Opts), [{port, Port}, {timeout, Timeout}, {sslopts, SslOpts}]; false -> [{port, Port}, {timeout, Timeout}] @@ -67,8 +69,6 @@ check(#mqtt_client{username = Username}, Password, {error, Reason} end. -description() -> "LDAP Authentication Module". - ldap_bind(LDAP, UserDn, Password) -> case catch eldap:simple_bind(LDAP, UserDn, Password) of ok -> @@ -87,3 +87,6 @@ fill(Username, UserDn) -> (S) -> S end, string:tokens(UserDn, ",="))). +description() -> + "LDAP Authentication Module". + diff --git a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl index 2e0060712..1cea23075 100644 --- a/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl +++ b/plugins/emqttd_auth_ldap/src/emqttd_auth_ldap_app.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% LDAP Authentication APP. +%%% LDAP Authentication Plugin. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -56,4 +56,3 @@ stop(_State) -> init([]) -> {ok, { {one_for_one, 5, 10}, []} }. - From aaf5c6b19474c7cbbc6dc337d45319a8036fdf0c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 12 May 2015 22:41:16 +0800 Subject: [PATCH 03/14] fix plugin config --- plugins/emqttd_auth_mysql/README.md | 7 ++--- plugins/emqttd_auth_mysql/etc/plugin.config | 34 ++++++++++----------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/plugins/emqttd_auth_mysql/README.md b/plugins/emqttd_auth_mysql/README.md index 481a8f3e7..02db17e10 100644 --- a/plugins/emqttd_auth_mysql/README.md +++ b/plugins/emqttd_auth_mysql/README.md @@ -1,11 +1,11 @@ -## Overview + +## Overview Authentication with user table of MySQL database. ## etc/plugin.config -```erlang -[ +``` {emysql, [ {pool, 4}, {host, "localhost"}, @@ -24,7 +24,6 @@ Authentication with user table of MySQL database. {password, password} ]} ]} -]. ``` ## Users Table(Demo) diff --git a/plugins/emqttd_auth_mysql/etc/plugin.config b/plugins/emqttd_auth_mysql/etc/plugin.config index bb9a5817e..a5ef4bc41 100644 --- a/plugins/emqttd_auth_mysql/etc/plugin.config +++ b/plugins/emqttd_auth_mysql/etc/plugin.config @@ -1,18 +1,16 @@ -[ - {emysql, [ - {pool, 4}, - {host, "localhost"}, - {port, 3306}, - {username, "root"}, - {password, "public"}, - {database, "mqtt"}, - {encoding, utf8} - ]}, - {emqttd_auth_mysql, [ - {users_table, mqtt_users}, - {field_mapper, [ - {username, username}, - {password, password, plain} - ]} - ]} -]. +{emysql, [ + {pool, 4}, + {host, "localhost"}, + {port, 3306}, + {username, "root"}, + {password, "public"}, + {database, "mqtt"}, + {encoding, utf8} +]}, +{emqttd_auth_mysql, [ + {users_table, mqtt_users}, + {field_mapper, [ + {username, username}, + {password, password, plain} + ]} +]} From aa703dea36adfbeaadfcc6d0c601d57cf96fce09 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 19 May 2015 00:14:10 +0800 Subject: [PATCH 04/14] id --- apps/emqttd/src/emqttd_cm.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 51b230f7b..d425646e6 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -41,7 +41,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tab, statsfun}). +-record(state, {id, tab, statsfun}). -define(POOL, cm_pool). @@ -95,7 +95,7 @@ unregister(ClientId) when is_binary(ClientId) -> init([Id, TabId, StatsFun]) -> gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{tab = TabId, statsfun = StatsFun}}. + {ok, #state{id = Id, tab = TabId, statsfun = StatsFun}}. handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> case ets:lookup(Tab, ClientId) of @@ -138,8 +138,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabI handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. From e91102581193086ad5585f642b5118ec6f5e8f5d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 19 May 2015 00:14:20 +0800 Subject: [PATCH 05/14] emqttd_sm_sup --- apps/emqttd/src/emqttd_app.erl | 2 +- apps/emqttd/src/emqttd_sm.erl | 41 ++++++++++---------- apps/emqttd/src/emqttd_sm_sup.erl | 63 +++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 21 deletions(-) create mode 100644 apps/emqttd/src/emqttd_sm_sup.erl diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 80bbd968e..95018f5eb 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -71,7 +71,7 @@ start_servers(Sup) -> {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, - {"emqttd session manager", emqttd_sm}, + {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index ef26cb44e..6a1c87325 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -44,10 +44,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/0]). +-export([start_link/3]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,16 +53,19 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tabid, statsfun}). +-record(state, {id, tabid, statsfun}). --define(SESSION_TAB, mqtt_session). +-define(POOL, sm_pool). %%%============================================================================= %%% API %%%============================================================================= --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + TabId :: ets:tid(), + StatsFun :: fun(). +start_link(Id, TabId, StatsFun) -> + gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -72,7 +73,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TAB, ClientId) of + case ets:lookup(emqttd_sm_sup:table(), ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -83,7 +84,8 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). + SmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session @@ -91,17 +93,16 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - gen_server:call(?SERVER, {destroy_session, ClientId}). + SmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - process_flag(trap_exit, true), - TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - {ok, #state{tabid = TabId, statsfun = StatsFun}}. +init([Id, TabId, StatsFun]) -> + gproc_pool:connect_worker(?POOL, {?MODULE, Id}), + {ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}. handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> Reply = @@ -145,8 +146,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -155,6 +156,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?SESSION_TAB, size)), State. +setstats(State = #state{tabid = TabId, statsfun = StatsFun}) -> + StatsFun(ets:info(TabId, size)), State. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl new file mode 100644 index 000000000..15bdf13ec --- /dev/null +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -0,0 +1,63 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_sm_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +%% API +-export([start_link/0, table/0]). + +-behaviour(supervisor). + +%% Supervisor callbacks +-export([init/1]). + +-define(SESSION_TAB, mqtt_session). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +table() -> ?SESSION_TAB. + +init([]) -> + TabId = ets:new(?SESSION_TAB, [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(sm_pool, hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + Children = lists:map( + fun(I) -> + Name = {emqttd_sm, I}, + gproc_pool:add_worker(sm_pool, Name, I), + {Name, {emqttd_sm, start_link, [I, TabId, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + From f75c807aaff39854ec1b444320a13c05606eb09f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 22 May 2015 18:39:35 +0800 Subject: [PATCH 06/14] fix pool, table --- apps/emqttd/src/emqttd_cm.erl | 53 ++++++++++++++------------- apps/emqttd/src/emqttd_cm_sup.erl | 18 ++++------ apps/emqttd/src/emqttd_sm.erl | 59 ++++++++++++++++++++----------- apps/emqttd/src/emqttd_sm_sup.erl | 18 ++++------ 4 files changed, 81 insertions(+), 67 deletions(-) diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index d425646e6..7758f8ec0 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup/1, register/1, unregister/1]). @@ -43,7 +43,9 @@ -record(state, {id, tab, statsfun}). --define(POOL, cm_pool). +-define(CM_POOL, cm_pool). + +-define(CLIENT_TAB, mqtt_client). %%%============================================================================= %%% API @@ -53,12 +55,15 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +pool() -> ?CM_POOL. + +table() -> ?CLIENT_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(emqttd_cm_sup:table(), ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ @@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{id = Id, tab = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({register, ClientId, Pid}, _From, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; @@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); [] -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -116,11 +121,11 @@ handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> - case ets:lookup(TabId, ClientId) of +handle_cast({unregister, ClientId, Pid}, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(TabId, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> @@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> - ets:match_delete(TabId, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 666b87548..53a338404 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -33,30 +33,26 @@ -behaviour(supervisor). %% API --export([start_link/0, table/0]). +-export([start_link/0]). %% Supervisor callbacks -export([init/1]). --define(CLIENT_TAB, mqtt_client). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?CLIENT_TAB. - init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_cm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), Children = lists:map( fun(I) -> Name = {emqttd_cm, I}, - gproc_pool:add_worker(cm_pool, Name, I), - {Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} + gproc_pool:add_worker(emqttd_cm:pool(), Name, I), + {Name, {emqttd_cm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_cm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 6a1c87325..05d9e62b0 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -45,7 +45,7 @@ -behaviour(gen_server). %% API Function Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,17 +55,35 @@ -record(state, {id, tabid, statsfun}). --define(POOL, sm_pool). +-define(SM_POOL, sm_pool). + +-define(SESSION_TAB, mqtt_session). %%%============================================================================= %%% API %%%============================================================================= --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + +%%------------------------------------------------------------------------------ +%% @doc Start a session manager +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +%%------------------------------------------------------------------------------ +%% @doc Pool name. +%% @end +%%------------------------------------------------------------------------------ +pool() -> ?SM_POOL. + +%%------------------------------------------------------------------------------ +%% @doc Table name. +%% @end +%%------------------------------------------------------------------------------ +table() -> ?SESSION_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -84,7 +102,7 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - SmPid = gproc_pool:pick_worker(?POOL, ClientId), + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ @@ -93,28 +111,27 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - SmPid = gproc_pool:pick_worker(?POOL, ClientId), + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> +handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = - case ets:lookup(Tab, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(Tab, {ClientId, SessPid, - erlang:monitor(process, SessPid)}), + ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> {error, Error} @@ -122,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({destroy_session, ClientId}, _From, State) -> + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), - ets:delete(Tab, ClientId); + ets:delete(?SESSION_TAB, ClientId); [] -> ignore end, @@ -147,7 +164,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -156,6 +173,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tabid = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?SESSION_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl index 15bdf13ec..ece44dd38 100644 --- a/apps/emqttd/src/emqttd_sm_sup.erl +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -31,32 +31,28 @@ -include("emqttd.hrl"). %% API --export([start_link/0, table/0]). +-export([start_link/0]). -behaviour(supervisor). %% Supervisor callbacks -export([init/1]). --define(SESSION_TAB, mqtt_session). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?SESSION_TAB. - init([]) -> - TabId = ets:new(?SESSION_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_sm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(sm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, - gproc_pool:add_worker(sm_pool, Name, I), - {Name, {emqttd_sm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_sm]} + gproc_pool:add_worker(emqttd_sm:pool(), Name, I), + {Name, {emqttd_sm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. From 05297e49cfc7495b1d7a1c8cfc3ec7860f0de3e2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 22 May 2015 20:46:41 +0800 Subject: [PATCH 07/14] max_payload_size --- rel/files/emqttd.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index a60107c76..9ff09a150 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -94,7 +94,7 @@ %% Max number of retained messages {max_message_num, 100000}, %% Max Payload Size of retained message - {max_playload_size, 4096} + {max_playload_size, 65536} ]}, %% PubSub {pubsub, [ From c865fd2b6ed3fcff19b06f5e6f691e0746002115 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 23 May 2015 00:20:23 +0800 Subject: [PATCH 08/14] mv 'forced_subscriptions' option to {mqtt, {client, ... --- rel/files/emqttd.config | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 9ff09a150..2c84c02ee 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -67,6 +67,8 @@ ]}, %% Client {client, [ + %% Subscribe topics automatically when client connected + {forced_subscriptions, [{"$Q/client/$c", 0}]} %TODO: Network ingoing limit %{ingoing_rate_limit, '64KB/s'} %TODO: Reconnet control @@ -86,9 +88,6 @@ %% System interval of publishing broker $SYS messages {sys_interval, 60}, - %% Subscribe these topics automatically when client connected - {forced_subscriptions, [{"$Q/client/$c", 0}]}, - %% Retained messages {retained, [ %% Max number of retained messages From 01bfb830f5ef095d0d3f7e8582c15dd593a1a0a8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 23 May 2015 00:20:58 +0800 Subject: [PATCH 09/14] forced subscriptions --- apps/emqttd/src/emqttd_protocol.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index eb08b62fd..03845c482 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -217,7 +217,7 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; -handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) -> +handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) -> {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), {ok, State#proto_state{session = NewSession}}; @@ -300,12 +300,8 @@ send_willmsg(ClientId, WillMsg) -> %%TODO: will be fixed in 0.8 force_subscribe(ClientId) -> - case emqttd_broker:env(forced_subscriptions) of - undefined -> - ingore; - Topics -> - [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics] - end. + [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- + proplists:get_value(forced_subscriptions, emqttd:env(mqtt, client), [])]. force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> force_subscribe(ClientId, {list_to_binary(Topic), Qos}); @@ -315,6 +311,7 @@ force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> self() ! {force_subscribe, Topic1, Qos}. start_keepalive(0) -> ignore; + start_keepalive(Sec) when Sec > 0 -> self() ! {keepalive, start, round(Sec * 1.5)}. From 74024acd011db5ea4420ae8f3afe7db36be9e629 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 24 May 2015 15:28:56 +0800 Subject: [PATCH 10/14] broker hooks --- apps/emqttd/src/emqttd.erl | 9 ++++ apps/emqttd/src/emqttd_app.erl | 4 +- apps/emqttd/src/emqttd_broker.erl | 53 ++++++++++++++++++++ apps/emqttd/src/emqttd_client.erl | 2 +- apps/emqttd/src/emqttd_gen_mod.erl | 49 +++++++++++++++++++ apps/emqttd/src/emqttd_mod_autosub.erl | 50 +++++++++++++++++++ apps/emqttd/src/emqttd_mod_rewrite.erl | 48 ++++++++++++++++++ apps/emqttd/src/emqttd_mod_sup.erl | 67 ++++++++++++++++++++++++++ apps/emqttd/src/emqttd_protocol.erl | 16 +----- apps/emqttd/src/emqttd_sup.erl | 4 +- rel/files/emqttd.config | 7 ++- 11 files changed, 289 insertions(+), 20 deletions(-) create mode 100644 apps/emqttd/src/emqttd_gen_mod.erl create mode 100644 apps/emqttd/src/emqttd_mod_autosub.erl create mode 100644 apps/emqttd/src/emqttd_mod_rewrite.erl create mode 100644 apps/emqttd/src/emqttd_mod_sup.erl diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 4de58fbd7..cdbe0857c 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -32,6 +32,7 @@ open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, + load_all_mods/0, loaded_plugins/0, is_running/1]). @@ -202,6 +203,14 @@ unload_app(App) -> lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} end. +load_all_mods() -> + Mods = application:get_env(emqttd, modules, []), + lists:foreach(fun({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + Mod:load(Opts), + lager:info("load module ~s successfully", [Name]) + end, Mods). + %%------------------------------------------------------------------------------ %% @doc Is running? %% @end diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 95018f5eb..5a8daf60f 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -51,8 +51,9 @@ start(_StartType, _StartArgs) -> emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), - {ok, Listeners} = application:get_env(listeners), + emqttd:load_all_mods(), emqttd:load_all_plugins(), + {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), print_vsn(), @@ -78,6 +79,7 @@ start_servers(Sup) -> {"emqttd metrics", emqttd_metrics}, %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker}, + {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", emqttd_sysmon}], diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 4a9b55ea6..a2f1c6ba1 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -42,6 +42,9 @@ %% Event API -export([subscribe/1, notify/2]). +%% Hook API +-export([hook/2, unhook/2, run_hooks/2]). + %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -127,6 +130,22 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). +hook(Name, MFArgs) -> + gen_server:call(?MODULE, {hook, Name, MFArgs}). + +unhook(Name, MF) -> + gen_server:call(?MODULE, {unhook, Name, MF}). + +run_hooks(Name, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Name}) of + [{_, Hooks}] -> + lists:foreach(fun({M, F, A}) -> + apply(M, F, Args++A) + end, Hooks); + [] -> + ok + end. + %%------------------------------------------------------------------------------ %% @doc Start a tick timer %% @end @@ -163,6 +182,31 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; +handle_call({hook, Name, MFArgs}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + case lists:member(MFArgs, Hooks) of + true -> + {error, existed}; + false -> + ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]}) + end; + [] -> + ets:insert(?BROKER_TAB, {Key, [MFArgs]}) + end, + {reply, Reply, State}; + +handle_call({unhook, Name, MFArgs}, _From, State) -> + Key = {hook, Name}, Reply = + case ets:lookup(?BROKER_TAB, Key) of + [{Key, Hooks}] -> + ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])}); + [] -> + {error, not_found} + end, + {reply, Reply, State}; + handle_call(_Request, _From, State) -> {reply, error, State}. @@ -189,6 +233,15 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +remove_hook(_Hook, [], Acc) -> + lists:reverse(Acc); +remove_hook(Hook, [Hook|Hooks], Acc) -> + remove_hook(Hook, Hooks, Acc); +remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) -> + remove_hook(Hook, Hooks, Acc); +remove_hook(Hook, [H|Hooks], Acc) -> + remove_hook(Hook, Hooks, [H|Acc]). + create_topic(Topic) -> emqttd_pubsub:create(emqtt_topic:systop(Topic)). diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 6c5469090..433f81393 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -121,7 +121,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; -handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> +handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; diff --git a/apps/emqttd/src/emqttd_gen_mod.erl b/apps/emqttd/src/emqttd_gen_mod.erl new file mode 100644 index 000000000..190971d0e --- /dev/null +++ b/apps/emqttd/src/emqttd_gen_mod.erl @@ -0,0 +1,49 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd gen_mod behaviour +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_gen_mod). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-ifdef(use_specs). + +-callback load(Opts :: any()) -> {ok, State :: any()}. + +-callback unload(State :: any()) -> any(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{load, 1}, {unload, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. + diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl new file mode 100644 index 000000000..039066c3e --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -0,0 +1,50 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd auto subscribe module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_autosub). + +-author("Feng Lee "). + +-behaviour(emqttd_gen_mod). + +-export([load/1, subscribe/2, unload/1]). + +-record(state, {topics}). + +load(Opts) -> + Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}), + {ok, #state{topics = Topics}}. + +subscribe({Client, ClientId}, Topics) -> + F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, + [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. + +unload(_Opts) -> + emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). + + diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl new file mode 100644 index 000000000..fa499443c --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -0,0 +1,48 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd rewrite module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mod_rewrite). + +-author("Feng Lee "). + +-behaviour(emqttd_gen_mod). + +-export([load/1, rewrite/1, unload/1]). + +load(Opts) -> + ok. + +rewrite(Topic) -> + Topic. + +reload(Opts) -> + ok. + +unload(_Opts) -> + ok. + + diff --git a/apps/emqttd/src/emqttd_mod_sup.erl b/apps/emqttd/src/emqttd_mod_sup.erl new file mode 100644 index 000000000..8fa3ced59 --- /dev/null +++ b/apps/emqttd/src/emqttd_mod_sup.erl @@ -0,0 +1,67 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd module supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_mod_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_child/1, start_child/2]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) when is_tuple(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +%% +%% start_child(Mod::atom(), Type::type()) -> {ok, pid()} +%% @type type() = worker | supervisor +%% +start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> + supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). + +%%%============================================================================= +%%% Supervisor callbacks +%%%============================================================================= + +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 03845c482..54b4cb1b7 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -134,10 +134,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = emqttd_cm:register(ClientId1), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), - %% Force subscriptions - force_subscribe(ClientId1), %% Start keepalive start_keepalive(KeepAlive), + %% Run hooks + emqttd_broker:run_hooks(client_connected, [{self(), ClientId1}]), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, session = Session, will_msg = willmsg(Var)}}; @@ -298,18 +298,6 @@ send_willmsg(_ClientId, undefined) -> send_willmsg(ClientId, WillMsg) -> emqttd_pubsub:publish(ClientId, WillMsg). -%%TODO: will be fixed in 0.8 -force_subscribe(ClientId) -> - [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- - proplists:get_value(forced_subscriptions, emqttd:env(mqtt, client), [])]. - -force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> - force_subscribe(ClientId, {list_to_binary(Topic), Qos}); - -force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> - Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic), - self() ! {force_subscribe, Topic1, Qos}. - start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_sup.erl b/apps/emqttd/src/emqttd_sup.erl index bb460d144..6e57818b7 100644 --- a/apps/emqttd/src/emqttd_sup.erl +++ b/apps/emqttd/src/emqttd_sup.erl @@ -39,7 +39,7 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). %%%============================================================================= %%% API @@ -63,5 +63,5 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %%%============================================================================= init([]) -> - {ok, {{one_for_all, 10, 100}, []}}. + {ok, {{one_for_all, 10, 3600}, []}}. diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 2c84c02ee..68c22400c 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -67,8 +67,6 @@ ]}, %% Client {client, [ - %% Subscribe topics automatically when client connected - {forced_subscriptions, [{"$Q/client/$c", 0}]} %TODO: Network ingoing limit %{ingoing_rate_limit, '64KB/s'} %TODO: Reconnet control @@ -108,6 +106,11 @@ {ping_down_interval, 1} %seconds ]} ]}, + %% Modules + {modules, [ + %% Subscribe topics automatically when client connected + {autosub, [{"$Q/client/$c", 0}]} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ From d19805b68cec8d71d62e24462ad0dbf8c04688af Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 24 May 2015 18:33:53 +0800 Subject: [PATCH 11/14] mod rewrite --- apps/emqttd/src/emqttd_broker.erl | 75 ++++++++++++++++---------- apps/emqttd/src/emqttd_mod_autosub.erl | 4 +- apps/emqttd/src/emqttd_mod_rewrite.erl | 50 ++++++++++++++--- apps/emqttd/src/emqttd_protocol.erl | 6 ++- rel/files/emqttd.config | 13 +++-- rel/reltool.config | 11 ++-- 6 files changed, 111 insertions(+), 48 deletions(-) diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index a2f1c6ba1..ebff43718 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -43,7 +43,7 @@ -export([subscribe/1, notify/2]). %% Hook API --export([hook/2, unhook/2, run_hooks/2]). +-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]). %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -130,18 +130,48 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). -hook(Name, MFArgs) -> - gen_server:call(?MODULE, {hook, Name, MFArgs}). +%%------------------------------------------------------------------------------ +%% @doc Hook +%% @end +%%------------------------------------------------------------------------------ +-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. +hook(Hook, Name, MFA) -> + gen_server:call(?MODULE, {hook, Hook, Name, MFA}). -unhook(Name, MF) -> - gen_server:call(?MODULE, {unhook, Name, MF}). +%%------------------------------------------------------------------------------ +%% @doc Unhook +%% @end +%%------------------------------------------------------------------------------ +-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. +unhook(Hook, Name) -> + gen_server:call(?MODULE, {unhook, Hook, Name}). -run_hooks(Name, Args) -> - case ets:lookup(?BROKER_TAB, {hook, Name}) of - [{_, Hooks}] -> - lists:foreach(fun({M, F, A}) -> +%%------------------------------------------------------------------------------ +%% @doc Foreach hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any(). +foreach_hooks(Hook, Args) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foreach(fun({_Name, {M, F, A}}) -> apply(M, F, Args++A) end, Hooks); + [] -> + ok + end. + +%%------------------------------------------------------------------------------ +%% @doc Foldl hooks +%% @end +%%------------------------------------------------------------------------------ +-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any(). +foldl_hooks(Hook, Args, Acc0) -> + case ets:lookup(?BROKER_TAB, {hook, Hook}) of + [{_, Hooks}] -> + lists:foldl(fun({_Name, {M, F, A}}, Acc) -> + apply(M, F, [Acc, Args++A]) + end, Acc0, Hooks); [] -> ok end. @@ -182,33 +212,33 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; -handle_call({hook, Name, MFArgs}, _From, State) -> - Key = {hook, Name}, Reply = +handle_call({hook, Hook, Name, MFArgs}, _From, State) -> + Key = {hook, Hook}, Reply = case ets:lookup(?BROKER_TAB, Key) of [{Key, Hooks}] -> - case lists:member(MFArgs, Hooks) of - true -> + case lists:keyfind(Name, 1, Hooks) of + {Name, _MFArgs} -> {error, existed}; false -> - ets:insert(?BROKER_TAB, {Key, Hooks ++ [MFArgs]}) + ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) end; [] -> - ets:insert(?BROKER_TAB, {Key, [MFArgs]}) + ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) end, {reply, Reply, State}; -handle_call({unhook, Name, MFArgs}, _From, State) -> +handle_call({unhook, Name}, _From, State) -> Key = {hook, Name}, Reply = case ets:lookup(?BROKER_TAB, Key) of [{Key, Hooks}] -> - ets:insert(?BROKER_TAB, {Key, remove_hook(MFArgs, Hooks, [])}); + ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); [] -> {error, not_found} end, {reply, Reply, State}; handle_call(_Request, _From, State) -> - {reply, error, State}. + {reply, {error, unsupport_request}, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -233,15 +263,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -remove_hook(_Hook, [], Acc) -> - lists:reverse(Acc); -remove_hook(Hook, [Hook|Hooks], Acc) -> - remove_hook(Hook, Hooks, Acc); -remove_hook(Hook = {M,F}, [{M,F,_A}|Hooks], Acc) -> - remove_hook(Hook, Hooks, Acc); -remove_hook(Hook, [H|Hooks], Acc) -> - remove_hook(Hook, Hooks, [H|Acc]). - create_topic(Topic) -> emqttd_pubsub:create(emqtt_topic:systop(Topic)). diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index 039066c3e..0ed1be5e1 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -37,7 +37,8 @@ load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, subscribe, [Topics]}), + emqttd_broker:hook(client_connected, {?MODULE, subscribe}, + {?MODULE, subscribe, [Topics]}), {ok, #state{topics = Topics}}. subscribe({Client, ClientId}, Topics) -> @@ -47,4 +48,3 @@ subscribe({Client, ClientId}, Topics) -> unload(_Opts) -> emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). - diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index fa499443c..bc7e4aa73 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -31,18 +31,52 @@ -behaviour(emqttd_gen_mod). --export([load/1, rewrite/1, unload/1]). +-export([load/1, reload/1, unload/1]). + +-export([rewrite/2]). + +%%%============================================================================= +%%% API +%%%============================================================================= load(Opts) -> - ok. + File = proplists:get_value(file, Opts), + Sections = compile(file:consult(File)), + emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + {?MODULE, rewrite, [subscribe, Sections]}), + emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + {?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}), + emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + {?MODULE, rewrite_publish, [publish, Sections]}). -rewrite(Topic) -> - Topic. +rewrite(TopicTable, [subscribe, _Sections]) -> + lager:info("Rewrite Subscribe: ~p", [TopicTable]), + TopicTable; -reload(Opts) -> - ok. +rewrite(Topics, [unsubscribe, _Sections]) -> + lager:info("Rewrite Unsubscribe: ~p", [Topics]), + Topics; + +rewrite(Message, [publish, _Sections]) -> + Message. + +reload(File) -> + %%TODO: The unload api is not right... + unload(state), load([{file, File}]). -unload(_Opts) -> - ok. +unload(_) -> + emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +compile(Sections) -> + C = fun({rewrite, Re, Dest}) -> + {ok, MP} = re:compile(Re), + {rewrite, MP, Dest} + end, + [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 54b4cb1b7..0c9bf62a5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -212,8 +212,9 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> + TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; @@ -226,7 +227,8 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), + Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), + {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); handle(?PACKET(?PINGREQ), State) -> diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 68c22400c..f2ba8109d 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -46,14 +46,14 @@ %% Authentication with username, password %{username, []}, %% Authentication with clientid - %{clientid, [{password, no}, {file, "etc/clients.config"}]}, + %{clientid, [{password, no}, {file, "etc/conf/clients.config"}]}, %% Allow all {anonymous, []} ]}, %% ACL config {acl, [ %% Internal ACL module - {internal, [{file, "etc/acl.config"}, {nomatch, allow}]} + {internal, [{file, "etc/conf/acl.config"}, {nomatch, allow}]} ]} ]}, %% MQTT Protocol Options @@ -109,7 +109,10 @@ %% Modules {modules, [ %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + {autosub, [{"$Q/client/$c", 0}]}, + %% Rewrite rules + {rewrite, [{file, "etc/conf/rewrite.config"}]} + ]}, %% Listeners {listeners, [ @@ -137,8 +140,8 @@ %% Socket Access Control {access, [{allow, all}]}, %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl.crt"}, - {keyfile, "etc/ssl.key"}]}, + {ssl, [{certfile, "etc/ssl/ssl.crt"}, + {keyfile, "etc/ssl/ssl.key"}]}, %% Socket Options {sockopts, [ {backlog, 1024} diff --git a/rel/reltool.config b/rel/reltool.config index 933300fd3..6e122a77d 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,8 @@ {overlay, [ {mkdir, "log/"}, {mkdir, "etc/"}, + {mkdir, "etc/ssl/"}, + {mkdir, "etc/conf/"}, {mkdir, "data/"}, {mkdir, "plugins/"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, @@ -72,11 +74,12 @@ {template, "files/emqttd.cmd", "bin/emqttd.cmd"}, {copy, "files/start_erl.cmd", "bin/start_erl.cmd"}, {copy, "files/install_upgrade.escript", "bin/install_upgrade.escript"}, - {copy, "files/ssl/ssl.crt", "etc/ssl.crt"}, - {copy, "files/ssl/ssl.key", "etc/ssl.key"}, + {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, + {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {template, "files/emqttd.config", "etc/emqttd.config"}, - {template, "files/acl.config", "etc/acl.config"}, - {template, "files/clients.config", "etc/clients.config"}, + {template, "files/acl.config", "etc/conf/acl.config"}, + {template, "files/rewrite.config", "etc/conf/rewrite.config"}, + {template, "files/clients.config", "etc/conf/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"} ]}. From ced7acbf1ca65c50d4d8c6d022a713fdb10effd9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 24 May 2015 18:34:54 +0800 Subject: [PATCH 12/14] rewrite --- rel/files/rewrite.config | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 rel/files/rewrite.config diff --git a/rel/files/rewrite.config b/rel/files/rewrite.config new file mode 100644 index 000000000..5006c70b9 --- /dev/null +++ b/rel/files/rewrite.config @@ -0,0 +1,14 @@ +%%%----------------------------------------------------------------------------- +%% +%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) +%% +%%%----------------------------------------------------------------------------- + +{topic, "x/#", [ + {rewrite, "^x/y/(.+)$", "z/y/$1"}, + {rewrite, "^x/(.+)$", "y/$1"} +]}. + +{topic, "y/+/z/#", [ + {rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"} +]}. From c2d4a60decfc4ff2894c3792f32683e969e32f9a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 24 May 2015 21:54:36 +0800 Subject: [PATCH 13/14] is_mod_enabled --- apps/emqttd/src/emqttd.erl | 5 ++++- apps/emqttd/src/emqttd_mod_rewrite.erl | 3 ++- apps/emqttd/src/emqttd_protocol.erl | 2 +- rel/files/emqttd.config | 6 +++--- rel/reltool.config | 7 +++---- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index cdbe0857c..30faa3e2d 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -32,7 +32,7 @@ open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, - load_all_mods/0, + load_all_mods/0, is_mod_enabled/1, loaded_plugins/0, is_running/1]). @@ -211,6 +211,9 @@ load_all_mods() -> lager:info("load module ~s successfully", [Name]) end, Mods). +is_mod_enabled(Name) -> + env(modules, Name) =/= undefined. + %%------------------------------------------------------------------------------ %% @doc Is running? %% @end diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index bc7e4aa73..bba664af1 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -41,7 +41,8 @@ load(Opts) -> File = proplists:get_value(file, Opts), - Sections = compile(file:consult(File)), + {ok, Terms} = file:consult(File), + Sections = compile(Terms), emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, {?MODULE, rewrite, [subscribe, Sections]}), emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 0c9bf62a5..71c33bd72 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -137,7 +137,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = %% Start keepalive start_keepalive(KeepAlive), %% Run hooks - emqttd_broker:run_hooks(client_connected, [{self(), ClientId1}]), + emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, session = Session, will_msg = willmsg(Var)}}; diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index f2ba8109d..716cd3421 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -46,14 +46,14 @@ %% Authentication with username, password %{username, []}, %% Authentication with clientid - %{clientid, [{password, no}, {file, "etc/conf/clients.config"}]}, + %{clientid, [{password, no}, {file, "etc/clients.config"}]}, %% Allow all {anonymous, []} ]}, %% ACL config {acl, [ %% Internal ACL module - {internal, [{file, "etc/conf/acl.config"}, {nomatch, allow}]} + {internal, [{file, "etc/acl.config"}, {nomatch, allow}]} ]} ]}, %% MQTT Protocol Options @@ -111,7 +111,7 @@ %% Subscribe topics automatically when client connected {autosub, [{"$Q/client/$c", 0}]}, %% Rewrite rules - {rewrite, [{file, "etc/conf/rewrite.config"}]} + {rewrite, [{file, "etc/rewrite.config"}]} ]}, %% Listeners diff --git a/rel/reltool.config b/rel/reltool.config index 6e122a77d..ac5d09e83 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -64,7 +64,6 @@ {mkdir, "log/"}, {mkdir, "etc/"}, {mkdir, "etc/ssl/"}, - {mkdir, "etc/conf/"}, {mkdir, "data/"}, {mkdir, "plugins/"}, {copy, "files/erl", "\{\{erts_vsn\}\}/bin/erl"}, @@ -77,9 +76,9 @@ {copy, "files/ssl/ssl.crt", "etc/ssl/ssl.crt"}, {copy, "files/ssl/ssl.key", "etc/ssl/ssl.key"}, {template, "files/emqttd.config", "etc/emqttd.config"}, - {template, "files/acl.config", "etc/conf/acl.config"}, - {template, "files/rewrite.config", "etc/conf/rewrite.config"}, - {template, "files/clients.config", "etc/conf/clients.config"}, + {template, "files/acl.config", "etc/acl.config"}, + {template, "files/rewrite.config", "etc/rewrite.config"}, + {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"} ]}. From 46545be9d027ac321dcdb1720e41c18c3a8be41a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 24 May 2015 21:54:57 +0800 Subject: [PATCH 14/14] rewrite --- apps/emqttd/src/emqttd_mod_rewrite.erl | 57 ++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index bba664af1..4131afcf6 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -29,6 +29,8 @@ -author("Feng Lee "). +-include_lib("emqtt/include/emqtt.hrl"). + -behaviour(emqttd_gen_mod). -export([load/1, reload/1, unload/1]). @@ -50,20 +52,33 @@ load(Opts) -> emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, {?MODULE, rewrite_publish, [publish, Sections]}). -rewrite(TopicTable, [subscribe, _Sections]) -> - lager:info("Rewrite Subscribe: ~p", [TopicTable]), - TopicTable; +rewrite(TopicTable, [subscribe, Sections]) -> + [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; -rewrite(Topics, [unsubscribe, _Sections]) -> - lager:info("Rewrite Unsubscribe: ~p", [Topics]), - Topics; +rewrite(Topics, [unsubscribe, Sections]) -> + [match_topic(Topic, Sections) || Topic <- Topics]; -rewrite(Message, [publish, _Sections]) -> - Message. +rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> + %%TODO: this will not work if the client is always online. + RewriteTopic = + case get({rewrite, Topic}) of + undefined -> + DestTopic = match_topic(Topic, Sections), + put({rewrite, Topic}, DestTopic), DestTopic; + DestTopic -> + DestTopic + end, + Message#mqtt_message{topic = RewriteTopic}. reload(File) -> %%TODO: The unload api is not right... - unload(state), load([{file, File}]). + case emqttd:is_mod_enabled(rewrite) of + true -> + unload(state), + load([{file, File}]); + false -> + {error, module_unloaded} + end. unload(_) -> emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), @@ -81,3 +96,27 @@ compile(Sections) -> end, [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. +match_topic(Topic, []) -> + Topic; +match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> + case emqtt_topic:match(Topic, Filter) of + true -> + match_rule(Topic, Rules); + false -> + match_topic(Topic, Sections) + end. + +match_rule(Topic, []) -> + Topic; +match_rule(Topic, [{rewrite, MP, Dest}|Rules]) -> + case re:run(Topic, MP, [{captrue, all_but_first, list}]) of + {match, Captured} -> + %%TODO: stupid??? how to replace $1, $2? + Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + iolist_to_binary(lists:foldl( + fun({Var, Val}, Acc) -> + re:replace(Acc, Var, Val, [global]) + end, Dest, Vars)); + nomatch -> + match_rule(Topic, Rules) + end.