From ab84b6ff09d847159b21354f776c636bd032688a Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 19 Apr 2015 19:35:09 +0800 Subject: [PATCH] integrate gproc --- apps/emqttd/src/emqttd.app.src | 2 +- apps/emqttd/src/emqttd_app.erl | 62 ++--- apps/emqttd/src/emqttd_bridge.erl | 4 +- apps/emqttd/src/emqttd_event.erl | 4 +- apps/emqttd/src/emqttd_http.erl | 8 +- apps/emqttd/src/emqttd_metrics.erl | 2 +- apps/emqttd/src/emqttd_pooler.erl | 29 ++- apps/emqttd/src/emqttd_protocol.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 320 +++++++++++++------------- apps/emqttd/src/emqttd_pubsub_sup.erl | 13 +- apps/emqttd/src/emqttd_session.erl | 6 +- apps/emqttd/src/emqttd_sysmon.erl | 4 +- doc/pubsub.md | 1 + rel/files/vm.args | 2 +- rel/reltool.config | 2 + 15 files changed, 230 insertions(+), 231 deletions(-) diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 26053005b..5841321f9 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.6.0"}, + {vsn, "0.6.1"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 941c565e4..e7e116dd6 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -33,19 +33,6 @@ %% Application callbacks -export([start/2, stop/1]). -%% Servers --define(SERVERS, [config, - event, - client, - session, - pubsub, - router, - broker, - metrics, - bridge, - access_control, - sysmon]). - -define(PRINT_MSG(Msg), io:format(Msg)). -define(PRINT(Format, Args), io:format(Format, Args)). @@ -79,7 +66,25 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - Servers = lists:flatten([server(Srv) || Srv <- ?SERVERS]), + {ok, SessOpts} = application:get_env(session), + {ok, PubSubOpts} = application:get_env(pubsub), + {ok, BrokerOpts} = application:get_env(broker), + {ok, MetricOpts} = application:get_env(metrics), + {ok, AccessOpts} = application:get_env(access_control), + Servers = [ + {"emqttd config", emqttd_config}, + {"emqttd event", emqttd_event}, + {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, + {"emqttd client manager", emqttd_cm}, + {"emqttd session manager", emqttd_sm}, + {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, + {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, + %{"emqttd router", emqttd_router}, + {"emqttd broker", emqttd_broker, BrokerOpts}, + {"emqttd metrics", emqttd_metrics, MetricOpts}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, + {"emqttd access control", emqttd_access_control, AccessOpts}, + {"emqttd system monitor", emqttd_sysmon}], [start_server(Sup, Server) || Server <- Servers]. start_server(_Sup, {Name, F}) when is_function(F) -> @@ -97,35 +102,6 @@ start_server(Sup, {Name, Server, Opts}) -> start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n"). -%%TODO: redesign later... -server(config) -> - {"emqttd config", emqttd_config}; -server(event) -> - {"emqttd event", emqttd_event}; -server(client) -> - {"emqttd client manager", emqttd_cm}; -server(session) -> - {ok, SessOpts} = application:get_env(session), - [{"emqttd session manager", emqttd_sm}, - {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}]; -server(pubsub) -> - {"emqttd pubsub", emqttd_pubsub}; -server(router) -> - {"emqttd router", emqttd_router}; -server(broker) -> - {ok, BrokerOpts} = application:get_env(broker), - {"emqttd broker", emqttd_broker, BrokerOpts}; -server(metrics) -> - {ok, MetricOpts} = application:get_env(metrics), - {"emqttd metrics", emqttd_metrics, MetricOpts}; -server(bridge) -> - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}; -server(access_control) -> - {ok, AcOpts} = application:get_env(access_control), - {"emqttd access control", emqttd_access_control, AcOpts}; -server(sysmon) -> - {"emqttd system monitor", emqttd_sysmon}. - start_child(Sup, {supervisor, Name}) -> supervisor:start_child(Sup, supervisor_spec(Name)); start_child(Sup, Name) when is_atom(Name) -> diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 24a80d6a8..8e4896351 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -113,7 +113,7 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down} {noreply, State}; handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> - rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]), + rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), {noreply, State}; handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> @@ -172,5 +172,3 @@ transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos, end, Msg1#mqtt_message{topic = <>}. - - diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index b551655e9..8a6af1db8 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -75,13 +75,13 @@ init([]) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_router:route(event, Msg), + emqttd_pubsub:publish(Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_router:route(event, Msg), + emqttd_pubsub:publish(Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 5dd17dc19..4a24f4893 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_router:route(http, #mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_pubsub:publish(#mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index fac00e034..ce03af907 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -220,7 +220,7 @@ systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> - emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}). + emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_pooler.erl b/apps/emqttd/src/emqttd_pooler.erl index 16b4bca57..09ad7bf7a 100644 --- a/apps/emqttd/src/emqttd_pooler.erl +++ b/apps/emqttd/src/emqttd_pooler.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd pooler supervisor. +%%% emqttd pooler. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -30,10 +30,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Exports --export([start_link/1]). +-export([start_link/1, submit/1, async_submit/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -48,13 +46,26 @@ start_link(I) -> gen_server:start_link(?MODULE, [I], []). +submit(Fun) -> + gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity). + +async_submit(Fun) -> + gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}). + init([I]) -> gproc_pool:connect_worker(pooler, {pooler, I}), {ok, #state{id = I}}. +handle_call({submit, Fun}, _From, State) -> + {reply, run(Fun), State}; + handle_call(_Req, _From, State) -> {reply, ok, State}. +handle_cast({async_submit, Fun}, State) -> + run(Fun), + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. @@ -67,3 +78,13 @@ terminate(_Reason, #state{id = I}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +run({M, F, A}) -> + erlang:apply(M, F, A); +run(Fun) when is_function(Fun) -> + Fun(). + + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 6c6b29808..08de737f5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... send_willmsg(ClientId, WillMsg) -> - emqttd_router:route(ClientId, WillMsg). + emqttd_pubsub:publish(WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 8c6d6ffa1..4792d8f61 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -28,10 +28,10 @@ -author('feng@emqtt.io'). --include_lib("emqtt/include/emqtt.hrl"). - -include("emqttd.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -41,10 +41,10 @@ -behaviour(gen_server). %% API Exports --export([start_link/0, name/1]). +-export([start_link/2]). -export([create/1, - subscribe/1, subscribe/2, + subscribe/1, unsubscribe/1, publish/1, publish/2, %local node @@ -54,9 +54,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SUBACK_ERR, 128). +-define(POOL, pubsub). --record(state, {submap :: map()}). +-record(state, {id, submap :: map()}). %%%============================================================================= %%% Mnesia callbacks @@ -83,93 +83,60 @@ mnesia(copy) -> %%%============================================================================= %%% API -%%% %%%============================================================================= -%%% %%------------------------------------------------------------------------------ -%% @doc -%% Start Pubsub. -%% -%% @end +%% @doc Start one pubsub. %%------------------------------------------------------------------------------ --spec start_link(Opts) -> {ok, pid()} | ignore | {error, any()}. -start_link(Opts) -> - gen_server:start_link(?MODULE, [], []). - -name(I) -> - list_to_atom("emqttd_pubsub_" ++ integer_to_list(I)). +-spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + Opts :: list(). +start_link(Id, Opts) -> + gen_server:start_link(?MODULE, [Id, Opts], []). %%------------------------------------------------------------------------------ -%% @doc -%% Create topic. -%% -%% @end +%% @doc Create topic. Notice That this transaction is not protected by pubsub pool. %%------------------------------------------------------------------------------ --spec create(binary()) -> {atomic, ok} | {aborted, Reason :: any()}. +-spec create(Topic :: binary()) -> ok | {error, Error :: any()}. create(Topic) when is_binary(Topic) -> - TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - Result = mnesia:transaction(fun create_topic/1, [TopicRecord]), - setstats(topics), Result. + TopicR = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun add_topic/1, [TopicR]) of + {atomic, ok} -> setstats(topics), ok; + {aborted, Error} -> {error, Error} + end. %%------------------------------------------------------------------------------ -%% @doc -%% Subscribe topic or topics. -%% -%% @end +%% @doc Subscribe topic. %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), Qos :: mqtt_qos(). + +subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> + call({subscribe, self(), Topic, Qos}); + subscribe(Topics = [{_Topic, _Qos} | _]) -> - {ok, lists:map(fun({Topic, Qos}) -> - case subscribe(Topic, Qos) of - {ok, GrantedQos} -> - GrantedQos; - {error, Error} -> - lager:error("subscribe '~s' error: ~p", [Topic, Error]), - ?SUBACK_ERR - end - end, Topics)}. + call({subscribe, self(), Topics}). --spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()} | {error, any()}. -subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - case create(Topic) of - {atomic, ok} -> - Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = self()}, - ets:insert_new(?SUBSCRIBER_TAB, Subscriber), - {ok, Qos}; % Grant all qos - {aborted, Reason} -> - {error, Reason}. - -%%------------------------------------------------------------------------------ -%% @doc -%% Unsubscribe Topic or Topics -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc Unsubscribe Topic or Topics -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - Pattern = #mqtt_subscriber{topic = Topic, _ = '_', pid = self()}, - ets:match_delete(?SUBSCRIBER_TAB, Pattern), - - TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - F = fun() -> - %%TODO record name... - [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)], - try_remove_topic(TopicRecord) - end, - %{atomic, _} = mneisa:transaction(F), - ok; + cast({unsubscribe, self(), Topic}); unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> - lists:foreach(fun(T) -> unsubscribe(T) end, Topics). + cast({unsubscribe, self(), Topics}). + +call(Req) -> + Pid = gproc_pool:pick_worker(?POOL, self()), + lager:info("~p call ~p", [self(), Pid]), + gen_server:call(Pid, Req, infinity). + +cast(Msg) -> + Pid = gproc_pool:pick_worker(?POOL, self()), + gen_server:cast(Pid, Msg). %%------------------------------------------------------------------------------ -%% @doc -%% Publish to cluster node. -%% -%% @end +%% @doc Publish to cluster nodes. %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg=#mqtt_message{topic=Topic}) -> @@ -184,107 +151,113 @@ publish(Topic, Msg) when is_binary(Topic) -> end end, match(Topic)). -%%------------------------------------------------------------------------------ -%% @doc -%% Dispatch Locally. Should only be called by publish. -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc Dispatch message locally. should only be called by publish. -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - case ets:lookup:(?SUBSCRIBER_TAB, Topic) of - [] -> - %%TODO: not right when clusted... - setstats(dropped); - Subscribers -> - lists:foreach( - fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - SubPid ! {dispatch, {self(), Msg1}} - end, Subscribers) - end. + Subscribers = mnesia:dirty_read(subscriber, Topic), + setstats(dropped, Subscribers =:= []), %%TODO:... + lists:foreach( + fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) -> + Msg1 = if + Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; + true -> Msg + end, + SubPid ! {dispatch, {self(), Msg1}} + end, Subscribers), + length(Subscribers). -%%------------------------------------------------------------------------------ -%% @doc -%% @private -%% Match topic. -%% -%% @end -%%------------------------------------------------------------------------------ -spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]), - lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). + lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - %%TODO: really need? - process_flag(priority, high), +init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), - mnesia:subscribe({table, topic, simple}), - mnesia:subscribe({table, subscriber, simple}), - {ok, #state{submap = maps:new()}}. + gproc_pool:connect_worker(pubsub, {?MODULE, Id}), + {ok, #state{id = Id, submap = maps:new()}}. + +handle_call({subscribe, SubPid, Topics}, _From, State) -> + TopicSubs = lists:map(fun({Topic, Qos}) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}} + end, Topics), + F = fun() -> + lists:map(fun add_subscriber/1, TopicSubs) + end, + case mnesia:transaction(F) of + {atomic, _Result} -> + setstats(all), + NewState = monitor_subscriber(SubPid, State), + %% grant all qos + {reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState}; + {aborted, Error} -> + {reply, {error, Error}, State} + end; + +handle_call({subscribe, SubPid, Topic, Qos}, _From, State) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}, + case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of + {atomic, ok} -> + setstats(all), + {reply, {ok, Qos}, monitor_subscriber(SubPid, State)}; + {aborted, Error} -> + {reply, {error, Error}, State} + end; handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. +handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) -> + TopicSubs = lists:map(fun(Topic) -> + {#mqtt_topic{topic = Topic, node = node()}, + #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}} + end, Topics), + F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end, + case mnesia:transaction(F) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error]) + end, + setstats(all), + {noreply, State}; + +handle_cast({unsubscribe, SubPid, Topic}, State) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}, + case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error]) + end, + setstats(all), + {noreply, State}; + handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}}, - State = #state{submap = SubMap}) -> - NewSubMap = - case maps:is_key(Pid, SubMap) of - false -> - maps:put(Pid, erlang:monitor(process, Pid), SubMap); - true -> - SubMap - end, - setstats(subscribers), - {noreply, State#state{submap = NewSubMap}}; - -handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) -> - %%TODO: this is not right when clusterd. - setstats(topics), - {noreply, State}; - -%% {write, #topic{}, _ActivityId} -%% {delete_object, _OldRecord, _ActivityId} -%% {delete, {Tab, Key}, ActivityId} -handle_info({mnesia_table_event, _Event}, State) -> - setstats(topics), - setstats(subscribers), - {noreply, State}; - handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) -> case maps:is_key(DownPid, SubMap) of true -> Node = node(), F = fun() -> - Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), + Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid), lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> mnesia:delete_object(subscriber, Sub, write), try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) end, Subscribers) end, - NewState = case catch mnesia:transaction(F) of - {atomic, _} -> - State#state{submap = maps:remove(DownPid, SubMap)}; + {atomic, _} -> ok; {aborted, Reason} -> - lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]), - State + lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) end, - setstats(topics), setstats(subscribers), - {noreply, NewState}; + setstats(all), + {noreply, State#state{submap = maps:remove(DownPid, SubMap)}}; false -> lager:error("Unexpected 'DOWN' from ~p", [DownPid]), {noreply, State} @@ -295,10 +268,13 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - mnesia:unsubscribe({table, topic, simple}), - mnesia:unsubscribe({table, subscriber, simple}), - %%TODO: clear topics belongs to this node??? - ok. + TopicR = #mqtt_topic{_ = '_', node = node()}, + F = fun() -> + [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)] + %%TODO: remove trie?? + end, + mnesia:transaction(F), + setstats(all). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -307,28 +283,44 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= --spec create_topic(#mqtt_topic{}) -> {atomic, ok} | {aborted, any()}. -create_topic(TopicRecord = #mqtt_topic{topic = Topic}) -> +add_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:wread({topic, Topic}) of [] -> ok = emqttd_trie:insert(Topic), - mnesia:write(topic, TopicRecord, write); + mnesia:write(topic, TopicR, write); Records -> - case lists:member(TopicRecord, Records) of - true -> - ok; - false -> - mnesia:write(topic, TopicRecord, write) + case lists:member(TopicR, Records) of + true -> ok; + false -> mnesia:write(topic, TopicR, write) end end. -insert_subscriber(Subscriber) -> - mnesia:write(subscriber, Subscriber, write). +add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> + case add_topic(TopicR) of + ok -> + mnesia:write(subscriber, Subscriber, write); + Error -> + Error + end. -try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> +monitor_subscriber(SubPid, State = #state{submap = SubMap}) -> + NewSubMap = case maps:is_key(SubPid, SubMap) of + false -> + maps:put(SubPid, erlang:monitor(process, SubPid), SubMap); + true -> + SubMap + end, + State#state{submap = NewSubMap}. + +remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> + [mnesia:delete_object(subscriber, Sub, write) || + Sub <- mnesia:match_object(subscriber, Subscriber, write)], + try_remove_topic(TopicR). + +try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> case mnesia:read({subscriber, Topic}) of [] -> - mnesia:delete_object(topic, Record, write), + mnesia:delete_object(topic, TopicR, write), case mnesia:read(topic, Topic) of [] -> emqttd_trie:delete(Topic); _ -> ok @@ -337,13 +329,23 @@ try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> ok end. +%%%============================================================================= +%%% Stats functions +%%%============================================================================= +setstats(all) -> + setstats(topics), + setstats(subscribers); setstats(topics) -> - emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)); - + emqttd_broker:setstat('topics/count', + mnesia:table_info(topic, size)); setstats(subscribers) -> emqttd_broker:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(subscriber, size)); -setstats(dropped) -> + mnesia:table_info(subscriber, size)). + +setstats(dropped, false) -> + ignore; +setstats(dropped, true) -> emqttd_metrics:inc('messages/dropped'). + diff --git a/apps/emqttd/src/emqttd_pubsub_sup.erl b/apps/emqttd/src/emqttd_pubsub_sup.erl index 35bd20fb6..b6a7936f6 100644 --- a/apps/emqttd/src/emqttd_pubsub_sup.erl +++ b/apps/emqttd/src/emqttd_pubsub_sup.erl @@ -43,17 +43,14 @@ start_link(Opts) -> init([Opts]) -> Schedulers = erlang:system_info(schedulers), - PoolSize = proplists:get_value(pool, Opts, Schedulers), + PoolSize = proplists:get_value(pool_size, Opts, Schedulers), gproc_pool:new(pubsub, hash, [{size, PoolSize}]), Children = lists:map( fun(I) -> - gproc_pool:add_worker(pubsub, emqttd_pubsub:name(I), I), - child(I, Opts) + Name = {emqttd_pubsub, I}, + gproc_pool:add_worker(pubsub, Name, I), + {Name, {emqttd_pubsub, start_link, [I, Opts]}, + permanent, 5000, worker, [emqttd_pubsub]} end, lists:seq(1, PoolSize)), {ok, {{one_for_all, 10, 100}, Children}}. -child(I, Opts) -> - {{emqttd_pubsub, I}, - {emqttd_pubsub, start_link, [I, Opts]}, - permanent, 5000, worker, [emqttd_pubsub]}. - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 8458a0075..2bca04699 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %%------------------------------------------------------------------------------ -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). publish(Session, ClientId, {?QOS_0, Message}) -> - emqttd_router:route(ClientId, Message), Session; + emqttd_pubsub:publish(Message), Session; publish(Session, ClientId, {?QOS_1, Message}) -> - emqttd_router:route(ClientId, Message), Session; + emqttd_pubsub:publish(Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> @@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of - {ok, Msg} -> emqttd_router:route(ClientId, Msg); + {ok, Msg} -> emqttd_pubsub:publish(Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; diff --git a/apps/emqttd/src/emqttd_sysmon.erl b/apps/emqttd/src/emqttd_sysmon.erl index 687de5b8c..ed676b1dd 100644 --- a/apps/emqttd/src/emqttd_sysmon.erl +++ b/apps/emqttd/src/emqttd_sysmon.erl @@ -55,7 +55,9 @@ start_link() -> %%%============================================================================= init([]) -> - erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]), + erlang:system_monitor(self(), [{long_gc, 5000}, + {large_heap, 8 * 1024 * 1024}, + busy_port]), {ok, #state{}}. handle_call(Request, _From, State) -> diff --git a/doc/pubsub.md b/doc/pubsub.md index 8978c29e3..69fa14a91 100644 --- a/doc/pubsub.md +++ b/doc/pubsub.md @@ -14,6 +14,7 @@ PubQos | SubQos | In Message | Out Message 2 | 1 | - | - 2 | 2 | - | - + ## Publish diff --git a/rel/files/vm.args b/rel/files/vm.args index 897fe97f3..4ff3744f2 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -24,5 +24,5 @@ #-env ERL_MAX_ETS_TABLES 1024 ## Tweak GC to run more often -##-env ERL_FULLSWEEP_AFTER 10 +##-env ERL_FULLSWEEP_AFTER 1000 # diff --git a/rel/reltool.config b/rel/reltool.config index 9d913f6d7..6a23dbb01 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -15,6 +15,7 @@ inets, goldrush, lager, + gproc, esockd, mochiweb, emqttd @@ -45,6 +46,7 @@ {app, inets, [{mod_cond, app},{incl_cond, include}]}, {app, goldrush, [{mod_cond, app}, {incl_cond, include}]}, {app, lager, [{mod_cond, app}, {incl_cond, include}]}, + {app, gproc, [{mod_cond, app}, {incl_cond, include}]}, {app, esockd, [{mod_cond, app}, {incl_cond, include}]}, {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]}, {app, emqtt, [{mod_cond, app}, {incl_cond, include}]},