From 938e45f1337900cd6ec57a680e01df927f7db639 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sat, 18 Apr 2015 00:06:28 +0800 Subject: [PATCH 01/15] how to route --- doc/route.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 doc/route.md diff --git a/doc/route.md b/doc/route.md new file mode 100644 index 000000000..c48a50886 --- /dev/null +++ b/doc/route.md @@ -0,0 +1,19 @@ + + +ClientA -> SessionA -> Route -> PubSub -> SessionB -> ClientB + + +ClientA -> Session -> PubSub -> Route -> SessionB -> ClientB + | | + Trie Subscriber + + +ClientPidA -> ClientPidB + + +ClientPidA -> SessionPidB -> ClientB + + +ClientPidA -> SessionPidA -> SessionPidB -> ClientPidB + + From 8694a825d31d01f1c6facfa6dd09885fc75ab5ab Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sat, 18 Apr 2015 18:36:57 +0800 Subject: [PATCH 02/15] route --- apps/emqttd/src/emqttd_router.erl | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index 6999abf21..d06ea379e 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -25,6 +25,8 @@ -include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -35,12 +37,32 @@ %%Router Chain--> --->In Out<--- -export([route/2]). +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {}). +%%%============================================================================= +%%% Mnesia callbacks +%%%============================================================================= +mnesia(boot) -> + %% topic table + ok = emqttd_mnesia:create_table(topic, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, mqtt_topic}, + {attributes, record_info(fields, mqtt_topic)}]). + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(topic), + %%%============================================================================= %%% API %%%============================================================================= @@ -71,7 +93,20 @@ route(From, Msg) -> %%% gen_server callbacks %%%============================================================================= init([]) -> - {ok, #state{}, hibernate}. + TabId = ets:new(?CLIENT_TABLE, [bag, + named_table, + public, + {read_concurrency, true}]), + %% local subscriber table, not shared with other nodes + ok = emqttd_mnesia:create_table(subscriber, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, mqtt_subscriber}, + {attributes, record_info(fields, mqtt_subscriber)}, + {index, [subpid]}, + {local_content, true}]); + + {ok, #state{tab = TabId}}. handle_call(_Request, _From, State) -> {reply, ok, State}. From fd8024821b9fdb93944ef6916b54e02f254eaf6b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Apr 2015 12:49:13 +0800 Subject: [PATCH 03/15] gproc pool --- apps/emqttd/include/emqttd.hrl | 2 +- apps/emqttd/src/emqttd_broker.erl | 10 +- apps/emqttd/src/emqttd_cm.erl | 4 +- apps/emqttd/src/emqttd_pooler.erl | 69 ++++++++++++++ apps/emqttd/src/emqttd_pooler_sup.erl | 57 ++++++++++++ apps/emqttd/src/emqttd_pubsub.erl | 87 ++++++++--------- apps/emqttd/src/emqttd_pubsub_sup.erl | 59 ++++++++++++ apps/emqttd/src/emqttd_router.erl | 129 -------------------------- rebar.config | 3 +- rel/files/app.config | 2 + 10 files changed, 242 insertions(+), 180 deletions(-) create mode 100644 apps/emqttd/src/emqttd_pooler.erl create mode 100644 apps/emqttd/src/emqttd_pooler_sup.erl create mode 100644 apps/emqttd/src/emqttd_pubsub_sup.erl delete mode 100644 apps/emqttd/src/emqttd_router.erl diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 023db8156..a1523c22d 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -57,7 +57,7 @@ -record(mqtt_subscriber, { topic :: binary(), qos = 0 :: 0 | 1 | 2, - subpid :: pid() + pid :: pid() }). -type mqtt_subscriber() :: #mqtt_subscriber{}. diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 2aa9ae8f7..764a1ad31 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -214,13 +214,13 @@ create(Topic) -> emqttd_pubsub:create(Topic). retain(Topic, Payload) when is_binary(Payload) -> - emqttd_router:route(broker, #mqtt_message{retain = true, - topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(#mqtt_message{retain = true, + topic = Topic, + payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - emqttd_router:route(broker, #mqtt_message{topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(#mqtt_message{topic = Topic, + payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 1692401a9..3a13a54ae 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -32,8 +32,6 @@ -define(SERVER, ?MODULE). --define(CLIENT_TABLE, mqtt_client). - %% API Exports -export([start_link/0]). @@ -52,6 +50,8 @@ -record(state, {tab}). +-define(CLIENT_TABLE, mqtt_client). + %%%============================================================================= %%% API %%%============================================================================= diff --git a/apps/emqttd/src/emqttd_pooler.erl b/apps/emqttd/src/emqttd_pooler.erl new file mode 100644 index 000000000..16b4bca57 --- /dev/null +++ b/apps/emqttd/src/emqttd_pooler.erl @@ -0,0 +1,69 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd pooler supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_pooler). + +-author('feng@emqtt.io'). + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% API Exports +-export([start_link/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {id}). + +%%%============================================================================= +%%% API +%%%============================================================================= +-spec start_link(I :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(I) -> + gen_server:start_link(?MODULE, [I], []). + +init([I]) -> + gproc_pool:connect_worker(pooler, {pooler, I}), + {ok, #state{id = I}}. + +handle_call(_Req, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{id = I}) -> + gproc_pool:disconnect_worker(pooler, {pooler, I}), ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/apps/emqttd/src/emqttd_pooler_sup.erl b/apps/emqttd/src/emqttd_pooler_sup.erl new file mode 100644 index 000000000..c2a62bbbb --- /dev/null +++ b/apps/emqttd/src/emqttd_pooler_sup.erl @@ -0,0 +1,57 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd pooler supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_pooler_sup). + +-author('feng@emqtt.io'). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_link/1]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(PoolSize) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolSize]). + +init([PoolSize]) -> + gproc_pool:new(pooler, random, [{size, PoolSize}]), + Children = lists:map( + fun(I) -> + gproc_pool:add_worker(pooler, {pooler, I}, I), + {{emqttd_pooler, I}, + {emqttd_pooler, start_link, [I]}, + permanent, 5000, worker, [emqttd_pooler]} + end, lists:seq(1, PoolSize)), + {ok, {{one_for_all, 10, 100}, Children}}. + diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 4d6586380..8c6d6ffa1 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -32,20 +32,16 @@ -include("emqttd.hrl"). --behaviour(gen_server). - --define(SERVER, ?MODULE). - --define(SUBACK_ERR, 128). - %% Mnesia Callbacks -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +-behaviour(gen_server). + %% API Exports --export([start_link/0]). +-export([start_link/0, name/1]). -export([create/1, subscribe/1, subscribe/2, @@ -58,6 +54,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(SUBACK_ERR, 128). + -record(state, {submap :: map()}). %%%============================================================================= @@ -76,7 +74,7 @@ mnesia(boot) -> {ram_copies, [node()]}, {record_name, mqtt_subscriber}, {attributes, record_info(fields, mqtt_subscriber)}, - {index, [subpid]}, + {index, [pid]}, {local_content, true}]); mnesia(copy) -> @@ -85,7 +83,9 @@ mnesia(copy) -> %%%============================================================================= %%% API +%%% %%%============================================================================= +%%% %%------------------------------------------------------------------------------ %% @doc @@ -93,9 +93,12 @@ mnesia(copy) -> %% %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start_link(Opts) -> {ok, pid()} | ignore | {error, any()}. +start_link(Opts) -> + gen_server:start_link(?MODULE, [], []). + +name(I) -> + list_to_atom("emqttd_pubsub_" ++ integer_to_list(I)). %%------------------------------------------------------------------------------ %% @doc @@ -103,47 +106,41 @@ start_link() -> %% %% @end %%------------------------------------------------------------------------------ --spec create(binary()) -> ok. +-spec create(binary()) -> {atomic, ok} | {aborted, Reason :: any()}. create(Topic) when is_binary(Topic) -> - Record = #mqtt_topic{topic = Topic, node = node()}, - {atomic, ok} = mnesia:transaction(fun insert_topic/1, [Record]), ok. + TopicRecord = #mqtt_topic{topic = Topic, node = node()}, + Result = mnesia:transaction(fun create_topic/1, [TopicRecord]), + setstats(topics), Result. %%------------------------------------------------------------------------------ %% @doc -%% Subscribe topics +%% Subscribe topic or topics. %% %% @end %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when Topic :: binary(), Qos :: mqtt_qos(). subscribe(Topics = [{_Topic, _Qos} | _]) -> {ok, lists:map(fun({Topic, Qos}) -> case subscribe(Topic, Qos) of - {ok, GrantedQos} -> + {ok, GrantedQos} -> GrantedQos; - Error -> - lager:error("Failed to subscribe '~s': ~p", [Topic, Error]), + {error, Error} -> + lager:error("subscribe '~s' error: ~p", [Topic, Error]), ?SUBACK_ERR end end, Topics)}. --spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}. +-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()} | {error, any()}. subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()}, - F = fun() -> - case insert_topic(TopicRecord) of - ok -> insert_subscriber(Subscriber); - Error -> Error - end - end, - case mnesia:transaction(F) of + case create(Topic) of {atomic, ok} -> - {ok, Qos}; - {aborted, Reason} -> - {error, Reason} - end. + Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = self()}, + ets:insert_new(?SUBSCRIBER_TAB, Subscriber), + {ok, Qos}; % Grant all qos + {aborted, Reason} -> + {error, Reason}. %%------------------------------------------------------------------------------ %% @doc @@ -153,15 +150,17 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> %%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - SubPid = self(), + Pattern = #mqtt_subscriber{topic = Topic, _ = '_', pid = self()}, + ets:match_delete(?SUBSCRIBER_TAB, Pattern), + TopicRecord = #mqtt_topic{topic = Topic, node = node()}, F = fun() -> %%TODO record name... - Pattern = #mqtt_subscriber{topic = Topic, _ = '_', subpid = SubPid}, [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)], try_remove_topic(TopicRecord) end, - {atomic, _} = mneisa:transaction(F), ok; + %{atomic, _} = mneisa:transaction(F), + ok; unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> lists:foreach(fun(T) -> unsubscribe(T) end, Topics). @@ -193,7 +192,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - case mnesia:dirty_read(subscriber, Topic) of + case ets:lookup:(?SUBSCRIBER_TAB, Topic) of [] -> %%TODO: not right when clusted... setstats(dropped); @@ -307,15 +306,19 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -insert_topic(Record = #mqtt_topic{topic = Topic}) -> + +-spec create_topic(#mqtt_topic{}) -> {atomic, ok} | {aborted, any()}. +create_topic(TopicRecord = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> ok = emqttd_trie:insert(Topic), - mnesia:write(topic, Record, write); + mnesia:write(topic, TopicRecord, write); Records -> - case lists:member(Record, Records) of - true -> ok; - false -> mnesia:write(topic, Record, write) + case lists:member(TopicRecord, Records) of + true -> + ok; + false -> + mnesia:write(topic, TopicRecord, write) end end. diff --git a/apps/emqttd/src/emqttd_pubsub_sup.erl b/apps/emqttd/src/emqttd_pubsub_sup.erl new file mode 100644 index 000000000..35bd20fb6 --- /dev/null +++ b/apps/emqttd/src/emqttd_pubsub_sup.erl @@ -0,0 +1,59 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd pubsub supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_pubsub_sup). + +-author('feng@emqtt.io'). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/1]). + +%% Supervisor callbacks +-export([init/1]). + +start_link(Opts) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). + +init([Opts]) -> + Schedulers = erlang:system_info(schedulers), + PoolSize = proplists:get_value(pool, Opts, Schedulers), + gproc_pool:new(pubsub, hash, [{size, PoolSize}]), + Children = lists:map( + fun(I) -> + gproc_pool:add_worker(pubsub, emqttd_pubsub:name(I), I), + child(I, Opts) + end, lists:seq(1, PoolSize)), + {ok, {{one_for_all, 10, 100}, Children}}. + +child(I, Opts) -> + {{emqttd_pubsub, I}, + {emqttd_pubsub, start_link, [I, Opts]}, + permanent, 5000, worker, [emqttd_pubsub]}. + diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl deleted file mode 100644 index d06ea379e..000000000 --- a/apps/emqttd/src/emqttd_router.erl +++ /dev/null @@ -1,129 +0,0 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - -%%TODO: route chain... statistics --module(emqttd_router). - --include_lib("emqtt/include/emqtt.hrl"). - --include("emqttd.hrl"). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% API Function Exports --export([start_link/0]). - -%%Router Chain--> --->In Out<--- --export([route/2]). - -%% Mnesia Callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {}). - -%%%============================================================================= -%%% Mnesia callbacks -%%%============================================================================= -mnesia(boot) -> - %% topic table - ok = emqttd_mnesia:create_table(topic, [ - {type, bag}, - {ram_copies, [node()]}, - {record_name, mqtt_topic}, - {attributes, record_info(fields, mqtt_topic)}]). - -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(topic), - -%%%============================================================================= -%%% API -%%%============================================================================= - -%%------------------------------------------------------------------------------ -%% @doc -%% Start emqttd router. -%% -%% @end -%%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%------------------------------------------------------------------------------ -%% @doc -%% Route mqtt message. From is clienid or module. -%% -%% @end -%%------------------------------------------------------------------------------ --spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok. -route(From, Msg) -> - lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]), - emqttd_msg_store:retain(Msg), - emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)). - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= -init([]) -> - TabId = ets:new(?CLIENT_TABLE, [bag, - named_table, - public, - {read_concurrency, true}]), - %% local subscriber table, not shared with other nodes - ok = emqttd_mnesia:create_table(subscriber, [ - {type, bag}, - {ram_copies, [node()]}, - {record_name, mqtt_subscriber}, - {attributes, record_info(fields, mqtt_subscriber)}, - {index, [subpid]}, - {local_content, true}]); - - {ok, #state{tab = TabId}}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - diff --git a/rebar.config b/rebar.config index c68a1f394..253a79d3f 100644 --- a/rebar.config +++ b/rebar.config @@ -18,11 +18,12 @@ {validate_app_modules, true}. {sub_dirs, [ - "rel", + "rel", "apps/emqtt", "apps/emqttd"]}. {deps, [ + {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, {esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, {mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}} diff --git a/rel/files/app.config b/rel/files/app.config index 6e94bc81e..86ef3a181 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -72,6 +72,8 @@ {max_message_num, 100000}, {max_playload_size, 16#ffff} ]}, + %% PubSub + {pubsub, []}, %% Broker {broker, [ {sys_interval, 60} From ab84b6ff09d847159b21354f776c636bd032688a Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 19 Apr 2015 19:35:09 +0800 Subject: [PATCH 04/15] integrate gproc --- apps/emqttd/src/emqttd.app.src | 2 +- apps/emqttd/src/emqttd_app.erl | 62 ++--- apps/emqttd/src/emqttd_bridge.erl | 4 +- apps/emqttd/src/emqttd_event.erl | 4 +- apps/emqttd/src/emqttd_http.erl | 8 +- apps/emqttd/src/emqttd_metrics.erl | 2 +- apps/emqttd/src/emqttd_pooler.erl | 29 ++- apps/emqttd/src/emqttd_protocol.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 320 +++++++++++++------------- apps/emqttd/src/emqttd_pubsub_sup.erl | 13 +- apps/emqttd/src/emqttd_session.erl | 6 +- apps/emqttd/src/emqttd_sysmon.erl | 4 +- doc/pubsub.md | 1 + rel/files/vm.args | 2 +- rel/reltool.config | 2 + 15 files changed, 230 insertions(+), 231 deletions(-) diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 26053005b..5841321f9 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.6.0"}, + {vsn, "0.6.1"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 941c565e4..e7e116dd6 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -33,19 +33,6 @@ %% Application callbacks -export([start/2, stop/1]). -%% Servers --define(SERVERS, [config, - event, - client, - session, - pubsub, - router, - broker, - metrics, - bridge, - access_control, - sysmon]). - -define(PRINT_MSG(Msg), io:format(Msg)). -define(PRINT(Format, Args), io:format(Format, Args)). @@ -79,7 +66,25 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - Servers = lists:flatten([server(Srv) || Srv <- ?SERVERS]), + {ok, SessOpts} = application:get_env(session), + {ok, PubSubOpts} = application:get_env(pubsub), + {ok, BrokerOpts} = application:get_env(broker), + {ok, MetricOpts} = application:get_env(metrics), + {ok, AccessOpts} = application:get_env(access_control), + Servers = [ + {"emqttd config", emqttd_config}, + {"emqttd event", emqttd_event}, + {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, + {"emqttd client manager", emqttd_cm}, + {"emqttd session manager", emqttd_sm}, + {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, + {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, + %{"emqttd router", emqttd_router}, + {"emqttd broker", emqttd_broker, BrokerOpts}, + {"emqttd metrics", emqttd_metrics, MetricOpts}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, + {"emqttd access control", emqttd_access_control, AccessOpts}, + {"emqttd system monitor", emqttd_sysmon}], [start_server(Sup, Server) || Server <- Servers]. start_server(_Sup, {Name, F}) when is_function(F) -> @@ -97,35 +102,6 @@ start_server(Sup, {Name, Server, Opts}) -> start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n"). -%%TODO: redesign later... -server(config) -> - {"emqttd config", emqttd_config}; -server(event) -> - {"emqttd event", emqttd_event}; -server(client) -> - {"emqttd client manager", emqttd_cm}; -server(session) -> - {ok, SessOpts} = application:get_env(session), - [{"emqttd session manager", emqttd_sm}, - {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}]; -server(pubsub) -> - {"emqttd pubsub", emqttd_pubsub}; -server(router) -> - {"emqttd router", emqttd_router}; -server(broker) -> - {ok, BrokerOpts} = application:get_env(broker), - {"emqttd broker", emqttd_broker, BrokerOpts}; -server(metrics) -> - {ok, MetricOpts} = application:get_env(metrics), - {"emqttd metrics", emqttd_metrics, MetricOpts}; -server(bridge) -> - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}; -server(access_control) -> - {ok, AcOpts} = application:get_env(access_control), - {"emqttd access control", emqttd_access_control, AcOpts}; -server(sysmon) -> - {"emqttd system monitor", emqttd_sysmon}. - start_child(Sup, {supervisor, Name}) -> supervisor:start_child(Sup, supervisor_spec(Name)); start_child(Sup, Name) when is_atom(Name) -> diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 24a80d6a8..8e4896351 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -113,7 +113,7 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down} {noreply, State}; handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> - rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]), + rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), {noreply, State}; handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> @@ -172,5 +172,3 @@ transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos, end, Msg1#mqtt_message{topic = <>}. - - diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index b551655e9..8a6af1db8 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -75,13 +75,13 @@ init([]) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_router:route(event, Msg), + emqttd_pubsub:publish(Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_router:route(event, Msg), + emqttd_pubsub:publish(Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 5dd17dc19..4a24f4893 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_router:route(http, #mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_pubsub:publish(#mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index fac00e034..ce03af907 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -220,7 +220,7 @@ systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> - emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}). + emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_pooler.erl b/apps/emqttd/src/emqttd_pooler.erl index 16b4bca57..09ad7bf7a 100644 --- a/apps/emqttd/src/emqttd_pooler.erl +++ b/apps/emqttd/src/emqttd_pooler.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd pooler supervisor. +%%% emqttd pooler. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -30,10 +30,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Exports --export([start_link/1]). +-export([start_link/1, submit/1, async_submit/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -48,13 +46,26 @@ start_link(I) -> gen_server:start_link(?MODULE, [I], []). +submit(Fun) -> + gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity). + +async_submit(Fun) -> + gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}). + init([I]) -> gproc_pool:connect_worker(pooler, {pooler, I}), {ok, #state{id = I}}. +handle_call({submit, Fun}, _From, State) -> + {reply, run(Fun), State}; + handle_call(_Req, _From, State) -> {reply, ok, State}. +handle_cast({async_submit, Fun}, State) -> + run(Fun), + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. @@ -67,3 +78,13 @@ terminate(_Reason, #state{id = I}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +run({M, F, A}) -> + erlang:apply(M, F, A); +run(Fun) when is_function(Fun) -> + Fun(). + + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 6c6b29808..08de737f5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... send_willmsg(ClientId, WillMsg) -> - emqttd_router:route(ClientId, WillMsg). + emqttd_pubsub:publish(WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 8c6d6ffa1..4792d8f61 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -28,10 +28,10 @@ -author('feng@emqtt.io'). --include_lib("emqtt/include/emqtt.hrl"). - -include("emqttd.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -41,10 +41,10 @@ -behaviour(gen_server). %% API Exports --export([start_link/0, name/1]). +-export([start_link/2]). -export([create/1, - subscribe/1, subscribe/2, + subscribe/1, unsubscribe/1, publish/1, publish/2, %local node @@ -54,9 +54,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SUBACK_ERR, 128). +-define(POOL, pubsub). --record(state, {submap :: map()}). +-record(state, {id, submap :: map()}). %%%============================================================================= %%% Mnesia callbacks @@ -83,93 +83,60 @@ mnesia(copy) -> %%%============================================================================= %%% API -%%% %%%============================================================================= -%%% %%------------------------------------------------------------------------------ -%% @doc -%% Start Pubsub. -%% -%% @end +%% @doc Start one pubsub. %%------------------------------------------------------------------------------ --spec start_link(Opts) -> {ok, pid()} | ignore | {error, any()}. -start_link(Opts) -> - gen_server:start_link(?MODULE, [], []). - -name(I) -> - list_to_atom("emqttd_pubsub_" ++ integer_to_list(I)). +-spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + Opts :: list(). +start_link(Id, Opts) -> + gen_server:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ -%% @doc -%% Create topic. -%% -%% @end +%% @doc Create topic. Notice That this transaction is not protected by pubsub pool. %%------------------------------------------------------------------------------ --spec create(binary()) -> {atomic, ok} | {aborted, Reason :: any()}. +-spec create(Topic :: binary()) -> ok | {error, Error :: any()}. create(Topic) when is_binary(Topic) -> - TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - Result = mnesia:transaction(fun create_topic/1, [TopicRecord]), - setstats(topics), Result. + TopicR = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun add_topic/1, [TopicR]) of + {atomic, ok} -> setstats(topics), ok; + {aborted, Error} -> {error, Error} + end. %%------------------------------------------------------------------------------ -%% @doc -%% Subscribe topic or topics. -%% -%% @end +%% @doc Subscribe topic. %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos(). + +subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> + call({subscribe, self(), Topic, Qos}); + subscribe(Topics = [{_Topic, _Qos} | _]) -> - {ok, lists:map(fun({Topic, Qos}) -> - case subscribe(Topic, Qos) of - {ok, GrantedQos} -> - GrantedQos; - {error, Error} -> - lager:error("subscribe '~s' error: ~p", [Topic, Error]), - ?SUBACK_ERR - end - end, Topics)}. + call({subscribe, self(), Topics}). --spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()} | {error, any()}. -subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - case create(Topic) of - {atomic, ok} -> - Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = self()}, - ets:insert_new(?SUBSCRIBER_TAB, Subscriber), - {ok, Qos}; % Grant all qos - {aborted, Reason} -> - {error, Reason}. - -%%------------------------------------------------------------------------------ -%% @doc -%% Unsubscribe Topic or Topics -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc Unsubscribe Topic or Topics -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - Pattern = #mqtt_subscriber{topic = Topic, _ = '_', pid = self()}, - ets:match_delete(?SUBSCRIBER_TAB, Pattern), - - TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - F = fun() -> - %%TODO record name... - [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)], - try_remove_topic(TopicRecord) - end, - %{atomic, _} = mneisa:transaction(F), - ok; + cast({unsubscribe, self(), Topic}); unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> - lists:foreach(fun(T) -> unsubscribe(T) end, Topics). + cast({unsubscribe, self(), Topics}). + +call(Req) -> + Pid = gproc_pool:pick_worker(?POOL, self()), + lager:info("~p call ~p", [self(), Pid]), + gen_server:call(Pid, Req, infinity). + +cast(Msg) -> + Pid = gproc_pool:pick_worker(?POOL, self()), + gen_server:cast(Pid, Msg). %%------------------------------------------------------------------------------ -%% @doc -%% Publish to cluster node. -%% -%% @end +%% @doc Publish to cluster nodes. %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg=#mqtt_message{topic=Topic}) -> @@ -184,107 +151,113 @@ publish(Topic, Msg) when is_binary(Topic) -> end end, match(Topic)). -%%------------------------------------------------------------------------------ -%% @doc -%% Dispatch Locally. Should only be called by publish. -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc Dispatch message locally. should only be called by publish. -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - case ets:lookup:(?SUBSCRIBER_TAB, Topic) of - [] -> - %%TODO: not right when clusted... - setstats(dropped); - Subscribers -> - lists:foreach( - fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - SubPid ! {dispatch, {self(), Msg1}} - end, Subscribers) - end. + Subscribers = mnesia:dirty_read(subscriber, Topic), + setstats(dropped, Subscribers =:= []), %%TODO:... + lists:foreach( + fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) -> + Msg1 = if + Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; + true -> Msg + end, + SubPid ! {dispatch, {self(), Msg1}} + end, Subscribers), + length(Subscribers). -%%------------------------------------------------------------------------------ -%% @doc -%% @private -%% Match topic. -%% -%% @end -%%------------------------------------------------------------------------------ -spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]), - lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). + lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - %%TODO: really need? - process_flag(priority, high), +init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), - mnesia:subscribe({table, topic, simple}), - mnesia:subscribe({table, subscriber, simple}), - {ok, #state{submap = maps:new()}}. + gproc_pool:connect_worker(pubsub, {?MODULE, Id}), + {ok, #state{id = Id, submap = maps:new()}}. + +handle_call({subscribe, SubPid, Topics}, _From, State) -> + TopicSubs = lists:map(fun({Topic, Qos}) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}} + end, Topics), + F = fun() -> + lists:map(fun add_subscriber/1, TopicSubs) + end, + case mnesia:transaction(F) of + {atomic, _Result} -> + setstats(all), + NewState = monitor_subscriber(SubPid, State), + %% grant all qos + {reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState}; + {aborted, Error} -> + {reply, {error, Error}, State} + end; + +handle_call({subscribe, SubPid, Topic, Qos}, _From, State) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}, + case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of + {atomic, ok} -> + setstats(all), + {reply, {ok, Qos}, monitor_subscriber(SubPid, State)}; + {aborted, Error} -> + {reply, {error, Error}, State} + end; handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. +handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> + TopicSubs = lists:map(fun(Topic) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}} + end, Topics), + F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end, + case mnesia:transaction(F) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error]) + end, + setstats(all), + {noreply, State}; + +handle_cast({unsubscribe, SubPid, Topic}, State) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}, + case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error]) + end, + setstats(all), + {noreply, State}; + handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}}, - State = #state{submap = SubMap}) -> - NewSubMap = - case maps:is_key(Pid, SubMap) of - false -> - maps:put(Pid, erlang:monitor(process, Pid), SubMap); - true -> - SubMap - end, - setstats(subscribers), - {noreply, State#state{submap = NewSubMap}}; - -handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) -> - %%TODO: this is not right when clusterd. - setstats(topics), - {noreply, State}; - -%% {write, #topic{}, _ActivityId} -%% {delete_object, _OldRecord, _ActivityId} -%% {delete, {Tab, Key}, ActivityId} -handle_info({mnesia_table_event, _Event}, State) -> - setstats(topics), - setstats(subscribers), - {noreply, State}; - handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) -> case maps:is_key(DownPid, SubMap) of true -> Node = node(), F = fun() -> - Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), + Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid), lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> mnesia:delete_object(subscriber, Sub, write), try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) end, Subscribers) end, - NewState = case catch mnesia:transaction(F) of - {atomic, _} -> - State#state{submap = maps:remove(DownPid, SubMap)}; + {atomic, _} -> ok; {aborted, Reason} -> - lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]), - State + lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) end, - setstats(topics), setstats(subscribers), - {noreply, NewState}; + setstats(all), + {noreply, State#state{submap = maps:remove(DownPid, SubMap)}}; false -> lager:error("Unexpected 'DOWN' from ~p", [DownPid]), {noreply, State} @@ -295,10 +268,13 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - mnesia:unsubscribe({table, topic, simple}), - mnesia:unsubscribe({table, subscriber, simple}), - %%TODO: clear topics belongs to this node??? - ok. + TopicR = #mqtt_topic{_ = '_', node = node()}, + F = fun() -> + [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] + %%TODO: remove trie?? + end, + mnesia:transaction(F), + setstats(all). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -307,28 +283,44 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= --spec create_topic(#mqtt_topic{}) -> {atomic, ok} | {aborted, any()}. -create_topic(TopicRecord = #mqtt_topic{topic = Topic}) -> +add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> ok = emqttd_trie:insert(Topic), - mnesia:write(topic, TopicRecord, write); + mnesia:write(topic, TopicR, write); Records -> - case lists:member(TopicRecord, Records) of - true -> - ok; - false -> - mnesia:write(topic, TopicRecord, write) + case lists:member(TopicR, Records) of + true -> ok; + false -> mnesia:write(topic, TopicR, write) end end. -insert_subscriber(Subscriber) -> - mnesia:write(subscriber, Subscriber, write). +add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> + case add_topic(TopicR) of + ok -> + mnesia:write(subscriber, Subscriber, write); + Error -> + Error + end. -try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> +monitor_subscriber(SubPid, State = #state{submap = SubMap}) -> + NewSubMap = case maps:is_key(SubPid, SubMap) of + false -> + maps:put(SubPid, erlang:monitor(process, SubPid), SubMap); + true -> + SubMap + end, + State#state{submap = NewSubMap}. + +remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> + [mnesia:delete_object(subscriber, Sub, write) || + Sub <- mnesia:match_object(subscriber, Subscriber, write)], + try_remove_topic(TopicR). + +try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:read({subscriber, Topic}) of [] -> - mnesia:delete_object(topic, Record, write), + mnesia:delete_object(topic, TopicR, write), case mnesia:read(topic, Topic) of [] -> emqttd_trie:delete(Topic); _ -> ok @@ -337,13 +329,23 @@ try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> ok end. +%%%============================================================================= +%%% Stats functions +%%%============================================================================= +setstats(all) -> + setstats(topics), + setstats(subscribers); setstats(topics) -> - emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)); - + emqttd_broker:setstat('topics/count', + mnesia:table_info(topic, size)); setstats(subscribers) -> emqttd_broker:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(subscriber, size)); -setstats(dropped) -> + mnesia:table_info(subscriber, size)). + +setstats(dropped, false) -> + ignore; +setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). + diff --git a/apps/emqttd/src/emqttd_pubsub_sup.erl b/apps/emqttd/src/emqttd_pubsub_sup.erl index 35bd20fb6..b6a7936f6 100644 --- a/apps/emqttd/src/emqttd_pubsub_sup.erl +++ b/apps/emqttd/src/emqttd_pubsub_sup.erl @@ -43,17 +43,14 @@ start_link(Opts) -> init([Opts]) -> Schedulers = erlang:system_info(schedulers), - PoolSize = proplists:get_value(pool, Opts, Schedulers), + PoolSize = proplists:get_value(pool_size, Opts, Schedulers), gproc_pool:new(pubsub, hash, [{size, PoolSize}]), Children = lists:map( fun(I) -> - gproc_pool:add_worker(pubsub, emqttd_pubsub:name(I), I), - child(I, Opts) + Name = {emqttd_pubsub, I}, + gproc_pool:add_worker(pubsub, Name, I), + {Name, {emqttd_pubsub, start_link, [I, Opts]}, + permanent, 5000, worker, [emqttd_pubsub]} end, lists:seq(1, PoolSize)), {ok, {{one_for_all, 10, 100}, Children}}. -child(I, Opts) -> - {{emqttd_pubsub, I}, - {emqttd_pubsub, start_link, [I, Opts]}, - permanent, 5000, worker, [emqttd_pubsub]}. - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 8458a0075..2bca04699 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). publish(Session, ClientId, {?QOS_0, Message}) -> - emqttd_router:route(ClientId, Message), Session; + emqttd_pubsub:publish(Message), Session; publish(Session, ClientId, {?QOS_1, Message}) -> - emqttd_router:route(ClientId, Message), Session; + emqttd_pubsub:publish(Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> @@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of - {ok, Msg} -> emqttd_router:route(ClientId, Msg); + {ok, Msg} -> emqttd_pubsub:publish(Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; diff --git a/apps/emqttd/src/emqttd_sysmon.erl b/apps/emqttd/src/emqttd_sysmon.erl index 687de5b8c..ed676b1dd 100644 --- a/apps/emqttd/src/emqttd_sysmon.erl +++ b/apps/emqttd/src/emqttd_sysmon.erl @@ -55,7 +55,9 @@ start_link() -> %%%============================================================================= init([]) -> - erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]), + erlang:system_monitor(self(), [{long_gc, 5000}, + {large_heap, 8 * 1024 * 1024}, + busy_port]), {ok, #state{}}. handle_call(Request, _From, State) -> diff --git a/doc/pubsub.md b/doc/pubsub.md index 8978c29e3..69fa14a91 100644 --- a/doc/pubsub.md +++ b/doc/pubsub.md @@ -14,6 +14,7 @@ PubQos | SubQos | In Message | Out Message 2 | 1 | - | - 2 | 2 | - | - + ## Publish diff --git a/rel/files/vm.args b/rel/files/vm.args index 897fe97f3..4ff3744f2 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -24,5 +24,5 @@ #-env ERL_MAX_ETS_TABLES 1024 ## Tweak GC to run more often -##-env ERL_FULLSWEEP_AFTER 10 +##-env ERL_FULLSWEEP_AFTER 1000 # diff --git a/rel/reltool.config b/rel/reltool.config index 9d913f6d7..6a23dbb01 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -15,6 +15,7 @@ inets, goldrush, lager, + gproc, esockd, mochiweb, emqttd @@ -45,6 +46,7 @@ {app, inets, [{mod_cond, app},{incl_cond, include}]}, {app, goldrush, [{mod_cond, app}, {incl_cond, include}]}, {app, lager, [{mod_cond, app}, {incl_cond, include}]}, + {app, gproc, [{mod_cond, app}, {incl_cond, include}]}, {app, esockd, [{mod_cond, app}, {incl_cond, include}]}, {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]}, {app, emqtt, [{mod_cond, app}, {incl_cond, include}]}, From 229bcb6873420a89263e911bffaea280fd793619 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 03:26:09 +0800 Subject: [PATCH 05/15] retain message when publish --- apps/emqttd/src/emqttd_broker.erl | 10 +++++----- apps/emqttd/src/emqttd_event.erl | 4 ++-- apps/emqttd/src/emqttd_http.erl | 8 ++++---- apps/emqttd/src/emqttd_metrics.erl | 3 ++- apps/emqttd/src/emqttd_msg_store.erl | 2 +- apps/emqttd/src/emqttd_protocol.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 23 ++++++++++++++++------- apps/emqttd/src/emqttd_session.erl | 6 +++--- 8 files changed, 34 insertions(+), 24 deletions(-) diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 764a1ad31..faff38698 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -214,13 +214,13 @@ create(Topic) -> emqttd_pubsub:create(Topic). retain(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(#mqtt_message{retain = true, - topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(broker, #mqtt_message{retain = true, + topic = Topic, + payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(#mqtt_message{topic = Topic, - payload = Payload}). + emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic, + payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 8a6af1db8..a4c6a9252 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -75,13 +75,13 @@ init([]) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_pubsub:publish(Msg), + emqttd_pubsub:publish(event, Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_pubsub:publish(Msg), + emqttd_pubsub:publish(event, Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 4a24f4893..c4f95f21a 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_pubsub:publish(#mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_pubsub:publish(http, #mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index ce03af907..0941b16dd 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -220,7 +220,8 @@ systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> - emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}). + emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic, + payload = Payload}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index 4b9aa191b..4e03c0e03 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -84,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic, lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)]) - end. + end, ok. limit(table) -> proplists:get_value(max_message_num, env()); diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 08de737f5..592de67e6 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... send_willmsg(ClientId, WillMsg) -> - emqttd_pubsub:publish(WillMsg). + emqttd_pubsub:publish(ClientId, WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 4792d8f61..5e726b54c 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -32,6 +32,8 @@ -include_lib("emqtt/include/emqtt.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -46,7 +48,7 @@ -export([create/1, subscribe/1, unsubscribe/1, - publish/1, publish/2, + publish/2, %local node dispatch/2, match/1]). @@ -138,12 +140,19 @@ cast(Msg) -> %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes. %%------------------------------------------------------------------------------ --spec publish(Msg :: mqtt_message()) -> ok. -publish(Msg=#mqtt_message{topic=Topic}) -> - publish(Topic, Msg). - --spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). -publish(Topic, Msg) when is_binary(Topic) -> +-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. +publish(From, Msg=#mqtt_message{topic=Topic}) -> + lager:info("~s PUBLISH to ~s", [From, Topic]), + %% Retain message first. Don't create retained topic. + case emqttd_msg_store:retain(Msg) of + ok -> + %TODO: why unset 'retain' flag? + publish(From, Topic, emqtt_message:unset_flag(Msg)); + ignore -> + publish(From, Topic, Msg) + end. + +publish(_From, Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> case Node =:= node() of true -> dispatch(Name, Msg); diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 2bca04699..a2a9a3ecd 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). publish(Session, ClientId, {?QOS_0, Message}) -> - emqttd_pubsub:publish(Message), Session; + emqttd_pubsub:publish(ClientId, Message), Session; publish(Session, ClientId, {?QOS_1, Message}) -> - emqttd_pubsub:publish(Message), Session; + emqttd_pubsub:publish(ClientId, Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> @@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of - {ok, Msg} -> emqttd_pubsub:publish(Msg); + {ok, Msg} -> emqttd_pubsub:publish(ClientId, Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; From a44e955d640314872c4e7504563f18b96fc712e4 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 03:26:44 +0800 Subject: [PATCH 06/15] groc 0.3.* --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 253a79d3f..2b3406b51 100644 --- a/rebar.config +++ b/rebar.config @@ -23,7 +23,7 @@ "apps/emqttd"]}. {deps, [ - {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, + {gproc, "0.3.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, {esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, {mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}} From cf0068c2b33aa5d68e353e25e24ab774fd877e9b Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:23:54 +0800 Subject: [PATCH 07/15] ignore_lib_apps(Apps) --- apps/emqttd/src/emqttd_utils.erl | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl index 916c19fd8..c5ceecef9 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_utils.erl @@ -40,7 +40,7 @@ all_module_attributes(Name) -> lists:usort( lists:append( [[{App, Module} || Module <- Modules] || - {App, _, _} <- application:loaded_applications(), + {App, _, _} <- ignore_lib_apps(application:loaded_applications()), {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( fun ({App, Module}, Acc) -> @@ -62,3 +62,17 @@ module_attributes(Module) -> V end. +ignore_lib_apps(Apps) -> + LibApps = [kernel, stdlib, sasl, + syntax_tools, ssl, crypto, + mnesia, os_mon, inets, + goldrush, lager, gproc, + runtime_tools, snmp, otp_mibs, + public_key, asn1, ssh, + common_test, observer, webtool, + xmerl, tools, test_server, + compiler, debugger, eunit, + et, gen_logger, wx, + hipe, esockd, mochiweb], + [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)]. + From 02614e8ae4cad776033e6636b3c2481cae690981 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:26:01 +0800 Subject: [PATCH 08/15] fix issue#95 - Topic filters in ACL rule should support 'eq' tag --- apps/emqttd/src/emqttd_access_rule.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 990b7e5ff..fe4ea2e5e 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -58,7 +58,7 @@ compile({A, all}) when (A =:= allow) orelse (A =:= deny) -> {A, all}; compile({A, Who, Access, TopicFilters}) when (A =:= allow) orelse (A =:= deny) -> - {A, compile(who, Who), Access, [compile(topic, bin(Topic)) || Topic <- TopicFilters]}. + {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}. compile(who, all) -> all; @@ -74,8 +74,10 @@ compile(who, {user, all}) -> compile(who, {user, Username}) -> {user, bin(Username)}; +compile(topic, {eq, Topic}) -> + {eq, emqtt_topic:words(bin(Topic))}; compile(topic, Topic) -> - Words = emqtt_topic:words(Topic), + Words = emqtt_topic:words(bin(Topic)), case 'pattern?'(Words) of true -> {pattern, Words}; false -> Words @@ -138,6 +140,8 @@ match_topics(Client, Topic, [TopicFilter|Filters]) -> false -> match_topics(Client, Topic, Filters) end. +match_topic(Topic, {eq, TopicFilter}) -> + Topic =:= TopicFilter; match_topic(Topic, TopicFilter) -> emqtt_topic:match(Topic, TopicFilter). From 50e033c71dd814511f836194da5fbb6aa84b57f9 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:29:56 +0800 Subject: [PATCH 09/15] TAB --- apps/emqttd/src/emqttd_acl_internal.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqttd/src/emqttd_acl_internal.erl b/apps/emqttd/src/emqttd_acl_internal.erl index 5903a7f62..c5c0ea0fb 100644 --- a/apps/emqttd/src/emqttd_acl_internal.erl +++ b/apps/emqttd/src/emqttd_acl_internal.erl @@ -37,7 +37,7 @@ %% ACL callbacks -export([init/1, check_acl/2, reload_acl/1, description/0]). --define(ACL_RULE_TABLE, mqtt_acl_rule). +-define(ACL_RULE_TAB, mqtt_acl_rule). -record(state, {acl_file, nomatch = allow}). @@ -48,7 +48,7 @@ %% @doc Read all rules. -spec all_rules() -> list(emqttd_access_rule:rule()). all_rules() -> - case ets:lookup(?ACL_RULE_TABLE, all_rules) of + case ets:lookup(?ACL_RULE_TAB, all_rules) of [] -> []; [{_, Rules}] -> Rules end. @@ -60,7 +60,7 @@ all_rules() -> %% @doc init internal ACL. -spec init(AclOpts :: list()) -> {ok, State :: any()}. init(AclOpts) -> - ets:new(?ACL_RULE_TABLE, [set, public, named_table, {read_concurrency, true}]), + ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), AclFile = proplists:get_value(file, AclOpts), Default = proplists:get_value(nomatch, AclOpts, allow), State = #state{acl_file = AclFile, nomatch = Default}, @@ -71,10 +71,10 @@ load_rules(#state{acl_file = AclFile}) -> {ok, Terms} = file:consult(AclFile), Rules = [emqttd_access_rule:compile(Term) || Term <- Terms], lists:foreach(fun(PubSub) -> - ets:insert(?ACL_RULE_TABLE, {PubSub, + ets:insert(?ACL_RULE_TAB, {PubSub, lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) end, [publish, subscribe]), - ets:insert(?ACL_RULE_TABLE, {all_rules, Terms}). + ets:insert(?ACL_RULE_TAB, {all_rules, Terms}). filter(_PubSub, {allow, all}) -> true; @@ -103,7 +103,7 @@ check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) -> end. lookup(PubSub) -> - case ets:lookup(?ACL_RULE_TABLE, PubSub) of + case ets:lookup(?ACL_RULE_TAB, PubSub) of [] -> []; [{PubSub, Rules}] -> Rules end. From eff6bed994d00fc60ea02ccad9379e96ee311529 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:31:19 +0800 Subject: [PATCH 10/15] disc copies --- apps/emqttd/src/emqttd_auth_clientid.erl | 26 ++++++++++++------------ apps/emqttd/src/emqttd_auth_username.erl | 24 +++++++++++----------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 2d3fab999..55ad8dcd2 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -39,9 +39,9 @@ %% emqttd_auth callbacks -export([init/1, check/3, description/0]). --define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid). +-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). --record(?AUTH_CLIENTID_TABLE, {clientid, ipaddr, password}). +-record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}). add_clientid(ClientId) when is_binary(ClientId) -> R = #mqtt_auth_clientid{clientid = ClientId}, @@ -52,19 +52,19 @@ add_clientid(ClientId, Password) -> mnesia:transaction(fun() -> mnesia:write(R) end). lookup_clientid(ClientId) -> - mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId). + mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). all_clientids() -> - mnesia:dirty_all_keys(?AUTH_CLIENTID_TABLE). + mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). remove_clientid(ClientId) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TABLE, ClientId}) end). + mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end). init(Opts) -> - mnesia:create_table(?AUTH_CLIENTID_TABLE, [ + mnesia:create_table(?AUTH_CLIENTID_TAB, [ {ram_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]), - mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies), + {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]), + mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), case proplists:get_value(file, Opts) of undefined -> ok; File -> load(File) @@ -80,9 +80,9 @@ check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> - case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of + case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of [] -> {error, "ClientId Not Found"}; - [#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext?? + [#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext?? _ -> {error, "Password Not Right"} end. @@ -118,10 +118,10 @@ load(Fd, eof, Clients) -> file:close(Fd). check_clientid_only(ClientId, IpAddr) -> - case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of + case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of [] -> {error, "ClientId Not Found"}; - [#?AUTH_CLIENTID_TABLE{ipaddr = undefined}] -> ok; - [#?AUTH_CLIENTID_TABLE{ipaddr = {_, {Start, End}}}] -> + [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> ok; + [#?AUTH_CLIENTID_TAB{ipaddr = {_, {Start, End}}}] -> I = esockd_access:atoi(IpAddr), case I >= Start andalso I =< End of true -> ok; diff --git a/apps/emqttd/src/emqttd_auth_username.erl b/apps/emqttd/src/emqttd_auth_username.erl index 6407be5d7..af89cf352 100644 --- a/apps/emqttd/src/emqttd_auth_username.erl +++ b/apps/emqttd/src/emqttd_auth_username.erl @@ -38,35 +38,35 @@ %% emqttd_auth callbacks -export([init/1, check/3, description/0]). --define(AUTH_USERNAME_TABLE, mqtt_auth_username). +-define(AUTH_USERNAME_TAB, mqtt_auth_username). --record(?AUTH_USERNAME_TABLE, {username, password}). +-record(?AUTH_USERNAME_TAB, {username, password}). %%%============================================================================= %%% API %%%============================================================================= add_user(Username, Password) -> - R = #?AUTH_USERNAME_TABLE{username = Username, password = hash(Password)}, + R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, mnesia:transaction(fun() -> mnesia:write(R) end). lookup_user(Username) -> - mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username). + mnesia:dirty_read(?AUTH_USERNAME_TAB, Username). remove_user(Username) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TABLE, Username}) end). + mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end). all_users() -> - mnesia:dirty_all_keys(?AUTH_USERNAME_TABLE). + mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). %%%============================================================================= %%% emqttd_auth callbacks %%%============================================================================= init(Opts) -> - mnesia:create_table(?AUTH_USERNAME_TABLE, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]), - mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies), + mnesia:create_table(?AUTH_USERNAME_TAB, [ + {disc_copies, [node()]}, + {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), + mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), {ok, Opts}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> @@ -74,10 +74,10 @@ check(#mqtt_client{username = undefined}, _Password, _Opts) -> check(_User, undefined, _Opts) -> {error, "Password undefined"}; check(#mqtt_client{username = Username}, Password, _Opts) -> - case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of + case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of [] -> {error, "Username Not Found"}; - [#?AUTH_USERNAME_TABLE{password = <>}] -> + [#?AUTH_USERNAME_TAB{password = <>}] -> case Hash =:= md5_hash(Salt, Password) of true -> ok; false -> {error, "Password Not Right"} From fb8833bb861f21afa2ef016578822afdcabc7ac8 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:31:52 +0800 Subject: [PATCH 11/15] TAB --- apps/emqttd/src/emqttd_broker.erl | 20 ++++++++++---------- apps/emqttd/src/emqttd_cm.erl | 16 ++++++++-------- apps/emqttd/src/emqttd_metrics.erl | 20 ++++++++++---------- apps/emqttd/src/emqttd_sm.erl | 10 ++++++---- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index faff38698..4d4dd9b18 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -34,7 +34,7 @@ -define(SERVER, ?MODULE). --define(BROKER_TABLE, mqtt_broker). +-define(BROKER_TAB, mqtt_broker). %% API Function Exports -export([start_link/1]). @@ -115,7 +115,7 @@ datetime() -> %%------------------------------------------------------------------------------ -spec getstats() -> [{atom(), non_neg_integer()}]. getstats() -> - ets:tab2list(?BROKER_TABLE). + ets:tab2list(?BROKER_TAB). %%------------------------------------------------------------------------------ %% @doc @@ -125,7 +125,7 @@ getstats() -> %%------------------------------------------------------------------------------ -spec getstat(atom()) -> non_neg_integer() | undefined. getstat(Name) -> - case ets:lookup(?BROKER_TABLE, Name) of + case ets:lookup(?BROKER_TAB, Name) of [{Name, Val}] -> Val; [] -> undefined end. @@ -138,7 +138,7 @@ getstat(Name) -> %%------------------------------------------------------------------------------ -spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean(). setstat(Stat, Val) -> - ets:update_element(?BROKER_TABLE, Stat, {2, Val}). + ets:update_element(?BROKER_TAB, Stat, {2, Val}). %%------------------------------------------------------------------------------ %% @doc @@ -148,13 +148,13 @@ setstat(Stat, Val) -> %%------------------------------------------------------------------------------ -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). setstats(Stat, MaxStat, Val) -> - MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2), + MaxVal = ets:lookup_element(?BROKER_TAB, MaxStat, 2), if Val > MaxVal -> - ets:update_element(?BROKER_TABLE, MaxStat, {2, Val}); + ets:update_element(?BROKER_TAB, MaxStat, {2, Val}); true -> ok end, - ets:update_element(?BROKER_TABLE, Stat, {2, Val}). + ets:update_element(?BROKER_TAB, Stat, {2, Val}). %%%============================================================================= %%% gen_server callbacks @@ -162,9 +162,9 @@ setstats(Stat, MaxStat, Val) -> init([Options]) -> random:seed(now()), - ets:new(?BROKER_TABLE, [set, public, named_table, {write_concurrency, true}]), + ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, - [ets:insert(?BROKER_TABLE, {Topic, 0}) || Topic <- Topics], + [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], [ok = create(systop(Topic)) || Topic <- Topics], @@ -191,7 +191,7 @@ handle_info(tick, State) -> publish(systop(uptime), list_to_binary(uptime(State))), publish(systop(datetime), list_to_binary(datetime())), [publish(systop(Stat), i2b(Val)) - || {Stat, Val} <- ets:tab2list(?BROKER_TABLE)], + || {Stat, Val} <- ets:tab2list(?BROKER_TAB)], {noreply, tick(State), hibernate}; handle_info(_Info, State) -> diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 3a13a54ae..4b763e1dc 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -50,7 +50,7 @@ -record(state, {tab}). --define(CLIENT_TABLE, mqtt_client). +-define(CLIENT_TAB, mqtt_client). %%%============================================================================= %%% API @@ -74,7 +74,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CLIENT_TABLE, ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -87,7 +87,7 @@ lookup(ClientId) when is_binary(ClientId) -> register(ClientId) when is_binary(ClientId) -> Pid = self(), %% this is atomic - case ets:insert_new(?CLIENT_TABLE, {ClientId, Pid, undefined}) of + case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid}); false -> gen_server:cast(?SERVER, {register, ClientId, Pid}) end. @@ -117,7 +117,7 @@ getstats() -> %%%============================================================================= init([]) -> - TabId = ets:new(?CLIENT_TABLE, [set, + TabId = ets:new(?CLIENT_TAB, [set, named_table, public, {write_concurrency, true}]), @@ -144,10 +144,10 @@ handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) -> {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> - case ets:lookup(?CLIENT_TABLE, ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(?CLIENT_TABLE, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> @@ -159,7 +159,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?CLIENT_TABLE, {'_', DownPid, MRef}), + ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -191,6 +191,6 @@ registerd(Tab, {ClientId, Pid}) -> setstats(State) -> emqttd_broker:setstats('clients/count', 'clients/max', - ets:info(?CLIENT_TABLE, size)), State. + ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 0941b16dd..9d2f21c77 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -36,7 +36,7 @@ -define(SERVER, ?MODULE). --define(METRIC_TABLE, mqtt_broker_metric). +-define(METRIC_TAB, mqtt_broker_metric). %% API Function Exports -export([start_link/1]). @@ -81,7 +81,7 @@ all() -> {ok, Count} -> maps:put(Metric, Count+Val, Map); error -> maps:put(Metric, Val, Map) end - end, #{}, ?METRIC_TABLE)). + end, #{}, ?METRIC_TAB)). %%------------------------------------------------------------------------------ %% @doc @@ -91,7 +91,7 @@ all() -> %%------------------------------------------------------------------------------ -spec value(atom()) -> non_neg_integer(). value(Metric) -> - lists:sum(ets:select(?METRIC_TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). + lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). %%------------------------------------------------------------------------------ %% @doc @@ -125,9 +125,9 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) -> %%------------------------------------------------------------------------------ -spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer(). inc(gauge, Metric, Val) -> - ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, Val}); + ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val}); inc(counter, Metric, Val) -> - ets:update_counter(?METRIC_TABLE, key(counter, Metric), {2, Val}). + ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}). %%------------------------------------------------------------------------------ %% @doc @@ -147,7 +147,7 @@ dec(gauge, Metric) -> %%------------------------------------------------------------------------------ -spec dec(gauge, atom(), pos_integer()) -> integer(). dec(gauge, Metric, Val) -> - ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, -Val}). + ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}). %%------------------------------------------------------------------------------ %% @doc @@ -158,7 +158,7 @@ dec(gauge, Metric, Val) -> set(Metric, Val) when is_atom(Metric) -> set(gauge, Metric, Val). set(gauge, Metric, Val) -> - ets:insert(?METRIC_TABLE, {key(gauge, Metric), Val}). + ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}). %%------------------------------------------------------------------------------ %% @doc @@ -180,7 +180,7 @@ init([Options]) -> random:seed(now()), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table - ets:new(?METRIC_TABLE, [set, public, named_table, {write_concurrency, true}]), + ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), % Init metrics [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics @@ -224,11 +224,11 @@ publish(Topic, Payload) -> payload = Payload}). new_metric({gauge, Name}) -> - ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); + ets:insert(?METRIC_TAB, {{Name, 0}, 0}); new_metric({counter, Name}) -> Schedulers = lists:seq(1, erlang:system_info(schedulers)), - [ets:insert(?METRIC_TABLE, {{Name, I}, 0}) || I <- Schedulers]. + [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. tick(State = #state{pub_interval = PubInterval}) -> tick(PubInterval, State). diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 977db4b6b..7c65bfb5d 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -44,7 +44,7 @@ -define(SERVER, ?MODULE). --define(SESSION_TABLE, mqtt_session). +-define(SESSION_TAB, mqtt_session). %% API Function Exports -export([start_link/0]). @@ -72,7 +72,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TABLE, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -103,7 +103,7 @@ destroy_session(ClientId) -> init([]) -> process_flag(trap_exit, true), - TabId = ets:new(?SESSION_TABLE, [set, protected, named_table]), + TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), {ok, #state{tab = TabId}}. handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) -> @@ -157,8 +157,10 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= + setstats(State) -> emqttd_broker:setstats('sessions/count', 'sessions/max', - ets:info(?SESSION_TABLE, size)), State. + ets:info(?SESSION_TAB, size)), State. + From c4c0be44f332fe984e097f61dc725d8d893109bd Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:47:56 +0800 Subject: [PATCH 12/15] align --- apps/emqttd/src/emqttd_cm.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 4b763e1dc..6b9d34848 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -118,9 +118,9 @@ getstats() -> init([]) -> TabId = ets:new(?CLIENT_TAB, [set, - named_table, - public, - {write_concurrency, true}]), + named_table, + public, + {write_concurrency, true}]), {ok, #state{tab = TabId}}. handle_call(Req, _From, State) -> From 98a3110beb733951250a75e484d26f9c4515502c Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:48:45 +0800 Subject: [PATCH 13/15] 'eq' tag --- rel/files/acl.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/acl.config b/rel/files/acl.config index f7ce98042..0d8962d49 100644 --- a/rel/files/acl.config +++ b/rel/files/acl.config @@ -20,7 +20,7 @@ {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. -{deny, all, subscribe, ["$SYS/#", "#"]}. +{deny, all, subscribe, [{eq, "$SYS/#"}, {eq, "#"}]}. {allow, all}. From fa5958feea0f8aebe2028fb0ab73fe92efc75c44 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:49:27 +0800 Subject: [PATCH 14/15] demo plugin --- .../src/emqttd_demo_acl.erl | 51 +++++++++++++++++++ .../src/emqttd_demo_auth.erl | 41 +++++++++++++++ .../src/emqttd_plugin_demo.app.src | 12 +++++ .../src/emqttd_plugin_demo_app.erl | 16 ++++++ .../src/emqttd_plugin_demo_sup.erl | 27 ++++++++++ 5 files changed, 147 insertions(+) create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl diff --git a/plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl b/plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl new file mode 100644 index 000000000..e92fd9733 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl @@ -0,0 +1,51 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd demo acl module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_demo_acl). + +-author('feng@emqtt.io'). + +-include_lib("emqttd/include/emqttd.hrl"). + +-behaviour(emqttd_acl). + +-export([check_acl/3, reload_acl/0, description/0]). + +-spec check_acl(User, PubSub, Topic) -> {ok, allow | deny} | ignore | {error, any()} when + User :: mqtt_user(), + PubSub :: publish | subscribe, + Topic :: binary(). +check_acl(_User, _PubSub, _Topic) -> + ignore. + +reload_acl() -> + ok. + +description() -> + "Demo ACL Module". + + + diff --git a/plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl b/plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl new file mode 100644 index 000000000..956d2dcf1 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl @@ -0,0 +1,41 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd demo auth module. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_demo_auth). + +-author('feng@emqtt.io'). + +-include_lib("emqttd/include/emqttd.hrl"). + +-behaviour(emqttd_auth). + +%% callbacks... +-export([check_login/2]). + +-spec check_login(mqtt_user(), undefined | binary()) -> true | false | ignore. +check_login(User, Password) -> + true. + diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src new file mode 100644 index 000000000..1fd8ce62d --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src @@ -0,0 +1,12 @@ +{application, emqttd_plugin_demo, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqttd_plugin_demo_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl new file mode 100644 index 000000000..9042a449a --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl @@ -0,0 +1,16 @@ +-module(emqttd_plugin_demo_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqttd_plugin_demo_sup:start_link(). + +stop(_State) -> + ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl new file mode 100644 index 000000000..65dad7a60 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl @@ -0,0 +1,27 @@ +-module(emqttd_plugin_demo_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + From f9143dda7b9c888fdf15710ee218ba7bbe205911 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 20 Apr 2015 04:50:37 +0800 Subject: [PATCH 15/15] 0.6.1 --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f4cfb91d..7e365f3f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ eMQTTD ChangeLog ================== +0.6.1-alpha (2015-04-20) +------------------------- + +Redesign PUBSUB + 0.6.0-alpha (2015-04-08) -------------------------