diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 08667cc0d..3bbd6fbcb 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -61,10 +61,9 @@ Reason :: term(). start(_StartType, _StartArgs) -> print_banner(), - emqttd_mnesia:init(), + emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_services(Sup), - ok = emqttd_mnesia:wait(), {ok, Listeners} = application:get_env(listen), emqttd:open(Listeners), register(emqttd, self()), diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 0f0f66d69..ad32e3463 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -42,7 +42,7 @@ -export([version/0, uptime/0, datetime/0, sysdescr/0]). %% statistics API. --export([getstats/0, getstat/1, setstat/2]). +-export([getstats/0, getstat/1, setstat/2, setstats/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -136,9 +136,24 @@ getstat(Name) -> %% %% @end %%------------------------------------------------------------------------------ --spec setstat(atom(), pos_integer()) -> boolean(). -setstat(Name, Val) -> - ets:insert(?BROKER_TABLE, {Name, Val}). +-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean(). +setstat(Stat, Val) -> + ets:update_element(?BROKER_TABLE, Stat, {2, Val}). + +%%------------------------------------------------------------------------------ +%% @doc +%% Set stats with max. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). +setstats(Stat, MaxStat, Val) -> + MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2), + if + Val > MaxVal -> ets:update_element(?BROKER_TABLE, MaxStat, {2, Val}); + true -> ok + end, + ets:update_element(?BROKER_TABLE, Stat, {2, Val}). %%%============================================================================= %%% gen_server callbacks diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index fbc822203..951425e6c 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -22,7 +22,6 @@ %%% @doc %%% emqttd client manager. %%% -%%% TODO: NEED PG_HASH? %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_cm). @@ -38,8 +37,9 @@ %% API Exports -export([start_link/0]). --export([lookup/1, register/2, unregister/2]). +-export([lookup/1, register/1, unregister/1]). +%% Stats -export([getstats/0]). %% gen_server Function Exports @@ -50,7 +50,7 @@ terminate/2, code_change/3]). --record(state, {max = 0}). +-record(state, {tab}). %%%============================================================================= %%% API @@ -80,15 +80,17 @@ lookup(ClientId) when is_binary(ClientId) -> end. %%------------------------------------------------------------------------------ -%% @doc -%% Register clientId with pid. -%% +%% @doc Register clientId with pid. %% @end %%------------------------------------------------------------------------------ --spec register(ClientId :: binary(), Pid :: pid()) -> ok. -register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> - %%TODO: infinify to block requests when too many clients, this will be redesinged in 0.9.x... - gen_server:call(?SERVER, {register, ClientId, Pid}, infinity). +-spec register(ClientId :: binary()) -> ok. +register(ClientId) when is_binary(ClientId) -> + Pid = self(), + %% this is atomic + case ets:insert_new(?CLIENT_TABLE, {ClientId, Pid, undefined}) of + true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid}); + false -> gen_server:cast(?SERVER, {register, ClientId, Pid}) + end. %%------------------------------------------------------------------------------ %% @doc @@ -96,9 +98,9 @@ register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> %% %% @end %%------------------------------------------------------------------------------ --spec unregister(ClientId :: binary(), Pid :: pid()) -> ok. -unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> - gen_server:cast(?SERVER, {unregister, ClientId, Pid}). +-spec unregister(ClientId :: binary()) -> ok. +unregister(ClientId) when is_binary(ClientId) -> + gen_server:cast(?SERVER, {unregister, ClientId, self()}). %%------------------------------------------------------------------------------ %% @doc @@ -107,37 +109,39 @@ unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> %% @end %%------------------------------------------------------------------------------ getstats() -> - gen_server:call(?SERVER, getstats). + [{Name, emqttd_broker:getstat(Name)} || + Name <- ['clients/count', 'clients/max']]. %%%============================================================================= %%% gen_server callbacks %%%============================================================================= init([]) -> - ets:new(?CLIENT_TABLE, [set, named_table, protected]), - {ok, #state{}}. + TabId = ets:new(?CLIENT_TABLE, [set, + named_table, + public, + {write_concurrency, true}]), + {ok, #state{tab = TabId}}. -handle_call({register, ClientId, Pid}, _From, State) -> - case ets:lookup(?CLIENT_TABLE, ClientId) of - [{_, Pid, _}] -> - lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), +handle_call(Req, _From, State) -> + lager:error("unexpected request: ~p", [Req]), + {reply, {error, badreq}, State}. + +handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) -> + case registerd(Tab, {ClientId, Pid}) of + true -> ignore; - [{_, OldPid, MRef}] -> - OldPid ! {stop, duplicate_id, Pid}, - erlang:demonitor(MRef), - insert(ClientId, Pid); - [] -> - insert(ClientId, Pid) - end, - {reply, ok, setstats(State)}; + false -> + ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + end, + {noreply, setstats(State)}; -handle_call(getstats, _From, State = #state{max = Max}) -> - Stats = [{'clients/count', ets:info(?CLIENT_TABLE, size)}, - {'clients/max', Max}], - {reply, Stats, State}; - -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) -> + case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of + true -> ok; + false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid]) + end, + {noreply, State}; handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(?CLIENT_TABLE, ClientId) of @@ -170,19 +174,23 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= +registerd(Tab, {ClientId, Pid}) -> + case ets:lookup(Tab, ClientId) of + [{_, Pid, _}] -> + lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), + true; + [{_, OldPid, MRef}] -> + lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), + OldPid ! {stop, duplicate_id, Pid}, + erlang:demonitor(MRef), + false; + [] -> + false + end. -insert(ClientId, Pid) -> - ets:insert(?CLIENT_TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}). - -setstats(State = #state{max = Max}) -> - Count = ets:info(?CLIENT_TABLE, size), - emqttd_broker:setstat('clients/count', Count), - if - Count > Max -> - emqttd_broker:setstat('clients/max', Count), - State#state{max = Count}; - true -> - State - end. +setstats(State) -> + emqttd_broker:setstats('clients/count', + 'clients/max', + ets:info(?CLIENT_TABLE, size)), State. diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 23106844c..f99048114 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -68,9 +68,7 @@ cluster([SNode]) -> pong -> application:stop(emqttd), application:stop(esockd), - mnesia:stop(), - mnesia:start(), - mnesia:change_config(extra_db_nodes, [Node]), + emqtt_mnesia:cluster(Node), application:start(esockd), application:start(emqttd), ?PRINT("cluster with ~p successfully.~n", [Node]); diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index f6b6df91b..8cb688936 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -30,26 +30,166 @@ -include("emqttd.hrl"). --export([init/0, wait/0]). +-include("emqttd_topic.hrl"). -init() -> - %case mnesia:system_info(extra_db_nodes) of - % [] -> - % mnesia:stop(), - % mnesia:create_schema([node()]); - % _ -> - % ok - %end, - %ok = mnesia:start(), - create_tables(). +-export([start/0, cluster/1]). +start() -> + case init_schema() of + ok -> + ok; + {error, {_Node, {already_exists, _Node}}} -> + ok; + {error, Reason} -> + lager:error("mnesia init_schema error: ~p", [Reason]) + end, + ok = mnesia:start(), + init_tables(), + wait_for_tables(). + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% init mnesia schema. +%% +%% @end +%%------------------------------------------------------------------------------ +init_schema() -> + case mnesia:system_info(extra_db_nodes) of + [] -> + %% create schema + mnesia:create_schema([node()]); + __ -> + ok + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% init mnesia tables. +%% +%% @end +%%------------------------------------------------------------------------------ +init_tables() -> + case mnesia:system_info(extra_db_nodes) of + [] -> + create_tables(); + _ -> + copy_tables() + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% create tables. +%% +%% @end +%%------------------------------------------------------------------------------ create_tables() -> - mnesia:create_table(mqtt_retained, [ - {type, ordered_set}, - {ram_copies, [node()]}, - {attributes, record_info(fields, mqtt_retained)}]), - mnesia:add_table_copy(mqtt_retained, node(), ram_copies). + %% trie tree tables + ok = create_table(topic_trie_node, [ + {ram_copies, [node()]}, + {record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}]), + ok = create_table(topic_trie, [ + {ram_copies, [node()]}, + {record_name, topic_trie}, + {attributes, record_info(fields, topic_trie)}]), + %% topic table + ok = create_table(topic, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, topic}, + {attributes, record_info(fields, topic)}]), + %% local subscriber table, not shared with other nodes + ok = create_table(topic_subscriber, [ + {type, bag}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_subscriber)}, + {index, [subpid]}, + {local_content, true}]), + %% TODO: retained messages, this table should not be copied... + ok = create_table(mqtt_retained, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {attributes, record_info(fields, mqtt_retained)}]). -wait() -> +create_table(Table, Attrs) -> + case mnesia:create_table(Table, Attrs) of + {atomic, ok} -> ok; + {aborted, {already_exists, Table}} -> ok; + Error -> Error + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% copy tables. +%% +%% @end +%%------------------------------------------------------------------------------ +copy_tables() -> + {atomic, ok} = mnesia:add_table_copy(topic, node(), ram_copies), + {atomic, ok} = mnesia:add_table_copy(topic_trie, node(), ram_copies), + {atomic, ok} = mnesia:add_table_copy(topic_trie_node, node(), ram_copies), + {atomic, ok} = mnesia:add_table_copy(topic_subscriber, node(), ram_copies), + {atomic, ok} = mnesia:add_table_copy(mqtt_retained, node(), ram_copies). + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% wait for tables. +%% +%% @end +%%------------------------------------------------------------------------------ +wait_for_tables() -> + lager:info("local_tables: ~p", [mnesia:system_info(local_tables)]), + %%TODO: is not right? mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity). +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Simple cluster with another nodes. +%% +%% @end +%%------------------------------------------------------------------------------ +cluster(Node) -> + %% stop mnesia + mnesia:stop(), + ok = wait_for_mnesia(stop), + %% delete mnesia + ok = mnesia:delete_schema(node()), + %% start mnesia + ok = mnesia:start(), + %% connect with extra_db_nodes + case mnesia:change_config(extra_db_nodes, [Node]) of + {ok, []} -> + throw({error, failed_to_connect_extra_db_nodes}); + {ok, Nodes} -> + case lists:member(Node, Nodes) of + true -> lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]); + false -> lager:error("mnesia failed to connect extra_db_node '~s'!", [Node]) + end + + end, + init_tables(), + wait_for_tables(). + +wait_for_mnesia(stop) -> + case mnesia:system_info(is_running) of + no -> + ok; + stopping -> + lager:info("Waiting for mnesia to stop..."), + timer:sleep(1000), + wait_for_mnesia(stop); + yes -> + {error, mnesia_unexpectedly_running}; + starting -> + {error, mnesia_unexpectedly_starting} + end. + + + + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index e86f3733b..c70e27d77 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -124,7 +124,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = ok -> ClientId1 = clientid(ClientId, State), start_keepalive(KeepAlive), - emqttd_cm:register(ClientId1, self()), + emqttd_cm:register(ClientId1), {?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1, will_msg = willmsg(Var)}}; {error, Reason}-> @@ -332,7 +332,7 @@ validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. try_unregister(undefined, _) -> ok; -try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()). +try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId). sent_stats(?PACKET(Type)) -> emqttd_metrics:inc('packets/sent'), diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index eb2bf8565..a8381f4cb 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -65,7 +65,7 @@ terminate/2, code_change/3]). --record(state, {max_subs = 0}). +-record(state, {}). %%%============================================================================= %%% API @@ -89,7 +89,9 @@ start_link() -> %%------------------------------------------------------------------------------ -spec getstats() -> [{atom(), non_neg_integer()}]. getstats() -> - gen_server:call(?SERVER, getstats). + [{'topics/count', mnesia:table_info(topic, size)}, + {'subscribers/count', mnesia:table_info(topic_subscriber, size)}, + {'subscribers/max', emqttd_broker:getstat('subscribers/max')}]. %%------------------------------------------------------------------------------ %% @doc @@ -223,43 +225,11 @@ match(Topic) when is_binary(Topic) -> %% ------------------------------------------------------------------ init([]) -> - %% trie and topic tables, will be copied by all nodes. - mnesia:create_table(topic_trie_node, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_trie_node)}]), - mnesia:add_table_copy(topic_trie_node, node(), ram_copies), - mnesia:create_table(topic_trie, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_trie)}]), - mnesia:add_table_copy(topic_trie, node(), ram_copies), - Result = - mnesia:create_table(topic, [ - {type, bag}, - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - io:format("~p~n", [Result]), - mnesia:add_table_copy(topic, node(), ram_copies), mnesia:subscribe({table, topic, simple}), - %% local table, not shared with other table - Result1 = - mnesia:create_table(topic_subscriber, [ - {type, bag}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_subscriber)}, - {index, [subpid]}, - {local_content, true}]), - mnesia:add_table_copy(topic_subscriber, node(), ram_copies), + %% trie and topic tables, will be copied by all nodes. mnesia:subscribe({table, topic_subscriber, simple}), - io:format("~p~n", [Result1]), {ok, #state{}}. -handle_call(getstats, _From, State = #state{max_subs = Max}) -> - Stats = [{'topics/count', mnesia:table_info(topic, size)}, - {'subscribers/count', mnesia:table_info(topic_subscriber, size)}, - {'subscribers/max', Max}], - {reply, Stats, State}; - handle_call(Req, _From, State) -> lager:error("Bad Req: ~p", [Req]), {reply, error, State}. @@ -293,7 +263,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> {noreply, setstats(State)}; handle_info(Info, State) -> - lager:error("Bad Info: ~p", [Info]), + lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -405,17 +375,11 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> throw({notfound, NodeId}) end. -setstats(State = #state{max_subs = Max}) -> +setstats(State) -> emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)), - SubCount = mnesia:table_info(topic_subscriber, size), - emqttd_broker:setstat('subscribers/count', SubCount), - if - SubCount > Max -> - emqttd_broker:setstat('subscribers/max', SubCount), - State#state{max_subs = SubCount}; - true -> - State - end. + emqttd_broker:setstats('subscribers/count', + 'subscribers/max', + mnesia:table_info(topic_subscriber, size)), State. dropped(true) -> emqttd_metrics:inc('messages/dropped'); diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index d106c04c9..977db4b6b 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -55,7 +55,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {max = 0}). +-record(state, {tab}). %%%============================================================================= %%% API @@ -103,20 +103,20 @@ destroy_session(ClientId) -> init([]) -> process_flag(trap_exit, true), - ets:new(?SESSION_TABLE, [set, protected, named_table]), - {ok, #state{}}. + TabId = ets:new(?SESSION_TABLE, [set, protected, named_table]), + {ok, #state{tab = TabId}}. -handle_call({start_session, ClientId, ClientPid}, _From, State) -> +handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) -> Reply = - case ets:lookup(?SESSION_TABLE, ClientId) of + case ets:lookup(Tab, ClientId) of [{_, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> - MRef = erlang:monitor(process, SessPid), - ets:insert(?SESSION_TABLE, {ClientId, SessPid, MRef}), + ets:insert(Tab, {ClientId, SessPid, + erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> {error, Error} @@ -124,12 +124,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) -> end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State) -> - case ets:lookup(?SESSION_TABLE, ClientId) of +handle_call({destroy_session, ClientId}, _From, State = #state{tab = Tab}) -> + case ets:lookup(Tab, ClientId) of [{_, SessPid, MRef}] -> - erlang:demonitor(MRef), emqttd_session:destroy(SessPid, ClientId), - ets:delete(?SESSION_TABLE, ClientId); + erlang:demonitor(MRef, [flush]), + ets:delete(Tab, ClientId); [] -> ignore end, @@ -141,8 +141,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?SESSION_TABLE, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = Tab}) -> + ets:match_delete(Tab, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -157,15 +157,8 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= - -setstats(State = #state{max = Max}) -> - Count = ets:info(?SESSION_TABLE, size), - emqttd_broker:setstat('sessions/count', Count), - if - Count > Max -> - emqttd_broker:setstat('sessions/max', Count), - State#state{max = Count}; - true -> - State - end. +setstats(State) -> + emqttd_broker:setstats('sessions/count', + 'sessions/max', + ets:info(?SESSION_TABLE, size)), State. diff --git a/rel/reltool.config b/rel/reltool.config index 954f80aac..fd226f03a 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -10,7 +10,7 @@ syntax_tools, ssl, crypto, - mnesia, + %mnesia, os_mon, inets, goldrush,