From db8b7c9d8248303e7944129fa0b6b8ecf470e242 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 2 Dec 2015 18:01:51 +0800 Subject: [PATCH] 0.14 --- TODO | 7 + doc/pool.md | 7 + src/emqttd_pool_sup.erl | 62 +++++++++ src/emqttd_pubsub_helper.erl | 87 ++++++++++++ src/emqttd_router.erl | 249 +++++++++++++++++++++++++++++++++++ src/emqttd_router_sup.erl | 46 +++++++ 6 files changed, 458 insertions(+) create mode 100644 TODO create mode 100644 doc/pool.md create mode 100644 src/emqttd_pool_sup.erl create mode 100644 src/emqttd_pubsub_helper.erl create mode 100644 src/emqttd_router.erl create mode 100644 src/emqttd_router_sup.erl diff --git a/TODO b/TODO new file mode 100644 index 000000000..f1fa5239f --- /dev/null +++ b/TODO @@ -0,0 +1,7 @@ + +TODO 1. refactor gproc_pool usage + +TODO 2. emqttd_router, emqttd_pubsub to route message + +TODO 3. sup, pool_sup, manager...... + diff --git a/doc/pool.md b/doc/pool.md new file mode 100644 index 000000000..1f032fd55 --- /dev/null +++ b/doc/pool.md @@ -0,0 +1,7 @@ +sup(one_for_all) + manager + pool_sup(one_for_one) + worker1 + worker2 + ... + workerN diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl new file mode 100644 index 000000000..c19f61b8e --- /dev/null +++ b/src/emqttd_pool_sup.erl @@ -0,0 +1,62 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc Common Pool Supervisor +%%% +%%% @author Feng Lee +%%% +%%%----------------------------------------------------------------------------- +-module(emqttd_pool_sup). + +-behaviour(supervisor). + +%% API +-export([spec/2, start_link/3, start_link/4]). + +%% Supervisor callbacks +-export([init/1]). + +-spec spec(any(), list()) -> supervisor:child_spec(). +spec(Id, Args) -> + {Id, {?MODULE, start_link, Args}, + transient, infinity, supervisor, [?MODULE]}. + +-spec start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}. +start_link(Pool, Type, MFA) -> + Schedulers = erlang:system_info(schedulers), + start_link(Pool, Type, Schedulers, MFA). + +-spec start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}. +start_link(Pool, Type, Size, MFA) -> + supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]). + +sup_name(Pool) -> + list_to_atom(atom_to_list(Pool) ++ "_pool_sup"). + +init([Pool, Type, Size, {M, F, Args}]) -> + emqttd:ensure_pool(Pool, Type, [{size, Size}]), + {ok, {{one_for_one, 10, 3600}, [ + begin + gproc_pool:add_worker(Pool, {Pool, I}, I), + {{M, I}, {M, F, [Pool, I | Args]}, + transient, 5000, worker, [M]} + end || I <- lists:seq(1, Size)]}}. + diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl new file mode 100644 index 000000000..cd387425c --- /dev/null +++ b/src/emqttd_pubsub_helper.erl @@ -0,0 +1,87 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc PubSub Helper +%%% +%%% @author Feng Lee +%%% +%%%----------------------------------------------------------------------------- +-module(emqttd_pubsub_helper). + +-behaviour(gen_server). + +-include("emqttd.hrl"). + +-define(SERVER, ?MODULE). + +%% API Function Exports +-export([start_link/0]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(aging, {topics, timer}). + +-record(state, {aging :: #aging{}}). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + TopicR = #mqtt_topic{_ = '_', node = node()}, + F = fun() -> + [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] + %%TODO: remove trie?? + end, + mnesia:transaction(F), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl new file mode 100644 index 000000000..d6b4019f2 --- /dev/null +++ b/src/emqttd_router.erl @@ -0,0 +1,249 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc MQTT Message Router on Local Node +%%% +%%% Route Table: +%%% +%%% Topic -> {Pid1, Qos}, {Pid2, Qos}, ... +%%% +%%% Reverse Route Table: +%%% +%%% Pid -> {Topic1, Qos}, {Topic2, Qos}, ... +%%% +%%% @end +%%% +%%% @author Feng Lee +%%% +%%%----------------------------------------------------------------------------- +-module(emqttd_router). + +-behaviour(gen_server2). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-export([start_link/2, add_routes/1, add_routes/2, route/2, + delete_routes/1, delete_routes/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%TODO: test... +-compile(export_all). + +%%%============================================================================= +%%% API Function Definitions +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Start router. +%% @end +%%------------------------------------------------------------------------------ +start_link(Id, Opts) -> + gen_server2:start_link(?MODULE, [Id, Opts], []). + +%%------------------------------------------------------------------------------ +%% @doc Add Routes. +%% @end +%%------------------------------------------------------------------------------ +-spec add_routes(list({binary(), mqtt_qos()})) -> ok. +add_routes(TopicTable) -> + add_routes(TopicTable, self()). + +-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. +add_routes(TopicTable, Pid) -> + Router = gproc_pool:pick_worker(router, Pid), + gen_server2:cast(Router, {add_routes, TopicTable, Pid}). + +%%------------------------------------------------------------------------------ +%% @doc Lookup topics that a pid subscribed. +%% @end +%%------------------------------------------------------------------------------ +-spec lookup(pid()) -> list({binary(), mqtt_qos()}). +lookup(Pid) when is_pid(Pid) -> + [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. + +%%------------------------------------------------------------------------------ +%% @doc Route Message on Local Node. +%% @end +%%------------------------------------------------------------------------------ +-spec route(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). +route(Queue = <<"$Q/", _Q>>, Msg) -> + case ets:lookup(route, Queue) of + [] -> + setstats(dropped, true); + Routes -> + Idx = random:uniform(length(Routes)), + {_, SubPid, SubQos} = lists:nth(Idx, Routes), + SubPid ! {dispatch, tune_qos(SubQos, Msg)} + end; + +route(Topic, Msg) -> + Routes = ets:lookup(route, Topic), + setstats(dropped, Routes =:= []), + lists:foreach( + fun({_Topic, SubPid, SubQos}) -> + SubPid ! {dispatch, tune_qos(SubQos, Msg)} + end, Routes). + +tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos -> + Msg#mqtt_message{qos = SubQos}; +tune_qos(_SubQos, Msg) -> + Msg. + +%%------------------------------------------------------------------------------ +%% @doc Delete Routes. +%% @end +%%------------------------------------------------------------------------------ +-spec delete_routes(list(binary())) -> ok. +delete_routes(Topics) -> + delete_routes(Topics, self()). + +-spec delete_routes(list(binary()), pid()) -> ok. +delete_routes(Topics, Pid) -> + Router = gproc_pool:pick_worker(router, Pid), + gen_server2:cast(Router, {delete_routes, Topics, Pid}). + +%%%============================================================================= +%%% gen_server Function Definitions +%%%============================================================================= + +init([Id, Opts]) -> + %% Only ETS Operations + process_flag(priority, high), + + %% Aging Timer + AgingSecs = proplists:get_value(aging, Opts, 5), + + {ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging), + + gproc_pool:connect_worker(router, {?MODULE, Id}), + + {ok, #state{aging = #aging{topics = [], timer = TRef}}}. + +handle_call(Req, _From, State) -> + lager:error("Unexpected Request: ~p", [Req]), + {reply, {error, unsupported_req}, State}. + +handle_cast({add_routes, TopicTable, Pid}, State) -> + case lookup(Pid) of + [] -> + erlang:monitor(process, Pid), + ets_add_routes(TopicTable, Pid); + TopicInEts -> + {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), + ets_update_routes(UpdatedTopics, Pid), + ets_add_routes(NewTopics, Pid) + end, + {noreply, State}; + +handle_cast({delete, Topics, Pid}, State) -> + Routes = [{Topic, Pid} || Topic <- Topics], + lists:foreach(fun ets_delete_route/1, Routes), + %% TODO: aging route...... + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + Topics = [Topic || {Topic, _Qos} <- lookup(DownPid)], + ets:delete(reverse_route, DownPid), + lists:foreach(fun(Topic) -> + ets:match_delete(route, {Topic, DownPid, '_'}) + end, Topics), + %% TODO: aging route...... + {noreply, State}; + +handle_info(aging, State = #state{aging = #aging{topics = Topics}}) -> + %%TODO.. aging + %%io:format("Aging Topics: ~p~n", [Topics]), + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{id = Id, aging = #aging{timer = TRef}}) -> + timer:cancel(TRef), + gproc_pool:connect_worker(route, {?MODULE, Id}), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal Functions +%%%============================================================================= + +diff(TopicTable, TopicInEts) -> + diff(TopicTable, TopicInEts, [], []). + +diff([], _TopicInEts, NewAcc, UpAcc) -> + {NewAcc, UpAcc}; + +diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) -> + case lists:keyfind(Topic, 1, TopicInEts) of + {Topic, Qos} -> + diff(TopicTable, TopicInEts, NewAcc, UpAcc); + {Topic, _Qos} -> + diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]); + false -> + diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc) + end. + +ets_add_routes([], _Pid) -> + ok; +ets_add_routes(TopicTable, Pid) -> + {Routes, ReverseRoutes} = routes(TopicTable, Pid), + ets:insert(route, Routes), + ets:insert(reverse_route, ReverseRoutes). + +ets_update_routes([], _Pid) -> + ok; +ets_update_routes(TopicTable, Pid) -> + {Routes, ReverseRoutes} = routes(TopicTable, Pid), + lists:foreach(fun ets_update_route/1, Routes), + lists:foreach(fun ets_update_reverse_route/1, ReverseRoutes). + +ets_update_route(Route = {Topic, Pid, _Qos}) -> + ets:match_delete(route, {Topic, Pid, '_'}), + ets:insert(route, Route). + +ets_update_reverse_route(RevRoute = {Pid, Topic, _Qos}) -> + ets:match_delete(reverse_route, {Pid, Topic, '_'}), + ets:insert(reverse_route, RevRoute). + +ets_delete_route({Topic, Pid}) -> + ets:match_delete(reverse_route, {Pid, Topic, '_'}), + ets:match_delete(route, {Topic, Pid, '_'}). + +routes(TopicTable, Pid) -> + F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end, + lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]). + +setstats(dropped, false) -> + ignore; + +setstats(dropped, true) -> + emqttd_metrics:inc('messages/dropped'). + diff --git a/src/emqttd_router_sup.erl b/src/emqttd_router_sup.erl new file mode 100644 index 000000000..8193ccea8 --- /dev/null +++ b/src/emqttd_router_sup.erl @@ -0,0 +1,46 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc Router Supervisor +%%% +%%% @author Feng Lee +%%% +%%%----------------------------------------------------------------------------- +-module(emqttd_router_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + Opts = emqttd_broker:env(router), + supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). + +init([Opts]) -> + create_route_tabs(Opts), + MFA = {emqttd_router, start_link, [Opts]}, + PoolSup = emqttd_pool_sup:spec(pool_sup, [router, hash, MFA]), + {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. +