From 80117c1e8a55fa9de23aa17dd25895065d7a715f Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 3 Dec 2015 12:56:13 +0800 Subject: [PATCH] rewrite pubsub, router --- include/emqttd.hrl | 6 +- rel/files/emqttd.config.development | 10 +- rel/files/emqttd.config.production | 20 +-- src/emqttd.app.src | 6 +- src/emqttd_app.erl | 1 - src/emqttd_pubsub.erl | 178 +++++++++++++++++---------- src/emqttd_pubsub_helper.erl | 44 ++++++- src/emqttd_pubsub_sup.erl | 27 +---- src/emqttd_router.erl | 181 ++++++++++------------------ src/emqttd_router_sup.erl | 46 ------- src/emqttd_sm.erl | 24 ++-- 11 files changed, 255 insertions(+), 288 deletions(-) delete mode 100644 src/emqttd_router_sup.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 739600324..c22cffa40 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -63,9 +63,9 @@ %% MQTT Subscription %%------------------------------------------------------------------------------ -record(mqtt_subscription, { - clientid :: binary() | atom(), - topic :: binary(), - qos = 0 :: 0 | 1 | 2 + subid :: binary() | atom(), + topic :: binary(), + qos = 0 :: 0 | 1 | 2 }). -type mqtt_subscription() :: #mqtt_subscription{}. diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 506f1c48e..47360b4a7 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -146,11 +146,15 @@ {pubsub, [ %% Default should be scheduler numbers %% {pool_size, 8}, + + %% Subscription: disc | ram + {subscription, ram}, - %% Route aging time(second) - {shard, true}, + %% Route shard + {route_shard, true}, - {aging, 10} + %% Route aging time(seconds) + {route_aging, 10} ]}, %% Bridge diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 2fb4fb91b..e7cbd9c58 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -133,18 +133,20 @@ %% Max Payload Size of retained message {max_playload_size, 65536} ]}, - %% PubSub - {pubsub, [ - %% default should be scheduler numbers - %% {pool_size, 8} - ]}, - %% Router - {router, [ + %% PubSub and Router + {pubsub, [ %% Default should be scheduler numbers %% {pool_size, 8}, - %% Route aging time(second) - {aging, 5} + + %% Subscription: disc | ram + {subscription, ram}, + + %% Route shard + {route_shard, true}, + + %% Route aging time(seconds) + {route_aging, 10} ]}, %% Bridge diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 4e77ea1bb..3abd3b39b 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,12 +1,14 @@ {application, emqttd, [ {id, "emqttd"}, - {vsn, "0.13.1"}, + {vsn, "0.14.0"}, {description, "Erlang MQTT Broker"}, {modules, []}, {registered, []}, {applications, [kernel, - stdlib]}, + stdlib, + gproc, + esockd]}, {mod, {emqttd_app, []}}, {env, []} ]}. diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 005251757..09665bd99 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -70,7 +70,6 @@ start_listeners() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, {"emqttd trace", emqttd_trace}, - {"emqttd router", {supervisor, emqttd_router_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 8ca3b88ab..1971150de 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -43,7 +43,8 @@ %% API Exports -export([start_link/3]). --export([create/1, subscribe/1, subscribe/2, unsubscribe/1, publish/1]). +-export([create/1, subscribe/1, subscribe/2, + unsubscribe/1, unsubscribe/2, publish/1]). %% Local node -export([match/1]). @@ -52,25 +53,35 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-ifdef(TEST). +-compile(export_all). +-endif. + -record(state, {pool, id}). -define(ROUTER, emqttd_router). +-define(HELPER, emqttd_pubsub_helper). + %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= mnesia(boot) -> - %% topic table + %% Topic Table ok = emqttd_mnesia:create_table(topic, [ {type, bag}, {ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}]), - %% subscription table + RamOrDisc = case env(subscription) of + disc -> disc_copies; + _ -> ram_copies + end, + %% Subscription Table ok = emqttd_mnesia:create_table(subscription, [ {type, bag}, - {ram_copies, [node()]}, + {RamOrDisc, [node()]}, {record_name, mqtt_subscription}, {attributes, record_info(fields, mqtt_subscription)}]); @@ -78,6 +89,9 @@ mnesia(copy) -> ok = emqttd_mnesia:copy_table(topic), ok = emqttd_mnesia:copy_table(subscription). +env(Key) -> + proplists:get_value(Key, emqttd_broker:env(pubsub)). + %%%============================================================================= %%% API %%%============================================================================= @@ -97,42 +111,46 @@ name(Id) -> list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). %%------------------------------------------------------------------------------ -%% @doc Create topic. Notice That this transaction is not protected by pubsub pool +%% @doc Create Topic. %% @end %%------------------------------------------------------------------------------ -spec create(Topic :: binary()) -> ok | {error, Error :: any()}. create(Topic) when is_binary(Topic) -> - case mnesia:transaction(fun add_topic/1, [#mqtt_topic{topic = Topic, node = node()}]) of - {atomic, ok} -> setstats(topics), ok; + Record = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun add_topic/1, [Record]) of + {atomic, ok} -> ok; {aborted, Error} -> {error, Error} end. %%------------------------------------------------------------------------------ -%% @doc Subscribe Topic +%% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ --spec subscribe(Topic, Qos) -> {ok, Qos} when - Topic :: binary(), - Qos :: mqtt_qos() | mqtt_qos_name(). -subscribe(Topic, Qos) -> - %%TODO:... - subscribe([{Topic, Qos}]). - -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos() | mqtt_qos_name(). -subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) -> - %%TODO:... +subscribe({Topic, Qos}) -> subscribe([{Topic, Qos}]); +subscribe(TopicTable) when is_list(TopicTable) -> + call({subscribe, {undefined, self()}, fixqos(TopicTable)}). -subscribe(TopicTable0 = [{_Topic, _Qos} | _]) -> - Self = self(), - TopicTable = [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable0], - ?ROUTER:add_routes(TopicTable, Self), - PubSub = gproc_pool:pick_worker(pubsub, Self), - SubReq = {subscribe, Self, TopicTable}, - gen_server2:call(PubSub, SubReq, infinity). +-spec subscribe(ClientId, {Topic, Qos} | list({Topic, Qos})) -> + {ok, Qos | list(Qos)} | {error, any()} when + ClientId :: binary(), + Topic :: binary(), + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe(ClientId, {Topic, Qos}) when is_binary(ClientId) -> + subscribe(ClientId, [{Topic, Qos}]); +subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTable) -> + call({subscribe, {ClientId, self()}, fixqos(TopicTable)}). + +fixqos(TopicTable) -> + [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable]. + +call(Request) -> + PubSub = gproc_pool:pick_worker(pubsub, self()), + gen_server2:call(PubSub, Request, infinity). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics @@ -141,12 +159,18 @@ subscribe(TopicTable0 = [{_Topic, _Qos} | _]) -> -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> unsubscribe([Topic]); - unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> - Self = self(), - ?ROUTER:delete_routes(Topics, Self), - PubSub = gproc_pool:pick_worker(pubsub, Self), - gen_server2:cast(PubSub, {unsubscribe, Self, Topics}). + cast({unsubscribe, {undefined, self()}, Topics}). + +-spec unsubscribe(binary(), binary() | list(binary())) -> ok. +unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> + unsubscribe(ClientId, [Topic]); +unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) -> + cast({unsubscribe, {ClientId, self()}, Topics}). + +cast(Msg) -> + PubSub = gproc_pool:pick_worker(pubsub, self()), + gen_server2:cast(PubSub, Msg). %%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes @@ -169,10 +193,13 @@ publish(Msg = #mqtt_message{from = From}) -> publish(Topic, Msg) when is_binary(Topic) -> lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> - rpc:cast(Node, ?ROUTER, route, [Name, Msg]) - end, match(Topic)). + rpc:cast(Node, ?ROUTER, route, [Name, Msg]) + end, match(Topic)). -%%TODO: Benchmark and refactor... +%%------------------------------------------------------------------------------ +%% @doc Match Topic Name with Topic Filters +%% @end +%%------------------------------------------------------------------------------ -spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), @@ -182,16 +209,29 @@ match(Topic) when is_binary(Topic) -> %%% gen_server callbacks %%%============================================================================= -init([Pool, Id, _Opts]) -> +init([Pool, Id, Opts]) -> + ?ROUTER:init(Opts), ?GPROC_POOL(join, Pool, Id), + process_flag(priority, high), {ok, #state{pool = Pool, id = Id}}. -%%TODO: clientId??? -handle_call({subscribe, _SubPid, TopicTable}, _From, State) -> - Records = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], - case mnesia:transaction(fun() -> [add_topic(Record) || Record <- Records] end) of - {atomic, _Result} -> - {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, setstats(State)}; +handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) -> + %% Clean aging topics + ?HELPER:clean([Topic || {Topic, _Qos} <- TopicTable]), + + %% Add routes first + ?ROUTER:add_routes(TopicTable, SubPid), + + %% Add topics + Node = node(), + TRecords = [#mqtt_topic{topic = Topic, node = Node} || {Topic, _Qos} <- TopicTable], + + %% Add subscriptions + case mnesia:transaction(fun add_topics/1, [TRecords]) of + {atomic, _} -> + %% store subscription + %% mnesia:async_dirty(fun add_subscriptions/2, [SubId, TopicTable]), + {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {aborted, Error} -> {reply, {error, Error}, State} end; @@ -200,20 +240,29 @@ handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. -%%TODO: clientId??? -handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> +handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) -> + %% Delete routes first + ?ROUTER:delete_routes(Topics, SubPid), + + %% Remove subscriptions + mnesia:async_dirty(fun remove_subscriptions/2, [SubId, Topics]), + {noreply, State}; handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + ?ROUTER:delete_routes(DownPid), + {noreply, State, hibernate}; + handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id), setstats(all). + ?GPROC_POOL(leave, Pool, Id). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -222,6 +271,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +add_topics(Records) -> + lists:foreach(fun add_topic/1, Records). + add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> @@ -234,35 +286,33 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end end. -try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> - case mnesia:read({subscriber, Topic}) of - [] -> - mnesia:delete_object(topic, TopicR, write), - case mnesia:read(topic, Topic) of - [] -> emqttd_trie:delete(Topic); - _ -> ok - end; - _ -> - ok - end. +add_subscriptions(undefined, _TopicTable) -> + ok; +add_subscriptions(SubId, TopicTable) -> + lists:foreach(fun({Topic, Qos}) -> + %%TODO: this is not right... + Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos}, + mnesia:write(subscription, Subscription, write) + end,TopicTable). + +remove_subscriptions(undefined, _Topics) -> + ok; +remove_subscriptions(SubId, Topics) -> + lists:foreach(fun(Topic) -> + Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, + [mnesia:delete_object(subscription, Subscription, write) + || Subscription <- mnesia:match_object(subscription, Pattern, write)] + end, Topics). %%%============================================================================= -%%% Stats functions -%%%============================================================================= - -setstats(State) -> - emqttd_stats:setstats('topics/count', 'topics/max', - mnesia:table_info(topic, size)), State. - -%%%============================================================================= -%%% Trace functions +%%% Trace Functions %%%============================================================================= trace(publish, From, _Msg) when is_atom(From) -> - %%dont' trace broker publish + %% Dont' trace broker publish ignore; trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index cd387425c..297344900 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0]). +-export([start_link/1, clean/1, setstats/1]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -50,15 +50,30 @@ %% API Function Definitions %% ------------------------------------------------------------------ -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(Opts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). + +clean(Topics) -> + ok. + +setstats(topic) -> + Size = mnesia:table_info(topic, size), + emqttd_stats:setstats('topics/count', 'topics/max', Size); + +setstats(subscription) -> + ok. %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ -init(Args) -> - {ok, Args}. +init([Opts]) -> + %% Aging Timer + AgingSecs = proplists:get_value(aging, Opts, 5), + + {ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging), + + {ok, #state{aging = #aging{topics = [], timer = TRef}}}. handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -69,7 +84,8 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{aging = #aging{timer = TRef}}) -> + timer:cancel(TRef), TopicR = #mqtt_topic{_ = '_', node = node()}, F = fun() -> [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] @@ -85,3 +101,19 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Function Definitions %% ------------------------------------------------------------------ +try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> + case mnesia:read({subscriber, Topic}) of + [] -> + mnesia:delete_object(topic, TopicR, write), + case mnesia:read(topic, Topic) of + [] -> emqttd_trie:delete(Topic); + _ -> ok + end; + _ -> + ok + end. + +%%%============================================================================= +%%% Stats functions +%%%============================================================================= + diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 33f892ee1..c28999055 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -43,38 +43,17 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). init([Opts]) -> - %% Route Table - create_route_tabs(Opts), + %% PubSub Helper + Helper = {helper, {?HELPER, start_link, [Opts]}, + permanent, infinity, worker, [?HELPER]}, %% PubSub Pool Sup MFA = {emqttd_pubsub, start_link, [Opts]}, PoolSup = emqttd_pool_sup:spec(pool_sup, [ pubsub, hash, pool_size(Opts), MFA]), - - %% PubSub Helper - Helper = {helper, {?HELPER, start_link, [Opts]}, - permanent, infinity, worker, [?HELPER]}, {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}. pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Opts, Schedulers). -create_route_tabs(_Opts) -> - TabOpts = [bag, public, named_table, - {write_concurrency, true}], - %% Route Table: Topic -> {Pid, QoS} - %% Route Shard: {Topic, Shard} -> {Pid, QoS} - ensure_tab(route, TabOpts), - - %% Reverse Route Table: Pid -> {Topic, QoS} - ensure_tab(reverse_route, TabOpts). - -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of - undefined -> - ets:new(Tab, Opts); - _ -> - ok - end. - diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index d6b4019f2..2423e26ab 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -36,64 +36,85 @@ %%%----------------------------------------------------------------------------- -module(emqttd_router). --behaviour(gen_server2). - -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --export([start_link/2, add_routes/1, add_routes/2, route/2, +-export([init/1, lookup/1, route/2, add_routes/2, delete_routes/1, delete_routes/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%%TODO: test... --compile(export_all). - -%%%============================================================================= -%%% API Function Definitions -%%%============================================================================= - %%------------------------------------------------------------------------------ -%% @doc Start router. +%% @doc Create route tables. %% @end %%------------------------------------------------------------------------------ -start_link(Id, Opts) -> - gen_server2:start_link(?MODULE, [Id, Opts], []). +init(_Opts) -> + TabOpts = [bag, public, named_table, + {write_concurrency, true}], + %% Route Table: Topic -> {Pid, QoS} + %% Route Shard: {Topic, Shard} -> {Pid, QoS} + ensure_tab(route, TabOpts), + + %% Reverse Route Table: Pid -> {Topic, QoS} + ensure_tab(reverse_route, TabOpts). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, Opts); + _ -> + ok + end. %%------------------------------------------------------------------------------ %% @doc Add Routes. %% @end %%------------------------------------------------------------------------------ --spec add_routes(list({binary(), mqtt_qos()})) -> ok. -add_routes(TopicTable) -> - add_routes(TopicTable, self()). - -spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. -add_routes(TopicTable, Pid) -> - Router = gproc_pool:pick_worker(router, Pid), - gen_server2:cast(Router, {add_routes, TopicTable, Pid}). +add_routes(TopicTable, Pid) when is_pid(Pid) -> + case lookup(Pid) of + [] -> + erlang:monitor(process, Pid), + insert_routes(TopicTable, Pid); + TopicInEts -> + {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), + update_routes(UpdatedTopics, Pid), + insert_routes(NewTopics, Pid) + end. %%------------------------------------------------------------------------------ -%% @doc Lookup topics that a pid subscribed. +%% @doc Lookup Routes %% @end %%------------------------------------------------------------------------------ -spec lookup(pid()) -> list({binary(), mqtt_qos()}). lookup(Pid) when is_pid(Pid) -> [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. +%%------------------------------------------------------------------------------ +%% @doc Delete Routes. +%% @end +%%------------------------------------------------------------------------------ +-spec delete_routes(list(binary()), pid()) -> ok. +delete_routes(Topics, Pid) -> + Routes = [{Topic, Pid} || Topic <- Topics], + lists:foreach(fun delete_route/1, Routes). + +-spec delete_routes(pid()) -> ok. +delete_routes(Pid) when is_pid(Pid) -> + Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup(Pid)], + ets:delete(reverse_route, Pid), + lists:foreach(fun delete_route_only/1, Routes). + %%------------------------------------------------------------------------------ %% @doc Route Message on Local Node. %% @end %%------------------------------------------------------------------------------ --spec route(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). +-spec route(binary(), mqtt_message()) -> non_neg_integer(). route(Queue = <<"$Q/", _Q>>, Msg) -> case ets:lookup(route, Queue) of [] -> setstats(dropped, true); Routes -> - Idx = random:uniform(length(Routes)), + Idx = crypto:rand_uniform(1, length(Routes) + 1), {_, SubPid, SubQos} = lists:nth(Idx, Routes), SubPid ! {dispatch, tune_qos(SubQos, Msg)} end; @@ -111,86 +132,6 @@ tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos -> tune_qos(_SubQos, Msg) -> Msg. -%%------------------------------------------------------------------------------ -%% @doc Delete Routes. -%% @end -%%------------------------------------------------------------------------------ --spec delete_routes(list(binary())) -> ok. -delete_routes(Topics) -> - delete_routes(Topics, self()). - --spec delete_routes(list(binary()), pid()) -> ok. -delete_routes(Topics, Pid) -> - Router = gproc_pool:pick_worker(router, Pid), - gen_server2:cast(Router, {delete_routes, Topics, Pid}). - -%%%============================================================================= -%%% gen_server Function Definitions -%%%============================================================================= - -init([Id, Opts]) -> - %% Only ETS Operations - process_flag(priority, high), - - %% Aging Timer - AgingSecs = proplists:get_value(aging, Opts, 5), - - {ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging), - - gproc_pool:connect_worker(router, {?MODULE, Id}), - - {ok, #state{aging = #aging{topics = [], timer = TRef}}}. - -handle_call(Req, _From, State) -> - lager:error("Unexpected Request: ~p", [Req]), - {reply, {error, unsupported_req}, State}. - -handle_cast({add_routes, TopicTable, Pid}, State) -> - case lookup(Pid) of - [] -> - erlang:monitor(process, Pid), - ets_add_routes(TopicTable, Pid); - TopicInEts -> - {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), - ets_update_routes(UpdatedTopics, Pid), - ets_add_routes(NewTopics, Pid) - end, - {noreply, State}; - -handle_cast({delete, Topics, Pid}, State) -> - Routes = [{Topic, Pid} || Topic <- Topics], - lists:foreach(fun ets_delete_route/1, Routes), - %% TODO: aging route...... - {noreply, State}; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> - Topics = [Topic || {Topic, _Qos} <- lookup(DownPid)], - ets:delete(reverse_route, DownPid), - lists:foreach(fun(Topic) -> - ets:match_delete(route, {Topic, DownPid, '_'}) - end, Topics), - %% TODO: aging route...... - {noreply, State}; - -handle_info(aging, State = #state{aging = #aging{topics = Topics}}) -> - %%TODO.. aging - %%io:format("Aging Topics: ~p~n", [Topics]), - {noreply, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, #state{id = Id, aging = #aging{timer = TRef}}) -> - timer:cancel(TRef), - gproc_pool:connect_worker(route, {?MODULE, Id}), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%%============================================================================= %%% Internal Functions %%%============================================================================= @@ -211,39 +152,41 @@ diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) -> diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc) end. -ets_add_routes([], _Pid) -> +insert_routes([], _Pid) -> ok; -ets_add_routes(TopicTable, Pid) -> +insert_routes(TopicTable, Pid) -> {Routes, ReverseRoutes} = routes(TopicTable, Pid), ets:insert(route, Routes), ets:insert(reverse_route, ReverseRoutes). -ets_update_routes([], _Pid) -> +update_routes([], _Pid) -> ok; -ets_update_routes(TopicTable, Pid) -> +update_routes(TopicTable, Pid) -> {Routes, ReverseRoutes} = routes(TopicTable, Pid), - lists:foreach(fun ets_update_route/1, Routes), - lists:foreach(fun ets_update_reverse_route/1, ReverseRoutes). + lists:foreach(fun update_route/1, Routes), + lists:foreach(fun update_reverse_route/1, ReverseRoutes). -ets_update_route(Route = {Topic, Pid, _Qos}) -> +update_route(Route = {Topic, Pid, _Qos}) -> ets:match_delete(route, {Topic, Pid, '_'}), ets:insert(route, Route). -ets_update_reverse_route(RevRoute = {Pid, Topic, _Qos}) -> +update_reverse_route(RevRoute = {Pid, Topic, _Qos}) -> ets:match_delete(reverse_route, {Pid, Topic, '_'}), ets:insert(reverse_route, RevRoute). -ets_delete_route({Topic, Pid}) -> - ets:match_delete(reverse_route, {Pid, Topic, '_'}), - ets:match_delete(route, {Topic, Pid, '_'}). - routes(TopicTable, Pid) -> F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end, lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]). +delete_route({Topic, Pid}) -> + ets:match_delete(reverse_route, {Pid, Topic, '_'}), + ets:match_delete(route, {Topic, Pid, '_'}). + +delete_route_only({Topic, Pid}) -> + ets:match_delete(route, {Topic, Pid, '_'}). + setstats(dropped, false) -> ignore; - setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). diff --git a/src/emqttd_router_sup.erl b/src/emqttd_router_sup.erl deleted file mode 100644 index 8193ccea8..000000000 --- a/src/emqttd_router_sup.erl +++ /dev/null @@ -1,46 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc Router Supervisor -%%% -%%% @author Feng Lee -%%% -%%%----------------------------------------------------------------------------- --module(emqttd_router_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - Opts = emqttd_broker:env(router), - supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). - -init([Opts]) -> - create_route_tabs(Opts), - MFA = {emqttd_router, start_link, [Opts]}, - PoolSup = emqttd_pool_sup:spec(pool_sup, [router, hash, MFA]), - {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. - diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 742e66d87..36645a4d9 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -28,6 +28,8 @@ -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -35,7 +37,7 @@ -copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([start_link/1, pool/0]). +-export([start_link/2]). -export([start_session/2, lookup_session/1]). @@ -50,7 +52,7 @@ %% gen_server2 priorities -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). --record(state, {id}). +-record(state, {pool, id}). -define(POOL, ?MODULE). @@ -64,7 +66,7 @@ %%%============================================================================= mnesia(boot) -> - %% global session... + %% Global session... ok = emqttd_mnesia:create_table(session, [ {type, ordered_set}, {ram_copies, [node()]}, @@ -83,9 +85,9 @@ mnesia(copy) -> %% @doc Start a session manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. -start_link(Id) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Id], []). +-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Pool, Id) -> + gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []). name(Id) -> list_to_atom("emqttd_sm_" ++ integer_to_list(Id)). @@ -141,9 +143,9 @@ call(SM, Req) -> %%% gen_server callbacks %%%============================================================================= -init([Id]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{id = Id}}. +init([Pool, Id]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id}}. prioritise_call(_Msg, _From, _Len, _State) -> 1. @@ -197,8 +199,8 @@ handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id). code_change(_OldVsn, State, _Extra) -> {ok, State}.