cluster, and fix broker stats

This commit is contained in:
Ery Lee 2015-04-12 20:36:07 +08:00
parent 37fcb85bd4
commit 16bff40b72
9 changed files with 264 additions and 147 deletions

View File

@ -61,10 +61,9 @@
Reason :: term().
start(_StartType, _StartArgs) ->
print_banner(),
emqttd_mnesia:init(),
emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(),
start_services(Sup),
ok = emqttd_mnesia:wait(),
{ok, Listeners} = application:get_env(listen),
emqttd:open(Listeners),
register(emqttd, self()),

View File

@ -42,7 +42,7 @@
-export([version/0, uptime/0, datetime/0, sysdescr/0]).
%% statistics API.
-export([getstats/0, getstat/1, setstat/2]).
-export([getstats/0, getstat/1, setstat/2, setstats/3]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -136,9 +136,24 @@ getstat(Name) ->
%%
%% @end
%%------------------------------------------------------------------------------
-spec setstat(atom(), pos_integer()) -> boolean().
setstat(Name, Val) ->
ets:insert(?BROKER_TABLE, {Name, Val}).
-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
setstat(Stat, Val) ->
ets:update_element(?BROKER_TABLE, Stat, {2, Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Set stats with max.
%%
%% @end
%%------------------------------------------------------------------------------
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
setstats(Stat, MaxStat, Val) ->
MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2),
if
Val > MaxVal -> ets:update_element(?BROKER_TABLE, MaxStat, {2, Val});
true -> ok
end,
ets:update_element(?BROKER_TABLE, Stat, {2, Val}).
%%%=============================================================================
%%% gen_server callbacks

View File

@ -22,7 +22,6 @@
%%% @doc
%%% emqttd client manager.
%%%
%%% TODO: NEED PG_HASH?
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_cm).
@ -38,8 +37,9 @@
%% API Exports
-export([start_link/0]).
-export([lookup/1, register/2, unregister/2]).
-export([lookup/1, register/1, unregister/1]).
%% Stats
-export([getstats/0]).
%% gen_server Function Exports
@ -50,7 +50,7 @@
terminate/2,
code_change/3]).
-record(state, {max = 0}).
-record(state, {tab}).
%%%=============================================================================
%%% API
@ -80,15 +80,17 @@ lookup(ClientId) when is_binary(ClientId) ->
end.
%%------------------------------------------------------------------------------
%% @doc
%% Register clientId with pid.
%%
%% @doc Register clientId with pid.
%% @end
%%------------------------------------------------------------------------------
-spec register(ClientId :: binary(), Pid :: pid()) -> ok.
register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
%%TODO: infinify to block requests when too many clients, this will be redesinged in 0.9.x...
gen_server:call(?SERVER, {register, ClientId, Pid}, infinity).
-spec register(ClientId :: binary()) -> ok.
register(ClientId) when is_binary(ClientId) ->
Pid = self(),
%% this is atomic
case ets:insert_new(?CLIENT_TABLE, {ClientId, Pid, undefined}) of
true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid});
false -> gen_server:cast(?SERVER, {register, ClientId, Pid})
end.
%%------------------------------------------------------------------------------
%% @doc
@ -96,9 +98,9 @@ register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
%%
%% @end
%%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary(), Pid :: pid()) -> ok.
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
gen_server:cast(?SERVER, {unregister, ClientId, self()}).
%%------------------------------------------------------------------------------
%% @doc
@ -107,37 +109,39 @@ unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
%% @end
%%------------------------------------------------------------------------------
getstats() ->
gen_server:call(?SERVER, getstats).
[{Name, emqttd_broker:getstat(Name)} ||
Name <- ['clients/count', 'clients/max']].
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([]) ->
ets:new(?CLIENT_TABLE, [set, named_table, protected]),
{ok, #state{}}.
TabId = ets:new(?CLIENT_TABLE, [set,
named_table,
public,
{write_concurrency, true}]),
{ok, #state{tab = TabId}}.
handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?CLIENT_TABLE, ClientId) of
[{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) ->
case registerd(Tab, {ClientId, Pid}) of
true ->
ignore;
[{_, OldPid, MRef}] ->
OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef),
insert(ClientId, Pid);
[] ->
insert(ClientId, Pid)
false ->
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
end,
{reply, ok, setstats(State)};
{noreply, setstats(State)};
handle_call(getstats, _From, State = #state{max = Max}) ->
Stats = [{'clients/count', ets:info(?CLIENT_TABLE, size)},
{'clients/max', Max}],
{reply, Stats, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of
true -> ok;
false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid])
end,
{noreply, State};
handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?CLIENT_TABLE, ClientId) of
@ -170,19 +174,23 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
insert(ClientId, Pid) ->
ets:insert(?CLIENT_TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}).
setstats(State = #state{max = Max}) ->
Count = ets:info(?CLIENT_TABLE, size),
emqttd_broker:setstat('clients/count', Count),
if
Count > Max ->
emqttd_broker:setstat('clients/max', Count),
State#state{max = Count};
true ->
State
registerd(Tab, {ClientId, Pid}) ->
case ets:lookup(Tab, ClientId) of
[{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
true;
[{_, OldPid, MRef}] ->
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef),
false;
[] ->
false
end.
setstats(State) ->
emqttd_broker:setstats('clients/count',
'clients/max',
ets:info(?CLIENT_TABLE, size)), State.

View File

@ -68,9 +68,7 @@ cluster([SNode]) ->
pong ->
application:stop(emqttd),
application:stop(esockd),
mnesia:stop(),
mnesia:start(),
mnesia:change_config(extra_db_nodes, [Node]),
emqtt_mnesia:cluster(Node),
application:start(esockd),
application:start(emqttd),
?PRINT("cluster with ~p successfully.~n", [Node]);

View File

@ -30,26 +30,166 @@
-include("emqttd.hrl").
-export([init/0, wait/0]).
-include("emqttd_topic.hrl").
init() ->
%case mnesia:system_info(extra_db_nodes) of
% [] ->
% mnesia:stop(),
% mnesia:create_schema([node()]);
% _ ->
% ok
%end,
%ok = mnesia:start(),
create_tables().
-export([start/0, cluster/1]).
start() ->
case init_schema() of
ok ->
ok;
{error, {_Node, {already_exists, _Node}}} ->
ok;
{error, Reason} ->
lager:error("mnesia init_schema error: ~p", [Reason])
end,
ok = mnesia:start(),
init_tables(),
wait_for_tables().
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% init mnesia schema.
%%
%% @end
%%------------------------------------------------------------------------------
init_schema() ->
case mnesia:system_info(extra_db_nodes) of
[] ->
%% create schema
mnesia:create_schema([node()]);
__ ->
ok
end.
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% init mnesia tables.
%%
%% @end
%%------------------------------------------------------------------------------
init_tables() ->
case mnesia:system_info(extra_db_nodes) of
[] ->
create_tables();
_ ->
copy_tables()
end.
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% create tables.
%%
%% @end
%%------------------------------------------------------------------------------
create_tables() ->
mnesia:create_table(mqtt_retained, [
%% trie tree tables
ok = create_table(topic_trie_node, [
{ram_copies, [node()]},
{record_name, topic_trie_node},
{attributes, record_info(fields, topic_trie_node)}]),
ok = create_table(topic_trie, [
{ram_copies, [node()]},
{record_name, topic_trie},
{attributes, record_info(fields, topic_trie)}]),
%% topic table
ok = create_table(topic, [
{type, bag},
{ram_copies, [node()]},
{record_name, topic},
{attributes, record_info(fields, topic)}]),
%% local subscriber table, not shared with other nodes
ok = create_table(topic_subscriber, [
{type, bag},
{ram_copies, [node()]},
{attributes, record_info(fields, topic_subscriber)},
{index, [subpid]},
{local_content, true}]),
%% TODO: retained messages, this table should not be copied...
ok = create_table(mqtt_retained, [
{type, ordered_set},
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_retained)}]),
mnesia:add_table_copy(mqtt_retained, node(), ram_copies).
{attributes, record_info(fields, mqtt_retained)}]).
wait() ->
create_table(Table, Attrs) ->
case mnesia:create_table(Table, Attrs) of
{atomic, ok} -> ok;
{aborted, {already_exists, Table}} -> ok;
Error -> Error
end.
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% copy tables.
%%
%% @end
%%------------------------------------------------------------------------------
copy_tables() ->
{atomic, ok} = mnesia:add_table_copy(topic, node(), ram_copies),
{atomic, ok} = mnesia:add_table_copy(topic_trie, node(), ram_copies),
{atomic, ok} = mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
{atomic, ok} = mnesia:add_table_copy(topic_subscriber, node(), ram_copies),
{atomic, ok} = mnesia:add_table_copy(mqtt_retained, node(), ram_copies).
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% wait for tables.
%%
%% @end
%%------------------------------------------------------------------------------
wait_for_tables() ->
lager:info("local_tables: ~p", [mnesia:system_info(local_tables)]),
%%TODO: is not right?
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% Simple cluster with another nodes.
%%
%% @end
%%------------------------------------------------------------------------------
cluster(Node) ->
%% stop mnesia
mnesia:stop(),
ok = wait_for_mnesia(stop),
%% delete mnesia
ok = mnesia:delete_schema(node()),
%% start mnesia
ok = mnesia:start(),
%% connect with extra_db_nodes
case mnesia:change_config(extra_db_nodes, [Node]) of
{ok, []} ->
throw({error, failed_to_connect_extra_db_nodes});
{ok, Nodes} ->
case lists:member(Node, Nodes) of
true -> lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]);
false -> lager:error("mnesia failed to connect extra_db_node '~s'!", [Node])
end
end,
init_tables(),
wait_for_tables().
wait_for_mnesia(stop) ->
case mnesia:system_info(is_running) of
no ->
ok;
stopping ->
lager:info("Waiting for mnesia to stop..."),
timer:sleep(1000),
wait_for_mnesia(stop);
yes ->
{error, mnesia_unexpectedly_running};
starting ->
{error, mnesia_unexpectedly_starting}
end.

View File

@ -124,7 +124,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
ok ->
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
emqttd_cm:register(ClientId1, self()),
emqttd_cm:register(ClientId1),
{?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1,
will_msg = willmsg(Var)}};
{error, Reason}->
@ -332,7 +332,7 @@ validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false.
try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()).
try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId).
sent_stats(?PACKET(Type)) ->
emqttd_metrics:inc('packets/sent'),

View File

@ -65,7 +65,7 @@
terminate/2,
code_change/3]).
-record(state, {max_subs = 0}).
-record(state, {}).
%%%=============================================================================
%%% API
@ -89,7 +89,9 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec getstats() -> [{atom(), non_neg_integer()}].
getstats() ->
gen_server:call(?SERVER, getstats).
[{'topics/count', mnesia:table_info(topic, size)},
{'subscribers/count', mnesia:table_info(topic_subscriber, size)},
{'subscribers/max', emqttd_broker:getstat('subscribers/max')}].
%%------------------------------------------------------------------------------
%% @doc
@ -223,43 +225,11 @@ match(Topic) when is_binary(Topic) ->
%% ------------------------------------------------------------------
init([]) ->
%% trie and topic tables, will be copied by all nodes.
mnesia:create_table(topic_trie_node, [
{ram_copies, [node()]},
{attributes, record_info(fields, topic_trie_node)}]),
mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
mnesia:create_table(topic_trie, [
{ram_copies, [node()]},
{attributes, record_info(fields, topic_trie)}]),
mnesia:add_table_copy(topic_trie, node(), ram_copies),
Result =
mnesia:create_table(topic, [
{type, bag},
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
io:format("~p~n", [Result]),
mnesia:add_table_copy(topic, node(), ram_copies),
mnesia:subscribe({table, topic, simple}),
%% local table, not shared with other table
Result1 =
mnesia:create_table(topic_subscriber, [
{type, bag},
{ram_copies, [node()]},
{attributes, record_info(fields, topic_subscriber)},
{index, [subpid]},
{local_content, true}]),
mnesia:add_table_copy(topic_subscriber, node(), ram_copies),
%% trie and topic tables, will be copied by all nodes.
mnesia:subscribe({table, topic_subscriber, simple}),
io:format("~p~n", [Result1]),
{ok, #state{}}.
handle_call(getstats, _From, State = #state{max_subs = Max}) ->
Stats = [{'topics/count', mnesia:table_info(topic, size)},
{'subscribers/count', mnesia:table_info(topic_subscriber, size)},
{'subscribers/max', Max}],
{reply, Stats, State};
handle_call(Req, _From, State) ->
lager:error("Bad Req: ~p", [Req]),
{reply, error, State}.
@ -293,7 +263,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
{noreply, setstats(State)};
handle_info(Info, State) ->
lager:error("Bad Info: ~p", [Info]),
lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
@ -405,17 +375,11 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
throw({notfound, NodeId})
end.
setstats(State = #state{max_subs = Max}) ->
setstats(State) ->
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)),
SubCount = mnesia:table_info(topic_subscriber, size),
emqttd_broker:setstat('subscribers/count', SubCount),
if
SubCount > Max ->
emqttd_broker:setstat('subscribers/max', SubCount),
State#state{max_subs = SubCount};
true ->
State
end.
emqttd_broker:setstats('subscribers/count',
'subscribers/max',
mnesia:table_info(topic_subscriber, size)), State.
dropped(true) ->
emqttd_metrics:inc('messages/dropped');

View File

@ -55,7 +55,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {max = 0}).
-record(state, {tab}).
%%%=============================================================================
%%% API
@ -103,20 +103,20 @@ destroy_session(ClientId) ->
init([]) ->
process_flag(trap_exit, true),
ets:new(?SESSION_TABLE, [set, protected, named_table]),
{ok, #state{}}.
TabId = ets:new(?SESSION_TABLE, [set, protected, named_table]),
{ok, #state{tab = TabId}}.
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) ->
Reply =
case ets:lookup(?SESSION_TABLE, ClientId) of
case ets:lookup(Tab, ClientId) of
[{_, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid};
[] ->
case emqttd_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} ->
MRef = erlang:monitor(process, SessPid),
ets:insert(?SESSION_TABLE, {ClientId, SessPid, MRef}),
ets:insert(Tab, {ClientId, SessPid,
erlang:monitor(process, SessPid)}),
{ok, SessPid};
{error, Error} ->
{error, Error}
@ -124,12 +124,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
end,
{reply, Reply, setstats(State)};
handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(?SESSION_TABLE, ClientId) of
handle_call({destroy_session, ClientId}, _From, State = #state{tab = Tab}) ->
case ets:lookup(Tab, ClientId) of
[{_, SessPid, MRef}] ->
erlang:demonitor(MRef),
emqttd_session:destroy(SessPid, ClientId),
ets:delete(?SESSION_TABLE, ClientId);
erlang:demonitor(MRef, [flush]),
ets:delete(Tab, ClientId);
[] ->
ignore
end,
@ -141,8 +141,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(?SESSION_TABLE, {'_', DownPid, MRef}),
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = Tab}) ->
ets:match_delete(Tab, {'_', DownPid, MRef}),
{noreply, setstats(State)};
handle_info(_Info, State) ->
@ -157,15 +157,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
setstats(State = #state{max = Max}) ->
Count = ets:info(?SESSION_TABLE, size),
emqttd_broker:setstat('sessions/count', Count),
if
Count > Max ->
emqttd_broker:setstat('sessions/max', Count),
State#state{max = Count};
true ->
State
end.
setstats(State) ->
emqttd_broker:setstats('sessions/count',
'sessions/max',
ets:info(?SESSION_TABLE, size)), State.

View File

@ -10,7 +10,7 @@
syntax_tools,
ssl,
crypto,
mnesia,
%mnesia,
os_mon,
inets,
goldrush,