integrate gproc

This commit is contained in:
Ery Lee 2015-04-19 19:35:09 +08:00
parent fd8024821b
commit ab84b6ff09
15 changed files with 230 additions and 231 deletions

View File

@ -1,7 +1,7 @@
{application, emqttd, {application, emqttd,
[ [
{description, "Erlang MQTT Broker"}, {description, "Erlang MQTT Broker"},
{vsn, "0.6.0"}, {vsn, "0.6.1"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, {applications, [kernel,

View File

@ -33,19 +33,6 @@
%% Application callbacks %% Application callbacks
-export([start/2, stop/1]). -export([start/2, stop/1]).
%% Servers
-define(SERVERS, [config,
event,
client,
session,
pubsub,
router,
broker,
metrics,
bridge,
access_control,
sysmon]).
-define(PRINT_MSG(Msg), io:format(Msg)). -define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)). -define(PRINT(Format, Args), io:format(Format, Args)).
@ -79,7 +66,25 @@ print_vsn() ->
?PRINT("~s ~s is running now~n", [Desc, Vsn]). ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) -> start_servers(Sup) ->
Servers = lists:flatten([server(Srv) || Srv <- ?SERVERS]), {ok, SessOpts} = application:get_env(session),
{ok, PubSubOpts} = application:get_env(pubsub),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
{ok, AccessOpts} = application:get_env(access_control),
Servers = [
{"emqttd config", emqttd_config},
{"emqttd event", emqttd_event},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", emqttd_cm},
{"emqttd session manager", emqttd_sm},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
%{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker, BrokerOpts},
{"emqttd metrics", emqttd_metrics, MetricOpts},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control, AccessOpts},
{"emqttd system monitor", emqttd_sysmon}],
[start_server(Sup, Server) || Server <- Servers]. [start_server(Sup, Server) || Server <- Servers].
start_server(_Sup, {Name, F}) when is_function(F) -> start_server(_Sup, {Name, F}) when is_function(F) ->
@ -97,35 +102,6 @@ start_server(Sup, {Name, Server, Opts}) ->
start_child(Sup, Server, Opts), start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n"). ?PRINT_MSG("[done]~n").
%%TODO: redesign later...
server(config) ->
{"emqttd config", emqttd_config};
server(event) ->
{"emqttd event", emqttd_event};
server(client) ->
{"emqttd client manager", emqttd_cm};
server(session) ->
{ok, SessOpts} = application:get_env(session),
[{"emqttd session manager", emqttd_sm},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}];
server(pubsub) ->
{"emqttd pubsub", emqttd_pubsub};
server(router) ->
{"emqttd router", emqttd_router};
server(broker) ->
{ok, BrokerOpts} = application:get_env(broker),
{"emqttd broker", emqttd_broker, BrokerOpts};
server(metrics) ->
{ok, MetricOpts} = application:get_env(metrics),
{"emqttd metrics", emqttd_metrics, MetricOpts};
server(bridge) ->
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}};
server(access_control) ->
{ok, AcOpts} = application:get_env(access_control),
{"emqttd access control", emqttd_access_control, AcOpts};
server(sysmon) ->
{"emqttd system monitor", emqttd_sysmon}.
start_child(Sup, {supervisor, Name}) -> start_child(Sup, {supervisor, Name}) ->
supervisor:start_child(Sup, supervisor_spec(Name)); supervisor:start_child(Sup, supervisor_spec(Name));
start_child(Sup, Name) when is_atom(Name) -> start_child(Sup, Name) when is_atom(Name) ->

View File

@ -113,7 +113,7 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}
{noreply, State}; {noreply, State};
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) ->
rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]), rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
{noreply, State}; {noreply, State};
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
@ -172,5 +172,3 @@ transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos,
end, end,
Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}. Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

View File

@ -75,13 +75,13 @@ init([]) ->
handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
emqttd_router:route(event, Msg), emqttd_pubsub:publish(Msg),
{ok, State}; {ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_router:route(event, Msg), emqttd_pubsub:publish(Msg),
{ok, State}; {ok, State};
handle_event({subscribed, ClientId, TopicTable}, State) -> handle_event({subscribed, ClientId, TopicTable}, State) ->

View File

@ -55,7 +55,7 @@ handle('POST', "/mqtt/publish", Req) ->
Message = list_to_binary(get_value("message", Params)), Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} -> {true, true} ->
emqttd_router:route(http, #mqtt_message{qos = Qos, emqttd_pubsub:publish(#mqtt_message{qos = Qos,
retain = Retain, retain = Retain,
topic = Topic, topic = Topic,
payload = Message}), payload = Message}),

View File

@ -220,7 +220,7 @@ systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
publish(Topic, Payload) -> publish(Topic, Payload) ->
emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}). emqttd_pubsub:publish(#mqtt_message{topic = Topic, payload = Payload}).
new_metric({gauge, Name}) -> new_metric({gauge, Name}) ->
ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); ets:insert(?METRIC_TABLE, {{Name, 0}, 0});

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd pooler supervisor. %%% emqttd pooler.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -30,10 +30,8 @@
-behaviour(gen_server). -behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Exports %% API Exports
-export([start_link/1]). -export([start_link/1, submit/1, async_submit/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -48,13 +46,26 @@
start_link(I) -> start_link(I) ->
gen_server:start_link(?MODULE, [I], []). gen_server:start_link(?MODULE, [I], []).
submit(Fun) ->
gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity).
async_submit(Fun) ->
gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}).
init([I]) -> init([I]) ->
gproc_pool:connect_worker(pooler, {pooler, I}), gproc_pool:connect_worker(pooler, {pooler, I}),
{ok, #state{id = I}}. {ok, #state{id = I}}.
handle_call({submit, Fun}, _From, State) ->
{reply, run(Fun), State};
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({async_submit, Fun}, State) ->
run(Fun),
{noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
@ -67,3 +78,13 @@ terminate(_Reason, #state{id = I}) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
run({M, F, A}) ->
erlang:apply(M, F, A);
run(Fun) when is_function(Fun) ->
Fun().

View File

@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) ->
ignore; ignore;
%%TODO:should call session... %%TODO:should call session...
send_willmsg(ClientId, WillMsg) -> send_willmsg(ClientId, WillMsg) ->
emqttd_router:route(ClientId, WillMsg). emqttd_pubsub:publish(WillMsg).
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->

View File

@ -28,10 +28,10 @@
-author('feng@emqtt.io'). -author('feng@emqtt.io').
-include_lib("emqtt/include/emqtt.hrl").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include_lib("emqtt/include/emqtt.hrl").
%% Mnesia Callbacks %% Mnesia Callbacks
-export([mnesia/1]). -export([mnesia/1]).
@ -41,10 +41,10 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API Exports %% API Exports
-export([start_link/0, name/1]). -export([start_link/2]).
-export([create/1, -export([create/1,
subscribe/1, subscribe/2, subscribe/1,
unsubscribe/1, unsubscribe/1,
publish/1, publish/2, publish/1, publish/2,
%local node %local node
@ -54,9 +54,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-define(SUBACK_ERR, 128). -define(POOL, pubsub).
-record(state, {submap :: map()}). -record(state, {id, submap :: map()}).
%%%============================================================================= %%%=============================================================================
%%% Mnesia callbacks %%% Mnesia callbacks
@ -83,93 +83,60 @@ mnesia(copy) ->
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%
%%%============================================================================= %%%=============================================================================
%%%
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc Start one pubsub.
%% Start Pubsub.
%%
%% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(Opts) -> {ok, pid()} | ignore | {error, any()}. -spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when
start_link(Opts) -> Id :: pos_integer(),
gen_server:start_link(?MODULE, [], []). Opts :: list().
start_link(Id, Opts) ->
name(I) -> gen_server:start_link(?MODULE, [Id, Opts], []).
list_to_atom("emqttd_pubsub_" ++ integer_to_list(I)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc Create topic. Notice That this transaction is not protected by pubsub pool.
%% Create topic.
%%
%% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec create(binary()) -> {atomic, ok} | {aborted, Reason :: any()}. -spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
create(Topic) when is_binary(Topic) -> create(Topic) when is_binary(Topic) ->
TopicRecord = #mqtt_topic{topic = Topic, node = node()}, TopicR = #mqtt_topic{topic = Topic, node = node()},
Result = mnesia:transaction(fun create_topic/1, [TopicRecord]), case mnesia:transaction(fun add_topic/1, [TopicR]) of
setstats(topics), Result. {atomic, ok} -> setstats(topics), ok;
{aborted, Error} -> {error, Error}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc Subscribe topic.
%% Subscribe topic or topics.
%%
%% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when
Topic :: binary(), Topic :: binary(),
Qos :: mqtt_qos(). Qos :: mqtt_qos().
subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
call({subscribe, self(), Topic, Qos});
subscribe(Topics = [{_Topic, _Qos} | _]) -> subscribe(Topics = [{_Topic, _Qos} | _]) ->
{ok, lists:map(fun({Topic, Qos}) -> call({subscribe, self(), Topics}).
case subscribe(Topic, Qos) of
{ok, GrantedQos} ->
GrantedQos;
{error, Error} ->
lager:error("subscribe '~s' error: ~p", [Topic, Error]),
?SUBACK_ERR
end
end, Topics)}.
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()} | {error, any()}. %% @doc Unsubscribe Topic or Topics
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
case create(Topic) of
{atomic, ok} ->
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = self()},
ets:insert_new(?SUBSCRIBER_TAB, Subscriber),
{ok, Qos}; % Grant all qos
{aborted, Reason} ->
{error, Reason}.
%%------------------------------------------------------------------------------
%% @doc
%% Unsubscribe Topic or Topics
%%
%% @end
%%------------------------------------------------------------------------------
-spec unsubscribe(binary() | list(binary())) -> ok. -spec unsubscribe(binary() | list(binary())) -> ok.
unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topic) when is_binary(Topic) ->
Pattern = #mqtt_subscriber{topic = Topic, _ = '_', pid = self()}, cast({unsubscribe, self(), Topic});
ets:match_delete(?SUBSCRIBER_TAB, Pattern),
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
F = fun() ->
%%TODO record name...
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
try_remove_topic(TopicRecord)
end,
%{atomic, _} = mneisa:transaction(F),
ok;
unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
lists:foreach(fun(T) -> unsubscribe(T) end, Topics). cast({unsubscribe, self(), Topics}).
call(Req) ->
Pid = gproc_pool:pick_worker(?POOL, self()),
lager:info("~p call ~p", [self(), Pid]),
gen_server:call(Pid, Req, infinity).
cast(Msg) ->
Pid = gproc_pool:pick_worker(?POOL, self()),
gen_server:cast(Pid, Msg).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc Publish to cluster nodes.
%% Publish to cluster node.
%%
%% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(Msg :: mqtt_message()) -> ok. -spec publish(Msg :: mqtt_message()) -> ok.
publish(Msg=#mqtt_message{topic=Topic}) -> publish(Msg=#mqtt_message{topic=Topic}) ->
@ -184,107 +151,113 @@ publish(Topic, Msg) when is_binary(Topic) ->
end end
end, match(Topic)). end, match(Topic)).
%%------------------------------------------------------------------------------ %% @doc Dispatch message locally. should only be called by publish.
%% @doc
%% Dispatch Locally. Should only be called by publish.
%%
%% @end
%%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
case ets:lookup:(?SUBSCRIBER_TAB, Topic) of Subscribers = mnesia:dirty_read(subscriber, Topic),
[] -> setstats(dropped, Subscribers =:= []), %%TODO:...
%%TODO: not right when clusted...
setstats(dropped);
Subscribers ->
lists:foreach( lists:foreach(
fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) -> fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) ->
Msg1 = if Msg1 = if
Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
true -> Msg true -> Msg
end, end,
SubPid ! {dispatch, {self(), Msg1}} SubPid ! {dispatch, {self(), Msg1}}
end, Subscribers) end, Subscribers),
end. length(Subscribers).
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% Match topic.
%%
%% @end
%%------------------------------------------------------------------------------
-spec match(Topic :: binary()) -> [mqtt_topic()]. -spec match(Topic :: binary()) -> [mqtt_topic()].
match(Topic) when is_binary(Topic) -> match(Topic) when is_binary(Topic) ->
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]), MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]),
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([]) -> init([Id, _Opts]) ->
%%TODO: really need?
process_flag(priority, high),
process_flag(min_heap_size, 1024*1024), process_flag(min_heap_size, 1024*1024),
mnesia:subscribe({table, topic, simple}), gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
mnesia:subscribe({table, subscriber, simple}), {ok, #state{id = Id, submap = maps:new()}}.
{ok, #state{submap = maps:new()}}.
handle_call({subscribe, SubPid, Topics}, _From, State) ->
TopicSubs = lists:map(fun({Topic, Qos}) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}
end, Topics),
F = fun() ->
lists:map(fun add_subscriber/1, TopicSubs)
end,
case mnesia:transaction(F) of
{atomic, _Result} ->
setstats(all),
NewState = monitor_subscriber(SubPid, State),
%% grant all qos
{reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState};
{aborted, Error} ->
{reply, {error, Error}, State}
end;
handle_call({subscribe, SubPid, Topic, Qos}, _From, State) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid},
case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of
{atomic, ok} ->
setstats(all),
{reply, {ok, Qos}, monitor_subscriber(SubPid, State)};
{aborted, Error} ->
{reply, {error, Error}, State}
end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]), lager:error("Bad Request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
TopicSubs = lists:map(fun(Topic) ->
{#mqtt_topic{topic = Topic, node = node()},
#mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}}
end, Topics),
F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end,
case mnesia:transaction(F) of
{atomic, _} -> ok;
{aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error])
end,
setstats(all),
{noreply, State};
handle_cast({unsubscribe, SubPid, Topic}, State) ->
TopicR = #mqtt_topic{topic = Topic, node = node()},
Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid},
case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of
{atomic, _} -> ok;
{aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error])
end,
setstats(all),
{noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Bad Msg: ~p", [Msg]), lager:error("Bad Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}},
State = #state{submap = SubMap}) ->
NewSubMap =
case maps:is_key(Pid, SubMap) of
false ->
maps:put(Pid, erlang:monitor(process, Pid), SubMap);
true ->
SubMap
end,
setstats(subscribers),
{noreply, State#state{submap = NewSubMap}};
handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) ->
%%TODO: this is not right when clusterd.
setstats(topics),
{noreply, State};
%% {write, #topic{}, _ActivityId}
%% {delete_object, _OldRecord, _ActivityId}
%% {delete, {Tab, Key}, ActivityId}
handle_info({mnesia_table_event, _Event}, State) ->
setstats(topics),
setstats(subscribers),
{noreply, State};
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) -> handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) ->
case maps:is_key(DownPid, SubMap) of case maps:is_key(DownPid, SubMap) of
true -> true ->
Node = node(), Node = node(),
F = fun() -> F = fun() ->
Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid),
lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
mnesia:delete_object(subscriber, Sub, write), mnesia:delete_object(subscriber, Sub, write),
try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) try_remove_topic(#mqtt_topic{topic = Topic, node = Node})
end, Subscribers) end, Subscribers)
end, end,
NewState =
case catch mnesia:transaction(F) of case catch mnesia:transaction(F) of
{atomic, _} -> {atomic, _} -> ok;
State#state{submap = maps:remove(DownPid, SubMap)};
{aborted, Reason} -> {aborted, Reason} ->
lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]), lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
State
end, end,
setstats(topics), setstats(subscribers), setstats(all),
{noreply, NewState}; {noreply, State#state{submap = maps:remove(DownPid, SubMap)}};
false -> false ->
lager:error("Unexpected 'DOWN' from ~p", [DownPid]), lager:error("Unexpected 'DOWN' from ~p", [DownPid]),
{noreply, State} {noreply, State}
@ -295,10 +268,13 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
mnesia:unsubscribe({table, topic, simple}), TopicR = #mqtt_topic{_ = '_', node = node()},
mnesia:unsubscribe({table, subscriber, simple}), F = fun() ->
%%TODO: clear topics belongs to this node??? [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)]
ok. %%TODO: remove trie??
end,
mnesia:transaction(F),
setstats(all).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -307,28 +283,44 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
-spec create_topic(#mqtt_topic{}) -> {atomic, ok} | {aborted, any()}. add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
create_topic(TopicRecord = #mqtt_topic{topic = Topic}) ->
case mnesia:wread({topic, Topic}) of case mnesia:wread({topic, Topic}) of
[] -> [] ->
ok = emqttd_trie:insert(Topic), ok = emqttd_trie:insert(Topic),
mnesia:write(topic, TopicRecord, write); mnesia:write(topic, TopicR, write);
Records -> Records ->
case lists:member(TopicRecord, Records) of case lists:member(TopicR, Records) of
true -> true -> ok;
ok; false -> mnesia:write(topic, TopicR, write)
false ->
mnesia:write(topic, TopicRecord, write)
end end
end. end.
insert_subscriber(Subscriber) -> add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
mnesia:write(subscriber, Subscriber, write). case add_topic(TopicR) of
ok ->
mnesia:write(subscriber, Subscriber, write);
Error ->
Error
end.
try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> monitor_subscriber(SubPid, State = #state{submap = SubMap}) ->
NewSubMap = case maps:is_key(SubPid, SubMap) of
false ->
maps:put(SubPid, erlang:monitor(process, SubPid), SubMap);
true ->
SubMap
end,
State#state{submap = NewSubMap}.
remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
[mnesia:delete_object(subscriber, Sub, write) ||
Sub <- mnesia:match_object(subscriber, Subscriber, write)],
try_remove_topic(TopicR).
try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
case mnesia:read({subscriber, Topic}) of case mnesia:read({subscriber, Topic}) of
[] -> [] ->
mnesia:delete_object(topic, Record, write), mnesia:delete_object(topic, TopicR, write),
case mnesia:read(topic, Topic) of case mnesia:read(topic, Topic) of
[] -> emqttd_trie:delete(Topic); [] -> emqttd_trie:delete(Topic);
_ -> ok _ -> ok
@ -337,13 +329,23 @@ try_remove_topic(Record = #mqtt_topic{topic = Topic}) ->
ok ok
end. end.
%%%=============================================================================
%%% Stats functions
%%%=============================================================================
setstats(all) ->
setstats(topics),
setstats(subscribers);
setstats(topics) -> setstats(topics) ->
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)); emqttd_broker:setstat('topics/count',
mnesia:table_info(topic, size));
setstats(subscribers) -> setstats(subscribers) ->
emqttd_broker:setstats('subscribers/count', emqttd_broker:setstats('subscribers/count',
'subscribers/max', 'subscribers/max',
mnesia:table_info(subscriber, size)); mnesia:table_info(subscriber, size)).
setstats(dropped) ->
setstats(dropped, false) ->
ignore;
setstats(dropped, true) ->
emqttd_metrics:inc('messages/dropped'). emqttd_metrics:inc('messages/dropped').

View File

@ -43,17 +43,14 @@ start_link(Opts) ->
init([Opts]) -> init([Opts]) ->
Schedulers = erlang:system_info(schedulers), Schedulers = erlang:system_info(schedulers),
PoolSize = proplists:get_value(pool, Opts, Schedulers), PoolSize = proplists:get_value(pool_size, Opts, Schedulers),
gproc_pool:new(pubsub, hash, [{size, PoolSize}]), gproc_pool:new(pubsub, hash, [{size, PoolSize}]),
Children = lists:map( Children = lists:map(
fun(I) -> fun(I) ->
gproc_pool:add_worker(pubsub, emqttd_pubsub:name(I), I), Name = {emqttd_pubsub, I},
child(I, Opts) gproc_pool:add_worker(pubsub, Name, I),
{Name, {emqttd_pubsub, start_link, [I, Opts]},
permanent, 5000, worker, [emqttd_pubsub]}
end, lists:seq(1, PoolSize)), end, lists:seq(1, PoolSize)),
{ok, {{one_for_all, 10, 100}, Children}}. {ok, {{one_for_all, 10, 100}, Children}}.
child(I, Opts) ->
{{emqttd_pubsub, I},
{emqttd_pubsub, start_link, [I, Opts]},
permanent, 5000, worker, [emqttd_pubsub]}.

View File

@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session, ClientId, {?QOS_0, Message}) -> publish(Session, ClientId, {?QOS_0, Message}) ->
emqttd_router:route(ClientId, Message), Session; emqttd_pubsub:publish(Message), Session;
publish(Session, ClientId, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
emqttd_router:route(ClientId, Message), Session; emqttd_pubsub:publish(Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session_state{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqttd_router:route(ClientId, Msg); {ok, Msg} -> emqttd_pubsub:publish(Msg);
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};

View File

@ -55,7 +55,9 @@ start_link() ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]), erlang:system_monitor(self(), [{long_gc, 5000},
{large_heap, 8 * 1024 * 1024},
busy_port]),
{ok, #state{}}. {ok, #state{}}.
handle_call(Request, _From, State) -> handle_call(Request, _From, State) ->

View File

@ -14,6 +14,7 @@ PubQos | SubQos | In Message | Out Message
2 | 1 | - | - 2 | 1 | - | -
2 | 2 | - | - 2 | 2 | - | -
## Publish ## Publish

View File

@ -24,5 +24,5 @@
#-env ERL_MAX_ETS_TABLES 1024 #-env ERL_MAX_ETS_TABLES 1024
## Tweak GC to run more often ## Tweak GC to run more often
##-env ERL_FULLSWEEP_AFTER 10 ##-env ERL_FULLSWEEP_AFTER 1000
# #

View File

@ -15,6 +15,7 @@
inets, inets,
goldrush, goldrush,
lager, lager,
gproc,
esockd, esockd,
mochiweb, mochiweb,
emqttd emqttd
@ -45,6 +46,7 @@
{app, inets, [{mod_cond, app},{incl_cond, include}]}, {app, inets, [{mod_cond, app},{incl_cond, include}]},
{app, goldrush, [{mod_cond, app}, {incl_cond, include}]}, {app, goldrush, [{mod_cond, app}, {incl_cond, include}]},
{app, lager, [{mod_cond, app}, {incl_cond, include}]}, {app, lager, [{mod_cond, app}, {incl_cond, include}]},
{app, gproc, [{mod_cond, app}, {incl_cond, include}]},
{app, esockd, [{mod_cond, app}, {incl_cond, include}]}, {app, esockd, [{mod_cond, app}, {incl_cond, include}]},
{app, mochiweb, [{mod_cond, app}, {incl_cond, include}]}, {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]},
{app, emqtt, [{mod_cond, app}, {incl_cond, include}]}, {app, emqtt, [{mod_cond, app}, {incl_cond, include}]},