From 16f23406a4feae48f5e3fa15eada91f29db1bae1 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 2 Dec 2015 18:01:26 +0800 Subject: [PATCH] 0.14 --- include/emqttd.hrl | 19 +- include/emqttd_internal.hrl | 12 +- rel/files/emqttd.config.development | 13 +- rel/files/emqttd.config.production | 9 + rel/files/vm.args | 3 + src/emqttd.erl | 20 +- src/emqttd_access_control.erl | 8 +- src/emqttd_access_rule.erl | 8 +- src/emqttd_acl_internal.erl | 8 +- src/emqttd_acl_mod.erl | 8 +- src/emqttd_alarm.erl | 8 +- src/emqttd_app.erl | 13 +- src/emqttd_auth_anonymous.erl | 8 +- src/emqttd_auth_clientid.erl | 8 +- src/emqttd_auth_ldap.erl | 8 +- src/emqttd_auth_mod.erl | 8 +- src/emqttd_auth_username.erl | 8 +- src/emqttd_bridge.erl | 8 +- src/emqttd_bridge_sup.erl | 8 +- src/emqttd_broker.erl | 18 +- src/emqttd_cli.erl | 8 +- src/emqttd_client.erl | 10 +- src/emqttd_cm.erl | 53 ++- src/emqttd_cm_sup.erl | 43 +- src/emqttd_ctl.erl | 10 +- src/emqttd_dist.erl | 6 +- src/emqttd_gen_mod.erl | 8 +- src/emqttd_guid.erl | 5 +- src/emqttd_http.erl | 8 +- src/emqttd_keepalive.erl | 5 +- src/emqttd_message.erl | 8 +- src/emqttd_metrics.erl | 10 +- src/emqttd_mnesia.erl | 8 +- src/emqttd_mod_autosub.erl | 10 +- src/emqttd_mod_presence.erl | 10 +- src/emqttd_mod_rewrite.erl | 10 +- src/emqttd_mod_sup.erl | 10 +- src/emqttd_mqueue.erl | 5 +- src/emqttd_net.erl | 8 +- src/emqttd_opts.erl | 8 +- src/emqttd_packet.erl | 8 +- src/emqttd_parser.erl | 8 +- src/emqttd_plugins.erl | 8 +- src/emqttd_pooler.erl | 47 ++- src/emqttd_pooler_sup.erl | 58 --- src/emqttd_protocol.erl | 8 +- src/emqttd_pubsub.erl | 372 +++--------------- src/emqttd_pubsub_sup.erl | 61 ++- ...mqttd_retained.erl => emqttd_retainer.erl} | 14 +- src/emqttd_serialiser.erl | 8 +- src/emqttd_session.erl | 11 +- src/emqttd_session_sup.erl | 8 +- src/emqttd_sm.erl | 23 +- src/emqttd_sm_helper.erl | 20 +- src/emqttd_sm_sup.erl | 51 +-- src/emqttd_stats.erl | 8 +- src/emqttd_sup.erl | 10 +- src/emqttd_sysmon.erl | 8 +- src/emqttd_topic.erl | 8 +- src/emqttd_trace.erl | 10 +- src/emqttd_trie.erl | 5 +- src/emqttd_util.erl | 8 +- src/emqttd_vm.erl | 8 +- src/emqttd_ws_client.erl | 8 +- 64 files changed, 447 insertions(+), 777 deletions(-) delete mode 100644 src/emqttd_pooler_sup.erl rename src/{emqttd_retained.erl => emqttd_retainer.erl} (98%) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 51bce2f6d..739600324 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -60,26 +60,15 @@ -type mqtt_topic() :: #mqtt_topic{}. %%------------------------------------------------------------------------------ -%% MQTT Subscriber +%% MQTT Subscription %%------------------------------------------------------------------------------ --record(mqtt_subscriber, { +-record(mqtt_subscription, { + clientid :: binary() | atom(), topic :: binary(), - subpid :: pid(), qos = 0 :: 0 | 1 | 2 }). --type mqtt_subscriber() :: #mqtt_subscriber{}. - -%%------------------------------------------------------------------------------ -%% P2P Queue Subscriber -%%------------------------------------------------------------------------------ --record(mqtt_queue, { - name :: binary(), - qpid :: pid(), - qos = 0 :: 0 | 1 | 2 -}). - --type mqtt_queue() :: #mqtt_queue{}. +-type mqtt_subscription() :: #mqtt_subscription{}. %%------------------------------------------------------------------------------ %% MQTT Client diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 4f2eb378c..7a4806570 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -19,12 +19,18 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Internal Header. +%%% @doc Internal Header File %%% -%%% @end %%%----------------------------------------------------------------------------- +-define(GPROC_POOL(JoinOrLeave, Pool, I), + (begin + case JoinOrLeave of + join -> gproc_pool:connect_worker(Pool, {Pool, Id}); + leave -> gproc_pool:disconnect_worker(Pool, {Pool, I}) + end + end)). + -define(record_to_proplist(Def, Rec), lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))). diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index ae682f7bb..506f1c48e 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -141,11 +141,18 @@ %% Max Payload Size of retained message {max_playload_size, 65536} ]}, - %% PubSub + + %% PubSub and Router {pubsub, [ - %% default should be scheduler numbers - %% {pool_size, 8} + %% Default should be scheduler numbers + %% {pool_size, 8}, + + %% Route aging time(second) + {shard, true}, + + {aging, 10} ]}, + %% Bridge {bridge, [ %%TODO: bridge queue size diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index ee3184e6c..2fb4fb91b 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -138,6 +138,15 @@ %% default should be scheduler numbers %% {pool_size, 8} ]}, + + %% Router + {router, [ + %% Default should be scheduler numbers + %% {pool_size, 8}, + %% Route aging time(second) + {aging, 5} + ]}, + %% Bridge {bridge, [ %%TODO: bridge queue size diff --git a/rel/files/vm.args b/rel/files/vm.args index 9cc798802..399b76d8f 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -34,6 +34,9 @@ ## Valid range is 1-2097151. Default is 1024. ## +zdbbl 8192 +## CPU Schedulers +## +sbt db + ##------------------------------------------------------------------------- ## Env ##------------------------------------------------------------------------- diff --git a/src/emqttd.erl b/src/emqttd.erl index 540a11fd1..f41a67c2b 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -19,19 +19,17 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd main module. +%%% @doc emqttd main module. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd). --author("Feng Lee "). - -export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, load_all_mods/0, is_mod_enabled/1, - is_running/1]). + is_running/1, ensure_pool/3]). -define(MQTT_SOCKOPTS, [ binary, @@ -130,3 +128,13 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. +%%------------------------------------------------------------------------------ +%% @doc Ensure gproc pool exist. +%% @end +%%------------------------------------------------------------------------------ +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 7d15d5904..3dc51f76f 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd authentication and ACL server. +%%% @doc Authentication and ACL Control Server +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_access_control). --author("Feng Lee "). - -include("emqttd.hrl"). -behaviour(gen_server). diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 30ed8f87b..ff32bff14 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd ACL rule. +%%% @doc emqttd ACL Rule +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_access_rule). --author("Feng Lee "). - -include("emqttd.hrl"). -type who() :: all | binary() | diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index b6dc0b81b..dd58033d3 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% Internal ACL that load rules from etc/acl.config +%%% @doc Internal ACL that load rules from etc/acl.config +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_acl_internal). --author("Feng Lee "). - -include("emqttd.hrl"). -export([all_rules/0]). diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index a961043b6..37ada1bd2 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% ACL module behaviour. +%%% @doc ACL module behaviour +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_acl_mod). --author("Feng Lee "). - -include("emqttd.hrl"). %%%============================================================================= diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index 4f176af8f..b0fa4301e 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% copy alarm_handler. +%%% @doc Copy alarm_handler +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_alarm). --author("Feng Lee "). - -include("emqttd.hrl"). -behaviour(gen_event). diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 4ee9cabd1..005251757 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd application. +%%% @doc emqttd application. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_app). --author("Feng Lee "). - -include("emqttd_cli.hrl"). -behaviour(application). @@ -72,11 +70,12 @@ start_listeners() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, {"emqttd trace", emqttd_trace}, + {"emqttd router", {supervisor, emqttd_router_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, - {"emqttd retained", emqttd_retained}, - {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, + {"emqttd retainer", emqttd_retainer}, + {"emqttd pooler", {supervisor, emqttd_pooler}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index 95592c22b..4ae6ecff7 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% Anonymous authentication module. +%%% @doc Anonymous authentication module +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_anonymous). --author("Feng Lee "). - -behaviour(emqttd_auth_mod). -export([init/1, check/3, description/0]). diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 0d28b8421..548c2a58d 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% ClientId Authentication Module. +%%% @doc ClientId Authentication Module +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_clientid). --author("Feng Lee "). - -include("emqttd.hrl"). -export([add_clientid/1, add_clientid/2, diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index 09ca89142..23895172d 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% LDAP Authentication Module +%%% @doc LDAP Authentication Module +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_ldap). --author("Feng Lee "). - -include("emqttd.hrl"). -import(proplists, [get_value/2, get_value/3]). diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 86e08df4b..bfc9a276e 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd authentication behaviour. +%%% @doc emqttd authentication behaviour +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_mod). --author("Feng Lee "). - -include("emqttd.hrl"). %%%============================================================================= diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 855a3d6e6..c8ee0c31f 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% Authentication with username and password. +%%% @doc Authentication with username and password +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_username). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_cli.hrl"). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 12126dcd6..dc0df324f 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd bridge. +%%% @doc emqttd bridge +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_bridge). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 35003f152..fa7924503 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd bridge supervisor. +%%% @doc Bridge Supervisor +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_bridge_sup). --author("Feng Lee "). - -behavior(supervisor). -export([start_link/0, diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 88a3713cc..3075df994 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -19,14 +19,14 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd broker. +%%% @doc emqttd broker +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_broker). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). @@ -48,18 +48,16 @@ %% Tick API -export([start_tick/1, stop_tick/1]). --behaviour(gen_server). - --define(SERVER, ?MODULE). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(BROKER_TAB, mqtt_broker). - -record(state, {started_at, sys_interval, heartbeat, tick_tref}). +-define(SERVER, ?MODULE). + +-define(BROKER_TAB, mqtt_broker). + %% $SYS Topics of Broker -define(SYSTOP_BROKERS, [ version, % Broker version diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 340125c09..5d8f3624a 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd cli. +%%% @doc emqttd cli +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_cli). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_cli.hrl"). diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 196732320..e808f5ef8 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -19,14 +19,14 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Client Connection. +%%% @doc MQTT Client Connection +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_client). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). @@ -34,8 +34,6 @@ -include("emqttd_internal.hrl"). --behaviour(gen_server). - %% API Function Exports -export([start_link/2, session/1, info/1, kick/1]). diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 63cb358e3..05063429f 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -19,19 +19,19 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Client Manager +%%% @doc MQTT Client Manager +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_cm). --author("Feng Lee "). - -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + %% API Exports --export([start_link/2, pool/0]). +-export([start_link/3]). -export([lookup/1, lookup_proc/1, register/1, unregister/1]). @@ -44,28 +44,27 @@ %% gen_server2 priorities -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). --record(state, {id, statsfun, monitors}). +-record(state, {pool, id, statsfun, monitors}). --define(CM_POOL, ?MODULE). +-define(POOL, ?MODULE). %%%============================================================================= %%% API %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Start client manager +%% @doc Start Client Manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when - Id :: pos_integer(), +-spec start_link(Pool, Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), StatsFun :: fun(). -start_link(Id, StatsFun) -> - gen_server2:start_link(?MODULE, [Id, StatsFun], []). - -pool() -> ?CM_POOL. +start_link(Pool, Id, StatsFun) -> + gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []). %%------------------------------------------------------------------------------ -%% @doc Lookup client by clientId +%% @doc Lookup Client by ClientId %% @end %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. @@ -81,19 +80,18 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec lookup_proc(ClientId :: binary()) -> pid() | undefined. lookup_proc(ClientId) when is_binary(ClientId) -> - try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid) of - Pid -> Pid + try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid) catch error:badarg -> undefined end. %%------------------------------------------------------------------------------ -%% @doc Register clientId with pid. +%% @doc Register ClientId with Pid. %% @end %%------------------------------------------------------------------------------ -spec register(Client :: mqtt_client()) -> ok. register(Client = #mqtt_client{client_id = ClientId}) -> - CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), + CmPid = gproc_pool:pick_worker(?POOL, ClientId), gen_server2:cast(CmPid, {register, Client}). %%------------------------------------------------------------------------------ @@ -102,16 +100,18 @@ register(Client = #mqtt_client{client_id = ClientId}) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), + CmPid = gproc_pool:pick_worker(?POOL, ClientId), gen_server2:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, StatsFun]) -> - gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), - {ok, #state{id = Id, statsfun = StatsFun, monitors = dict:new()}}. +init([Pool, Id, StatsFun]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id, + statsfun = StatsFun, + monitors = dict:new()}}. prioritise_call(_Req, _From, _Len, _State) -> 1. @@ -172,9 +172,8 @@ handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), - ok. +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl index aa7af9d8a..7b466bd7c 100644 --- a/src/emqttd_cm_sup.erl +++ b/src/emqttd_cm_sup.erl @@ -19,41 +19,46 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd client manager supervisor. +%%% @doc Client Manager Supervisor. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_cm_sup). --author("Feng Lee "). +-behaviour(supervisor). -include("emqttd.hrl"). --behaviour(supervisor). - %% API -export([start_link/0]). %% Supervisor callbacks -export([init/1]). +-define(CM, emqttd_cm). + +-define(TAB, mqtt_client). + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(mqtt_client, [ordered_set, named_table, public, - {keypos, 2}, {write_concurrency, true}]), - Schedulers = erlang:system_info(schedulers), - gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), - StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), - Children = lists:map( - fun(I) -> - Name = {emqttd_cm, I}, - gproc_pool:add_worker(emqttd_cm:pool(), Name, I), - {Name, {emqttd_cm, start_link, [I, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} - end, lists:seq(1, Schedulers)), - {ok, {{one_for_all, 10, 100}, Children}}. + %% Create client table + create_client_tab(), + %% CM Pool Sup + MFA = {?CM, start_link, [emqttd_stats:statsfun('clients/count', 'clients/max')]}, + PoolSup = emqttd_pool_sup:spec(pool_sup, [?CM, hash, erlang:system_info(schedulers), MFA]), + + {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. + +create_client_tab() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [ordered_set, named_table, public, + {keypos, 2}, {write_concurrency, true}]); + _ -> + ok + end. diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index bde8344b0..08f053aea 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -19,21 +19,19 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd control. +%%% @doc emqttd control +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_ctl). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). -include("emqttd_cli.hrl"). --behaviour(gen_server). - -define(SERVER, ?MODULE). %% API Function Exports diff --git a/src/emqttd_dist.erl b/src/emqttd_dist.erl index fc440bac9..50ee55583 100644 --- a/src/emqttd_dist.erl +++ b/src/emqttd_dist.erl @@ -19,10 +19,10 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd distribution functions. +%%% @doc emqttd distribution functions +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_dist). diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 682b9ab05..ae67e01b6 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd gen_mod behaviour +%%% @doc emqttd gen_mod behaviour +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_gen_mod). --author("Feng Lee "). - -include("emqttd.hrl"). -ifdef(use_specs). diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index a97ba1f15..09c76e3e2 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -34,11 +34,12 @@ %%% 4. Sequence: 2 bytes sequence in one process %%% %%% @end +%%% +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- -module(emqttd_guid). --author("Feng Lee "). - -export([gen/0, new/0, timestamp/1]). -define(MAX_SEQ, 16#FFFF). diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 5164f8adf..0a17df04d 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd http publish API and websocket client. +%%% @doc emqttd http publish API and websocket client. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_http). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index bce8b6b6f..dedea2ac5 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -21,12 +21,11 @@ %%%----------------------------------------------------------------------------- %%% @doc client keepalive %%% -%%% @end +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- -module(emqttd_keepalive). --author("Feng Lee "). - -export([start/3, check/1, cancel/1]). -record(keepalive, {statfun, statval, diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 9d340b2b3..a8d275340 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Message Functions +%%% @doc MQTT Message Functions +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_message). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 5f36aa042..b9853ea6d 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -19,21 +19,19 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd metrics. responsible for collecting broker metrics. +%%% @doc emqttd metrics. responsible for collecting broker metrics +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_metrics). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --behaviour(gen_server). - -define(SERVER, ?MODULE). %% API Function Exports diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 82fece19b..c6a9ce8d4 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd mnesia. +%%% @doc emqttd mnesia +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_mnesia). --author("Feng Lee "). - -include("emqttd.hrl"). -export([start/0, cluster/1]). diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index 00dc3dc0c..e17c1f2da 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -19,21 +19,19 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd auto subscribe module. +%%% @doc emqttd auto subscribe module. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_mod_autosub). --author("Feng Lee "). +-behaviour(emqttd_gen_mod). -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --behaviour(emqttd_gen_mod). - -export([load/1, client_connected/3, unload/1]). -record(state, {topics}). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 81f7348d4..a496b19a5 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -19,19 +19,17 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd presence management module. +%%% @doc emqttd presence management module +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_mod_presence). --author("Feng Lee "). +-behaviour(emqttd_gen_mod). -include("emqttd.hrl"). --behaviour(emqttd_gen_mod). - -export([load/1, unload/1]). -export([client_connected/3, client_disconnected/3]). diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 5ddf2b2ee..b7a72eccb 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -19,19 +19,17 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd rewrite module. +%%% @doc emqttd rewrite module +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_mod_rewrite). --author("Feng Lee "). +-behaviour(emqttd_gen_mod). -include("emqttd.hrl"). --behaviour(emqttd_gen_mod). - -export([load/1, reload/1, unload/1]). -export([rewrite/3, rewrite/4]). diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 8fa3ced59..09d09eef1 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -19,19 +19,17 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd module supervisor. +%%% @doc emqttd module supervisor. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_mod_sup). --author("Feng Lee "). +-behaviour(supervisor). -include("emqttd.hrl"). --behaviour(supervisor). - %% API -export([start_link/0, start_child/1, start_child/2]). diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 59bb2883b..82908caab 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -46,11 +46,12 @@ %%% otherwise dropped the oldest pending one. %%% %%% @end +%%% +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- -module(emqttd_mqueue). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index 627856cd8..bfa0b1f6f 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd net utility functions. some functions copied from rabbitmq. +%%% @doc emqttd net utility functions. some functions copied from rabbitmq. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_net). --author("Feng Lee "). - -include_lib("kernel/include/inet.hrl"). -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index dfd5b74b6..69b77468b 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd options handler. +%%% @doc emqttd options handler. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_opts). --author("Feng Lee "). - -export([merge/2, g/2, g/3]). %%------------------------------------------------------------------------------ diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index 286289af3..94e5b457c 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Packet Functions +%%% @doc MQTT Packet Functions +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_packet). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 01b91131f..05aafd204 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Packet Parser. +%%% @doc MQTT Packet Parser +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_parser). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 4bd86592e..73cd491ff 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd plugins. +%%% @doc emqttd plugins. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_plugins). --author("Feng Lee "). - -include("emqttd.hrl"). -export([load/0, unload/0]). diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 13f4d6b8d..b9acc5a39 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -19,32 +19,42 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd pooler. +%%% @doc emqttd pooler. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pooler). --author("Feng Lee "). - -behaviour(gen_server). +-include("emqttd_internal.hrl"). + +%% Start the pool supervisor +-export([start_link/0]). + %% API Exports --export([start_link/1, submit/1, async_submit/1]). +-export([start_link/2, submit/1, async_submit/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {id}). +-record(state, {pool, id}). + +%%------------------------------------------------------------------------------ +%% @doc Start Pooler Supervisor. +%% @end +%%------------------------------------------------------------------------------ +start_link() -> + emqttd_pool_sup:start_link(pooler, random, {?MODULE, start_link, []}). %%%============================================================================= %%% API %%%============================================================================= --spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -start_link(Id) -> - gen_server:start_link({local, name(Id)}, ?MODULE, [Id], []). +-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Pool, Id) -> + gen_server:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []). name(Id) -> list_to_atom(lists:concat([?MODULE, "_", integer_to_list(Id)])). @@ -54,22 +64,24 @@ name(Id) -> %% @end %%------------------------------------------------------------------------------ submit(Fun) -> - gen_server:call(gproc_pool:pick_worker(pooler), {submit, Fun}, infinity). + Worker = gproc_pool:pick_worker(pooler), + gen_server:call(Worker, {submit, Fun}, infinity). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler asynchronously %% @end %%------------------------------------------------------------------------------ async_submit(Fun) -> - gen_server:cast(gproc_pool:pick_worker(pooler), {async_submit, Fun}). + Worker = gproc_pool:pick_worker(pooler), + gen_server:cast(Worker, {async_submit, Fun}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id]) -> - gproc_pool:connect_worker(pooler, {pooler, Id}), - {ok, #state{id = Id}}. +init([Pool, Id]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id}}. handle_call({submit, Fun}, _From, State) -> {reply, run(Fun), State}; @@ -90,8 +102,8 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{id = I}) -> - gproc_pool:disconnect_worker(pooler, {pooler, I}), ok. +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -105,4 +117,3 @@ run({M, F, A}) -> run(Fun) when is_function(Fun) -> Fun(). - diff --git a/src/emqttd_pooler_sup.erl b/src/emqttd_pooler_sup.erl deleted file mode 100644 index d5f3a0ee5..000000000 --- a/src/emqttd_pooler_sup.erl +++ /dev/null @@ -1,58 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd pooler supervisor. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_pooler_sup). - --author("Feng Lee "). - --include("emqttd.hrl"). - --behaviour(supervisor). - -%% API --export([start_link/0, start_link/1]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - start_link(erlang:system_info(schedulers)). - -start_link(PoolSize) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolSize]). - -init([PoolSize]) -> - gproc_pool:new(pooler, random, [{size, PoolSize}]), - Children = lists:map( - fun(I) -> - gproc_pool:add_worker(pooler, {pooler, I}, I), - {{emqttd_pooler, I}, - {emqttd_pooler, start_link, [I]}, - permanent, 5000, worker, [emqttd_pooler]} - end, lists:seq(1, PoolSize)), - {ok, {{one_for_all, 10, 100}, Children}}. - - diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 29985f044..36b9acdaf 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd protocol. +%%% @doc emqttd protocol. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_protocol). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 0f8d3a2c6..8ca3b88ab 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -19,19 +19,21 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd pubsub. +%%% @doc emqttd pubsub +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pubsub). --author("Feng Lee "). +-behaviour(gen_server2). -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). +-include("emqttd_internal.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -39,59 +41,42 @@ -copy_mnesia({mnesia, [copy]}). %% API Exports --export([start_link/2]). +-export([start_link/3]). --export([create/1, - subscribe/1, subscribe/2, - unsubscribe/1, - publish/1]). +-export([create/1, subscribe/1, subscribe/2, unsubscribe/1, publish/1]). %% Local node --export([dispatch/2, match/1]). - --behaviour(gen_server2). +-export([match/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% gen_server2 priorities --export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). +-record(state, {pool, id}). --define(POOL, pubsub). - --record(state, {id, submap :: map()}). +-define(ROUTER, emqttd_router). %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= mnesia(boot) -> - %% p2p queue table - ok = emqttd_mnesia:create_table(queue, [ - {type, set}, - {ram_copies, [node()]}, - {record_name, mqtt_queue}, - {attributes, record_info(fields, mqtt_queue)}]), %% topic table ok = emqttd_mnesia:create_table(topic, [ {type, bag}, {ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}]), - %% local subscriber table, not shared with other nodes - ok = emqttd_mnesia:create_table(subscriber, [ + %% subscription table + ok = emqttd_mnesia:create_table(subscription, [ {type, bag}, {ram_copies, [node()]}, - {record_name, mqtt_subscriber}, - {attributes, record_info(fields, mqtt_subscriber)}, - {index, [subpid]}, - {local_content, true}]); + {record_name, mqtt_subscription}, + {attributes, record_info(fields, mqtt_subscription)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(queue), ok = emqttd_mnesia:copy_table(topic), - ok = emqttd_mnesia:copy_table(subscriber). + ok = emqttd_mnesia:copy_table(subscription). %%%============================================================================= %%% API @@ -101,11 +86,12 @@ mnesia(copy) -> %% @doc Start one pubsub server %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Pool, Id, Opts) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), Id :: pos_integer(), - Opts :: list(). -start_link(Id, Opts) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Id, Opts], []). + Opts :: list(tuple()). +start_link(Pool, Id, Opts) -> + gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, Opts], []). name(Id) -> list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). @@ -115,38 +101,38 @@ name(Id) -> %% @end %%------------------------------------------------------------------------------ -spec create(Topic :: binary()) -> ok | {error, Error :: any()}. -create(<<"$Q/", _Queue/binary>>) -> - %% protecte from queue - {error, cannot_create_queue}; - create(Topic) when is_binary(Topic) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - case mnesia:transaction(fun add_topic/1, [TopicR]) of - {atomic, ok} -> - setstats(topics), ok; - {aborted, Error} -> - {error, Error} + case mnesia:transaction(fun add_topic/1, [#mqtt_topic{topic = Topic, node = node()}]) of + {atomic, ok} -> setstats(topics), ok; + {aborted, Error} -> {error, Error} end. %%------------------------------------------------------------------------------ -%% @doc Subscribe topic +%% @doc Subscribe Topic %% @end %%------------------------------------------------------------------------------ +-spec subscribe(Topic, Qos) -> {ok, Qos} when + Topic :: binary(), + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe(Topic, Qos) -> + %%TODO:... + subscribe([{Topic, Qos}]). + -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos() | mqtt_qos_name(). subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) -> - call({subscribe, self(), Topic, ?QOS_I(Qos)}); + %%TODO:... + subscribe([{Topic, Qos}]); -subscribe(Topics = [{_Topic, _Qos} | _]) -> - call({subscribe, self(), [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- Topics]}). - --spec subscribe(Topic, Qos) -> {ok, Qos} when - Topic :: binary(), - Qos :: mqtt_qos() | mqtt_qos_name(). -subscribe(Topic, Qos) -> - subscribe({Topic, Qos}). +subscribe(TopicTable0 = [{_Topic, _Qos} | _]) -> + Self = self(), + TopicTable = [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable0], + ?ROUTER:add_routes(TopicTable, Self), + PubSub = gproc_pool:pick_worker(pubsub, Self), + SubReq = {subscribe, Self, TopicTable}, + gen_server2:call(PubSub, SubReq, infinity). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics @@ -154,18 +140,13 @@ subscribe(Topic, Qos) -> %%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - cast({unsubscribe, self(), Topic}); + unsubscribe([Topic]); unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> - cast({unsubscribe, self(), Topics}). - -call(Req) -> - Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server2:call(Pid, Req, infinity). - -cast(Msg) -> - Pid = gproc_pool:pick_worker(?POOL, self()), - gen_server2:cast(Pid, Msg). + Self = self(), + ?ROUTER:delete_routes(Topics, Self), + PubSub = gproc_pool:pick_worker(pubsub, Self), + gen_server2:cast(PubSub, {unsubscribe, Self, Topics}). %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes @@ -178,7 +159,7 @@ publish(Msg = #mqtt_message{from = From}) -> = emqttd_broker:foldl_hooks('message.publish', [], Msg), %% Retain message first. Don't create retained topic. - case emqttd_retained:retain(Msg1) of + case emqttd_retainer:retain(Msg1) of ok -> %% TODO: why unset 'retain' flag? publish(Topic, emqttd_message:unset_flag(Msg1)); @@ -186,42 +167,12 @@ publish(Msg = #mqtt_message{from = From}) -> publish(Topic, Msg1) end. -publish(Queue = <<"$Q/", _/binary>>, Msg = #mqtt_message{qos = Qos}) -> - lists:foreach( - fun(#mqtt_queue{qpid = QPid, qos = SubQos}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - QPid ! {dispatch, Msg1} - end, mnesia:dirty_read(queue, Queue)); - publish(Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> - case Node =:= node() of - true -> dispatch(Name, Msg); - false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg]) - end - end, match(Topic)). - -%%------------------------------------------------------------------------------ -%% @doc Dispatch message locally. should only be called by publish. -%% @end -%%------------------------------------------------------------------------------ --spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). -dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - Subscribers = mnesia:dirty_read(subscriber, Topic), - setstats(dropped, Subscribers =:= []), - lists:foreach( - fun(#mqtt_subscriber{subpid=SubPid, qos = SubQos}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - SubPid ! {dispatch, Msg1} - end, Subscribers), - length(Subscribers). + rpc:cast(Node, ?ROUTER, route, [Name, Msg]) + end, match(Topic)). +%%TODO: Benchmark and refactor... -spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), @@ -231,172 +182,38 @@ match(Topic) when is_binary(Topic) -> %%% gen_server callbacks %%%============================================================================= -init([Id, _Opts]) -> - %%process_flag(priority, high), - %%process_flag(min_heap_size, 1024*1024), - gproc_pool:connect_worker(pubsub, {?MODULE, Id}), - {ok, #state{id = Id, submap = maps:new()}}. +init([Pool, Id, _Opts]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id}}. -prioritise_call(Msg, _From, _Len, _State) -> - case Msg of - {subscriber, _, _} -> 1; - _ -> 0 - end. - -prioritise_cast(Msg, _Len, _State) -> - case Msg of - {unsubscribe, _, _} -> 2; - _ -> 0 - end. - -prioritise_info(Msg, _Len, _State) -> - case Msg of - {'DOWN', _, _, _, _} -> 3; - _ -> 0 - end. - -handle_call({subscribe, SubPid, Topics}, _From, State) -> - TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) -> - #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}; - ({Topic, Qos}) -> - {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}} - end, Topics), - F = fun() -> - lists:map(fun(QueueR) when is_record(QueueR, mqtt_queue) -> - add_queue(QueueR); - (TopicSub) -> - add_subscriber(TopicSub) - end, TopicSubs) - end, - case mnesia:transaction(F) of +%%TODO: clientId??? +handle_call({subscribe, _SubPid, TopicTable}, _From, State) -> + Records = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], + case mnesia:transaction(fun() -> [add_topic(Record) || Record <- Records] end) of {atomic, _Result} -> - setstats(all), - NewState = monitor_subscriber(SubPid, State), - %%TODO: grant all qos - {reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState}; + {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, setstats(State)}; {aborted, Error} -> {reply, {error, Error}, State} end; -handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State) -> - case mnesia:dirty_read(queue, Queue) of - [OldQueueR] -> lager:error("Queue is overwrited by ~p: ~p", [SubPid, OldQueueR]); - [] -> ok - end, - QueueR = #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos}, - case mnesia:transaction(fun add_queue/1, [QueueR]) of - {atomic, ok} -> - setstats(queues), - {reply, {ok, Qos}, monitor_subscriber(SubPid, State)}; - {aborted, Error} -> - {reply, {error, Error}, State} - end; - -handle_call({subscribe, SubPid, Topic, Qos}, _From, State) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}, - case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of - {atomic, ok} -> - setstats(all), - {reply, {ok, Qos}, monitor_subscriber(SubPid, State)}; - {aborted, Error} -> - {reply, {error, Error}, State} - end; - handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. +%%TODO: clientId??? handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> - - TopicSubs = lists:map(fun(<<"$Q/", _/binary>> = Queue) -> - #mqtt_queue{name = Queue, qpid = SubPid}; - (Topic) -> - {#mqtt_topic{topic = Topic, node = node()}, - #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}} - end, Topics), - F = fun() -> - lists:foreach( - fun(QueueR) when is_record(QueueR, mqtt_queue) -> - remove_queue(QueueR); - (TopicSub) -> - remove_subscriber(TopicSub) - end, TopicSubs) - end, - case mnesia:transaction(F) of - {atomic, _} -> ok; - {aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error]) - end, - setstats(all), - {noreply, State}; - -handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) -> - QueueR = #mqtt_queue{name = Queue, qpid = SubPid}, - case mnesia:transaction(fun remove_queue/1, [QueueR]) of - {atomic, _} -> - setstats(queues); - {aborted, Error} -> - lager:error("unsubscribe queue ~s error: ~p", [Queue, Error]) - end, - {noreply, State}; - -handle_cast({unsubscribe, SubPid, Topic}, State) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}, - case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of - {atomic, _} -> ok; - {aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error]) - end, - setstats(all), {noreply, State}; handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) -> - case maps:is_key(DownPid, SubMap) of - true -> - Node = node(), - F = fun() -> - %% remove queue... - Queues = mnesia:match_object(queue, #mqtt_queue{qpid = DownPid, _ = '_'}, write), - lists:foreach(fun(QueueR) -> - mnesia:delete_object(queue, QueueR, write) - end, Queues), - - %% remove subscribers... - Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), - lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> - mnesia:delete_object(subscriber, Sub, write), - try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) - end, Subscribers) - end, - case catch mnesia:transaction(F) of - {atomic, _} -> ok; - {aborted, Reason} -> - lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) - end, - setstats(all), - {noreply, State#state{submap = maps:remove(DownPid, SubMap)}}; - false -> - lager:error("Unexpected 'DOWN' from ~p", [DownPid]), - {noreply, State} - end; - handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State) -> - TopicR = #mqtt_topic{_ = '_', node = node()}, - F = fun() -> - [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] - %%TODO: remove trie?? - end, - mnesia:transaction(F), - setstats(all). +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id), setstats(all). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -405,9 +222,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -add_queue(QueueR) -> - mnesia:write(queue, QueueR, write). - add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> @@ -420,51 +234,6 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end end. -%% Fix issue #53 - Remove Overlapping Subscriptions -add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}}) - when is_record(TopicR, mqtt_topic) -> - case add_topic(TopicR) of - ok -> - OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos} - <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.subpid), - SubTopic =:= Topic, SubQos =/= Qos], - - %% remove overlapping subscribers - if - length(OverlapSubs) =:= 0 -> ok; - true -> - lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]), - [mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs] - end, - - %% insert subscriber - mnesia:write(subscriber, Subscriber, write); - Error -> - Error - end. - -monitor_subscriber(SubPid, State = #state{submap = SubMap}) -> - NewSubMap = case maps:is_key(SubPid, SubMap) of - false -> - maps:put(SubPid, erlang:monitor(process, SubPid), SubMap); - true -> - SubMap - end, - State#state{submap = NewSubMap}. - -remove_queue(#mqtt_queue{name = Name, qpid = Pid}) -> - case mnesia:wread({queue, Name}) of - [R = #mqtt_queue{qpid = Pid}] -> - mnesia:delete(queue, R, write); - _ -> - ok - end. - -remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> - [mnesia:delete_object(subscriber, Sub, write) || - Sub <- mnesia:match_object(subscriber, Subscriber, write)], - try_remove_topic(TopicR). - try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:read({subscriber, Topic}) of [] -> @@ -481,24 +250,9 @@ try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> %%% Stats functions %%%============================================================================= -setstats(all) -> - [setstats(Stat) || Stat <- [queues, topics, subscribers]]; - -setstats(queues) -> - emqttd_stats:setstats('queues/count', 'queues/max', - mnesia:table_info(queue, size)); - -setstats(topics) -> +setstats(State) -> emqttd_stats:setstats('topics/count', 'topics/max', - mnesia:table_info(topic, size)); -setstats(subscribers) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(subscriber, size)). - -setstats(dropped, false) -> - ignore; -setstats(dropped, true) -> - emqttd_metrics:inc('messages/dropped'). + mnesia:table_info(topic, size)), State. %%%============================================================================= %%% Trace functions diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index a0293c226..33f892ee1 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -19,18 +19,18 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd pubsub supervisor. +%%% @doc PubSub Supervisor +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pubsub_sup). --author("Feng Lee "). +-behaviour(supervisor). -include("emqttd.hrl"). --behaviour(supervisor). +-define(HELPER, emqttd_pubsub_helper). %% API -export([start_link/0]). @@ -39,19 +39,42 @@ -export([init/1]). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> Opts = emqttd_broker:env(pubsub), - Schedulers = erlang:system_info(schedulers), - PoolSize = proplists:get_value(pool_size, Opts, Schedulers), - gproc_pool:new(pubsub, hash, [{size, PoolSize}]), - Children = lists:map( - fun(I) -> - Name = {emqttd_pubsub, I}, - gproc_pool:add_worker(pubsub, Name, I), - {Name, {emqttd_pubsub, start_link, [I, Opts]}, - permanent, 10000, worker, [emqttd_pubsub]} - end, lists:seq(1, PoolSize)), - {ok, {{one_for_all, 10, 100}, Children}}. + supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). + +init([Opts]) -> + %% Route Table + create_route_tabs(Opts), + + %% PubSub Pool Sup + MFA = {emqttd_pubsub, start_link, [Opts]}, + PoolSup = emqttd_pool_sup:spec(pool_sup, [ + pubsub, hash, pool_size(Opts), MFA]), + + %% PubSub Helper + Helper = {helper, {?HELPER, start_link, [Opts]}, + permanent, infinity, worker, [?HELPER]}, + {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}. + +pool_size(Opts) -> + Schedulers = erlang:system_info(schedulers), + proplists:get_value(pool_size, Opts, Schedulers). + +create_route_tabs(_Opts) -> + TabOpts = [bag, public, named_table, + {write_concurrency, true}], + %% Route Table: Topic -> {Pid, QoS} + %% Route Shard: {Topic, Shard} -> {Pid, QoS} + ensure_tab(route, TabOpts), + + %% Reverse Route Table: Pid -> {Topic, QoS} + ensure_tab(reverse_route, TabOpts). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, Opts); + _ -> + ok + end. diff --git a/src/emqttd_retained.erl b/src/emqttd_retainer.erl similarity index 98% rename from src/emqttd_retained.erl rename to src/emqttd_retainer.erl index bf5ad51b0..1d20d3abe 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retainer.erl @@ -19,16 +19,18 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT retained message storage. +%%% @doc MQTT retained message storage. %%% %%% TODO: should match topic tree -%%% +%%% %%% @end +%%% +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- --module(emqttd_retained). +-module(emqttd_retainer). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). @@ -46,8 +48,6 @@ %% API Function Exports -export([start_link/0, expire/1]). --behaviour(gen_server). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/emqttd_serialiser.erl b/src/emqttd_serialiser.erl index 471904bce..60370ddca 100644 --- a/src/emqttd_serialiser.erl +++ b/src/emqttd_serialiser.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Packet Serialiser. +%%% @doc MQTT Packet Serialiser +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_serialiser). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 1cbe7be22..e708e7fe6 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -19,9 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% -%%% Session for persistent MQTT client. +%%% @doc Session for persistent MQTT client. %%% %%% Session State in the broker consists of: %%% @@ -43,11 +41,12 @@ %%% State of Message: newcome, inflight, pending %%% %%% @end +%%% +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- -module(emqttd_session). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). @@ -344,7 +343,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, %% a new Subscription is created and all matching retained messages are sent. - emqttd_retained:dispatch(Topic, self()), + emqttd_retainer:dispatch(Topic, self()), [{Topic, Qos} | Acc] end end, Subscriptions, TopicTable), diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index 1375c9fda..b9e6afc50 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd session supervisor. +%%% @doc emqttd session supervisor. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_session_sup). --author("Feng Lee "). - -behavior(supervisor). -export([start_link/0, start_session/3]). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index f3211cae9..742e66d87 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd session manager. +%%% @doc Session Manager +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_sm). --author("Feng Lee "). - -include("emqttd.hrl"). %% Mnesia Callbacks @@ -54,7 +52,7 @@ -record(state, {id}). --define(SM_POOL, ?MODULE). +-define(POOL, ?MODULE). -define(TIMEOUT, 60000). @@ -92,19 +90,13 @@ start_link(Id) -> name(Id) -> list_to_atom("emqttd_sm_" ++ integer_to_list(Id)). -%%------------------------------------------------------------------------------ -%% @doc Pool name. -%% @end -%%------------------------------------------------------------------------------ -pool() -> ?SM_POOL. - %%------------------------------------------------------------------------------ %% @doc Start a session %% @end %%------------------------------------------------------------------------------ -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}. start_session(CleanSess, ClientId) -> - SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + SM = gproc_pool:pick_worker(?POOL, ClientId), call(SM, {start_session, {CleanSess, ClientId, self()}}). %%------------------------------------------------------------------------------ @@ -150,7 +142,7 @@ call(SM, Req) -> %%%============================================================================= init([Id]) -> - gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), + gproc_pool:connect_worker(?POOL, {?MODULE, Id}), {ok, #state{id = Id}}. prioritise_call(_Msg, _From, _Len, _State) -> @@ -193,6 +185,7 @@ handle_cast(Msg, State) -> lager:error("Unexpected Msg: ~p", [Msg]), {noreply, State}. +%%TODO: fix this issue that index_read is really slow... handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> mnesia:transaction(fun() -> [mnesia:delete_object(session, Sess, write) || Sess @@ -205,7 +198,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 43a2da90b..78b84c8d8 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -19,23 +19,21 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd session helper. +%%% @doc Session Helper. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_sm_helper). --author("Feng Lee "). +-behaviour(gen_server). -include("emqttd.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). %% API Function Exports --export([start_link/0]). - --behaviour(gen_server). +-export([start_link/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -47,11 +45,11 @@ %% @doc Start a session helper %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec start_link(fun()) -> {ok, pid()} | ignore | {error, any()}. +start_link(StatsFun) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). -init([]) -> +init([StatsFun]) -> mnesia:subscribe(system), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index d75d0f3bf..c2b4b3304 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -19,25 +19,27 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd session manager supervisor. +%%% @doc Session Manager Supervisor. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_sm_sup). --author("Feng Lee "). +-behaviour(supervisor). -include("emqttd.hrl"). --define(CHILD(Mod), {Mod, {Mod, start_link, []}, - permanent, 5000, worker, [Mod]}). +-define(SM, emqttd_sm). + +-define(HELPER, emqttd_sm_helper). + +-define(TABS, [mqtt_transient_session, + mqtt_persistent_session]). %% API -export([start_link/0]). --behaviour(supervisor). - %% Supervisor callbacks -export([init/1]). @@ -45,20 +47,23 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - init_session_ets(), - Schedulers = erlang:system_info(schedulers), - gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), - Managers = lists:map( - fun(I) -> - Name = {emqttd_sm, I}, - gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I]}, - permanent, 10000, worker, [emqttd_sm]} - end, lists:seq(1, Schedulers)), - {ok, {{one_for_all, 10, 100}, [?CHILD(emqttd_sm_helper) | Managers]}}. + %% Create session tables + create_session_tabs(), -init_session_ets() -> - Tables = [mqtt_transient_session, mqtt_persistent_session], - Attrs = [ordered_set, named_table, public, {write_concurrency, true}], - lists:foreach(fun(Tab) -> ets:new(Tab, Attrs) end, Tables). + %% Helper + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, + permanent, 5000, worker, [?HELPER]}, + + %% SM Pool Sup + MFA = {?SM, start_link, []}, + PoolSup = emqttd_pool_sup:spec(pool_sup, [ + ?SM, hash, erlang:system_info(schedulers), MFA]), + + {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. + +create_session_tabs() -> + Opts = [ordered_set, named_table, public, + {write_concurrency, true}], + [ets:new(Tab, Opts) || Tab <- ?TABS]. diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 6f382259b..a922e4392 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd statistics. +%%% @doc emqttd statistics +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_stats). --author("Feng Lee "). - -include("emqttd.hrl"). -behaviour(gen_server). diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index d6bd648fe..018ff40dd 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -19,19 +19,17 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd supervisor. +%%% @doc emqttd top supervisor. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_sup). --author("Feng Lee "). +-behaviour(supervisor). -include("emqttd.hrl"). --behaviour(supervisor). - %% API -export([start_link/0, start_child/1, start_child/2]). diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 31c7a0d3e..73203d3f0 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd system monitor. +%%% @doc emqttd system monitor +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_sysmon). --author("Feng Lee "). - -behavior(gen_server). -export([start_link/1]). diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 07ed4482e..558f2dfa4 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Topic Functions +%%% @doc MQTT Topic Functions +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_topic). --author("Feng Lee "). - -import(lists, [reverse/1]). -export([match/2, validate/1, triples/1, words/1, wildcard/1]). diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 2bd430b05..d1dd2f289 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -19,22 +19,20 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% Trace MQTT packets/messages by clientid or topic. +%%% @doc Trace MQTT packets/messages by clientid or topic. +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_trace). --author("Feng Lee "). +-behaviour(gen_server). %% API Function Exports -export([start_link/0]). -export([start_trace/2, stop_trace/1, all_traces/0]). --behaviour(gen_server). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index a22107086..a10021ed3 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -25,11 +25,12 @@ %%% [Trie](http://en.wikipedia.org/wiki/Trie) %%% %%% @end +%%% +%%% @author Feng Lee +%%% %%%----------------------------------------------------------------------------- -module(emqttd_trie). --author("Feng Lee "). - %% Mnesia Callbacks -export([mnesia/1]). diff --git a/src/emqttd_util.erl b/src/emqttd_util.erl index 81b1bd54d..e7abecb7f 100644 --- a/src/emqttd_util.erl +++ b/src/emqttd_util.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd utility functions. +%%% @doc emqttd utility functions +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_util). --author("Feng Lee "). - -export([apply_module_attributes/1, all_module_attributes/1, cancel_timer/1, diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 5d0619631..0b7710a38 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd erlang vm. +%%% @doc emqttd erlang vm. +%%% +%%% @author huangdan %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_vm). --author('huangdan'). - -export([schedulers/0]). -export([microsecs/0]). diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index cb9a49e74..8e145f8ea 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -19,15 +19,13 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd websocket client. +%%% @doc emqttd websocket client +%%% +%%% @author Feng Lee %%% -%%% @end %%%----------------------------------------------------------------------------- -module(emqttd_ws_client). --author("Feng Lee "). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl").