diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 4cd9a4169..6872b419f 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -147,17 +147,17 @@ %% Default should be scheduler numbers %% {pool_size, 8}, - %% Subscription: disc | ram + %% Subscription: disc | ram | false {subscription, ram}, %% Route shard - {route_shard, true}, + {route_shard, false}, %% Route delay, false | integer {route_delay, false}, %% Route aging time(seconds) - {route_aging, 10} + {route_aging, 5} ]}, %% Bridge diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 3b05c4672..806888527 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -139,17 +139,17 @@ %% Default should be scheduler numbers %% {pool_size, 8}, - %% Subscription: disc | ram + %% Subscription: disc | ram | false {subscription, ram}, %% Route shard - {route_shard, true}, + {route_shard, false}, %% Route delay, false | integer {route_delay, false}, %% Route aging time(seconds) - {route_aging, 10} + {route_aging, 5} ]}, %% Bridge diff --git a/src/emqttd.erl b/src/emqttd.erl index 44db5c068..ba44d790d 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -26,9 +26,9 @@ -module(emqttd). -export([start/0, env/1, env/2, - open_listeners/1, close_listeners/1, + start_listeners/0, stop_listeners/0, load_all_mods/0, is_mod_enabled/1, - is_running/1, ensure_pool/3]). + is_running/1]). -define(MQTT_SOCKOPTS, [ binary, @@ -38,6 +38,8 @@ {nodelay, true} ]). +-define(APP, ?MODULE). + -type listener() :: {atom(), inet:port_number(), [esockd:option()]}. %%------------------------------------------------------------------------------ @@ -61,32 +63,34 @@ env(Group, Name) -> proplists:get_value(Name, env(Group)). %%------------------------------------------------------------------------------ -%% @doc Open Listeners +%% @doc Start Listeners %% @end %%------------------------------------------------------------------------------ --spec open_listeners([listener()]) -> any(). -open_listeners(Listeners) when is_list(Listeners) -> - [open_listener(Listener) || Listener <- Listeners]. +-spec start_listeners() -> any(). +start_listeners() -> + {ok, Listeners} = application:get_env(?APP, listeners), + lists:foreach(fun start_listener/1, Listeners). -%% open mqtt port -open_listener({mqtt, Port, Options}) -> - open_listener(mqtt, Port, Options); +%% Start mqtt listener +-spec start_listener(listener()) -> any(). +start_listener({mqtt, Port, Options}) -> + start_listener(mqtt, Port, Options); -%% open mqtt(SSL) port -open_listener({mqtts, Port, Options}) -> - open_listener(mqtts, Port, Options); +%% Start mqtt(SSL) listener +start_listener({mqtts, Port, Options}) -> + start_listener(mqtts, Port, Options); -%% open http port -open_listener({http, Port, Options}) -> +%% Start http listener +start_listener({http, Port, Options}) -> MFArgs = {emqttd_http, handle_request, []}, mochiweb:start_http(Port, Options, MFArgs); -%% open https port -open_listener({https, Port, Options}) -> +%% Start https listener +start_listener({https, Port, Options}) -> MFArgs = {emqttd_http, handle_request, []}, mochiweb:start_http(Port, Options, MFArgs). -open_listener(Protocol, Port, Options) -> +start_listener(Protocol, Port, Options) -> MFArgs = {emqttd_client, start_link, [env(mqtt)]}, esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). @@ -96,14 +100,14 @@ merge_sockopts(Options) -> emqttd_opts:merge(Options, [{sockopts, SockOpts}]). %%------------------------------------------------------------------------------ -%% @doc Close Listeners +%% @doc Stop Listeners %% @end %%------------------------------------------------------------------------------ --spec close_listeners([listener()]) -> any(). -close_listeners(Listeners) when is_list(Listeners) -> - [close_listener(Listener) || Listener <- Listeners]. +stop_listeners() -> + {ok, Listeners} = application:get_env(?APP, listeners), + lists:foreach(fun stop_listener/1, Listeners). -close_listener({Protocol, Port, _Options}) -> +stop_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). load_all_mods() -> @@ -127,13 +131,3 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. -%%------------------------------------------------------------------------------ -%% @doc Ensure gproc pool exist. -%% @end -%%------------------------------------------------------------------------------ -ensure_pool(Pool, Type, Opts) -> - try gproc_pool:new(Pool, Type, Opts) - catch - error:exists -> ok - end. - diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 561090851..b236b41f7 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -49,7 +49,7 @@ start(_StartType, _StartArgs) -> emqttd_cli:load(), emqttd:load_all_mods(), emqttd_plugins:load(), - start_listeners(), + emqttd:start_listeners(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -62,10 +62,6 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). -start_listeners() -> - {ok, Listeners} = application:get_env(listeners), - emqttd:open_listeners(Listeners). - start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, {"emqttd trace", emqttd_trace}, @@ -132,15 +128,5 @@ worker_spec(M, F, A) -> -spec stop(State :: term()) -> term(). stop(_State) -> - stop_listeners(). - -stop_listeners() -> - %% ensure that esockd applications is started? - case lists:keyfind(esockd, 1, application:which_applications()) of - false -> - ignore; - _Tuple -> - {ok, Listeners} = application:get_env(listeners), - emqttd:close_listeners(Listeners) - end. + catch emqttd:stop_listeners(). diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 263283e99..79e28743b 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -82,7 +82,7 @@ init([Node, SubTopic, Options]) -> MQueue = emqttd_mqueue:new(qname(Node, SubTopic), [{max_len, State#state.max_queue_len}], emqttd_alarm:alarm_fun()), - emqttd_pubsub:subscribe(SubTopic, State#state.qos), + emqttd_pubsub:subscribe({SubTopic, State#state.qos}), {ok, State#state{mqueue = MQueue}}; false -> {stop, {cannot_connect, Node}} diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index a1c1f2565..8d0694a94 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -51,11 +51,23 @@ sup_name(Pool) -> list_to_atom(atom_to_list(Pool) ++ "_pool_sup"). init([Pool, Type, Size, {M, F, Args}]) -> - emqttd:ensure_pool(Pool, Type, [{size, Size}]), + ensure_pool(Pool, Type, [{size, Size}]), {ok, {{one_for_one, 10, 3600}, [ begin - gproc_pool:add_worker(Pool, {Pool, I}, I), + ensure_pool_worker(Pool, {Pool, I}, I), {{M, I}, {M, F, [Pool, I | Args]}, transient, 5000, worker, [M]} end || I <- lists:seq(1, Size)]}}. +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + +ensure_pool_worker(Pool, Name, Slot) -> + try gproc_pool:add_worker(Pool, Name, Slot) + catch + error:exists -> ok + end. + diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index f0885c282..f2b545784 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -67,18 +67,16 @@ %%%============================================================================= mnesia(boot) -> ok = create_table(topic, ram_copies), - case env(subscription) of - disc -> ok = create_table(subscription, disc_copies); - ram -> ok = create_table(subscription, ram_copies); - false -> ok - end; + if_subscription(fun(RamOrDisc) -> + ok = create_table(subscription, RamOrDisc) + end); mnesia(copy) -> ok = emqttd_mnesia:copy_table(topic), - case env(subscription) of - false -> ok; - _ -> ok = emqttd_mnesia:copy_table(subscription) - end. + %% Only one disc_copy??? + if_subscription(fun(_RamOrDisc) -> + ok = emqttd_mnesia:copy_table(subscription) + end). %% Topic Table create_table(topic, RamOrDisc) -> @@ -96,16 +94,27 @@ create_table(subscription, RamOrDisc) -> {record_name, mqtt_subscription}, {attributes, record_info(fields, mqtt_subscription)}]). +if_subscription(Fun) -> + case env(subscription) of + disc -> Fun(disc_copies); + ram -> Fun(ram_copies); + false -> ok; + undefined -> ok + end. + env(Key) -> case get({pubsub, Key}) of undefined -> - Val = proplists:get_value(Key, emqttd_broker:env(pubsub)), - put({pubsub, Key}, Val), - Val; + cache_env(Key); Val -> Val end. +cache_env(Key) -> + Val = emqttd_opts:g(Key, emqttd_broker:env(pubsub)), + put({pubsub, Key}, Val), + Val. + %%%============================================================================= %%% API %%%============================================================================= @@ -217,7 +226,8 @@ publish(Topic, Msg) when is_binary(Topic) -> -spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), - lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). + %% ets:lookup for topic table will be copied. + lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks @@ -226,25 +236,23 @@ match(Topic) when is_binary(Topic) -> init([Pool, Id, Opts]) -> ?ROUTER:init(Opts), ?GPROC_POOL(join, Pool, Id), - process_flag(priority, high), {ok, #state{pool = Pool, id = Id}}. handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) -> - %% Clean aging topics - ?HELPER:clean([Topic || {Topic, _Qos} <- TopicTable]), - %% Add routes first ?ROUTER:add_routes(TopicTable, SubPid), %% Add topics - Node = node(), - TRecords = [#mqtt_topic{topic = Topic, node = Node} || {Topic, _Qos} <- TopicTable], - - %% Add subscriptions - case mnesia:transaction(fun add_topics/1, [TRecords]) of + Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], + + case mnesia:transaction(fun add_topics/1, [Topics]) of {atomic, _} -> - %%TODO: store subscription - %% mnesia:async_dirty(fun add_subscriptions/2, [SubId, TopicTable]), + if_subscription( + fun(_) -> + %% Add subscriptions + Args = [fun add_subscriptions/2, [SubId, TopicTable]], + emqttd_pooler:async_submit({mnesia, async_dirty, Args}) + end), {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {aborted, Error} -> {reply, {error, Error}, State} @@ -257,10 +265,12 @@ handle_call(Req, _From, State) -> handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) -> %% Delete routes first ?ROUTER:delete_routes(Topics, SubPid), - %% Remove subscriptions - mnesia:async_dirty(fun remove_subscriptions/2, [SubId, Topics]), - + if_subscription( + fun(_) -> + Args = [fun remove_subscriptions/2, [SubId, Topics]], + emqttd_pooler:async_submit({mnesia, async_dirty, Args}) + end), {noreply, State}; handle_cast(Msg, State) -> @@ -268,7 +278,13 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + Routes = ?ROUTER:lookup_routes(DownPid), + + %% Delete all routes of the process ?ROUTER:delete_routes(DownPid), + + ?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]), + {noreply, State, hibernate}; handle_info(Info, State) -> diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 34859c979..636ada983 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -19,100 +19,177 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc PubSub Helper +%%% @doc PubSub Route Aging Helper %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- -module(emqttd_pubsub_helper). --behaviour(gen_server). +-behaviour(gen_server2). -include("emqttd.hrl"). --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/1, clean/1, setstats/1]). +-export([start_link/1, aging/1, setstats/1]). -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ - -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(aging, {topics, timer}). +-ifdef(TEST). +-compile(export_all). +-endif. + +-record(aging, {topics, time, tref}). -record(state, {aging :: #aging{}}). -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ +-define(SERVER, ?MODULE). +-define(ROUTER, emqttd_router). + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Start pubsub helper. +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(list(tuple())) -> {ok, pid()} | ignore | {error, any()}. start_link(Opts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [Opts], []). -clean(Topics) -> - ok. +%%------------------------------------------------------------------------------ +%% @doc Aging topics +%% @end +%%------------------------------------------------------------------------------ +-spec aging(list(binary())) -> ok. +aging(Topics) -> + gen_server2:cast(?SERVER, {aging, Topics}). setstats(topic) -> - Size = mnesia:table_info(topic, size), - emqttd_stats:setstats('topics/count', 'topics/max', Size); - + emqttd_stats:setstats('topics/count', 'topics/max', + mnesia:table_info(topic, size)); setstats(subscription) -> - ok. + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', + mnesia:table_info(subscription, size)). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= init([Opts]) -> + + mnesia:subscribe(system), + + AgingSecs = proplists:get_value(route_aging, Opts, 5), + %% Aging Timer - AgingSecs = proplists:get_value(aging, Opts, 5), + {ok, AgingTref} = start_tick(AgingSecs div 2), - {ok, TRef} = timer:send_interval(timer:seconds(AgingSecs), aging), + {ok, #state{aging = #aging{topics = dict:new(), + time = AgingSecs, + tref = AgingTref}}}. - {ok, #state{aging = #aging{topics = [], timer = TRef}}}. +start_tick(Secs) -> + timer:send_interval(timer:seconds(Secs), {clean, aged}). handle_call(_Request, _From, State) -> {reply, ok, State}. +handle_cast({aging, Topics}, State = #state{aging = Aging}) -> + #aging{topics = Dict} = Aging, + TS = emqttd_util:now_to_secs(), + Dict1 = + lists:foldl(fun(Topic, Acc) -> + case dict:find(Topic, Acc) of + {ok, _} -> Acc; + error -> dict:store(Topic, TS, Acc) + end + end, Dict, Topics), + {noreply, State#state{aging = Aging#aging{topics = Dict1}}}; + handle_cast(_Msg, State) -> {noreply, State}. +handle_info({clean, aged}, State = #state{aging = Aging}) -> + + #aging{topics = Dict, time = Time} = Aging, + + ByTime = emqttd_util:now_to_secs() - Time, + + Dict1 = try_clean(ByTime, dict:to_list(Dict)), + + NewAging = Aging#aging{topics = dict:from_list(Dict1)}, + + {noreply, State#state{aging = NewAging}, hibernate}; + +handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + Pattern = #mqtt_topic{_ = '_', node = Node}, + F = fun() -> + [mnesia:delete_object(topic, R, write) || + R <- mnesia:match_object(topic, Pattern, write)] + end, + mnesia:async_dirty(F), + {noreply, State}; + handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{aging = #aging{timer = TRef}}) -> - timer:cancel(TRef), - TopicR = #mqtt_topic{_ = '_', node = node()}, - F = fun() -> - [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] - %%TODO: remove trie?? - end, - mnesia:transaction(F), - ok. +terminate(_Reason, #state{aging = #aging{tref = TRef}}) -> + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal Functions +%%%============================================================================= + +try_clean(ByTime, List) -> + try_clean(ByTime, List, []). + +try_clean(_ByTime, [], Acc) -> + Acc; + +try_clean(ByTime, [{Topic, TS} | Left], Acc) -> + case ?ROUTER:has_route(Topic) of + false -> + try_clean2(ByTime, {Topic, TS}, Left, Acc); + true -> + try_clean(ByTime, Left, Acc) + end. + +try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> + try_clean(ByTime, Left, [{Topic, TS}|Acc]); + +try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + io:format("Try to remove topic: ~p~n", [Topic]), + mnesia:transaction(fun try_remove_topic/1, [TopicR]), + try_clean(ByTime, Left, Acc). try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> - case mnesia:read({subscriber, Topic}) of - [] -> - mnesia:delete_object(topic, TopicR, write), - case mnesia:read(topic, Topic) of - [] -> emqttd_trie:delete(Topic); - _ -> ok - end; - _ -> - ok - end. - -%%%============================================================================= -%%% Stats functions -%%%============================================================================= + %% Lock topic first + case mnesia:wread({topic, Topic}) of + [] -> ok; + [TopicR] -> + if_no_route(Topic, fun() -> + %% Remove topic and trie + mnesia:delete_object(topic, TopicR, write), + emqttd_trie:delete(Topic) + end); + _More -> + if_no_route(Topic, fun() -> + %% Remove topic + mnesia:delete_object(topic, TopicR, write) + end) + end. + +if_no_route(Topic, Fun) -> + case ?ROUTER:has_route(Topic) of + true -> ok; + false -> Fun() + end. diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index a39cfda31..dd95c5cba 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -39,7 +39,7 @@ -include("emqttd_protocol.hrl"). --export([init/1, route/2, lookup_routes/1, +-export([init/1, route/2, lookup_routes/1, has_route/1, add_routes/2, delete_routes/1, delete_routes/2]). -ifdef(TEST). @@ -92,6 +92,14 @@ add_routes(TopicTable, Pid) when is_pid(Pid) -> lookup_routes(Pid) when is_pid(Pid) -> [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. +%%------------------------------------------------------------------------------ +%% @doc Has Route +%% @end +%%------------------------------------------------------------------------------ +-spec has_route(binary()) -> boolean(). +has_route(Topic) -> + ets:member(route, Topic). + %%------------------------------------------------------------------------------ %% @doc Delete Routes. %% @end diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 29c8b4c5d..bfc885ae4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -320,7 +320,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli hibernate(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed - {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), + {ok, GrantedQos} = emqttd_pubsub:subscribe(ClientId, TopicTable), AckFun(GrantedQos), diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index ba9d690a5..75f8abe1f 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -51,7 +51,6 @@ start_link(StatsFun) -> init([StatsFun]) -> mnesia:subscribe(system), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. handle_call(_Request, _From, State) ->