metrics, broker test

This commit is contained in:
Feng Lee 2015-03-07 22:47:06 +08:00
parent 82772e4e38
commit db39ea7745
6 changed files with 86 additions and 19 deletions

View File

@ -24,9 +24,9 @@
%% Core PubSub Topic
%%------------------------------------------------------------------------------
-record(topic, {
name :: binary(),
type :: static | dynamic | bridge,
node :: node()
name :: binary(),
%type = dynamic :: static | dynamic | bridge,
node :: node()
}).
-type topic() :: #topic{}.
@ -40,8 +40,7 @@
-record(topic_trie_node, {
node_id :: binary() | atom(),
edge_count = 0 :: non_neg_integer(),
topic :: binary(),
type = dynamic :: dynamic | static
topic :: binary()
}).
-record(topic_trie_edge, {

View File

@ -62,6 +62,7 @@ start_servers(Sup) ->
{ok, SessOpts} = application:get_env(session),
{ok, RetainOpts} = application:get_env(retain),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
lists:foreach(
fun({Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
@ -85,7 +86,7 @@ start_servers(Sup) ->
{"emqtt pubsub", emqtt_pubsub},
{"emqtt router", emqtt_router},
{"emqtt broker", emqtt_broker, BrokerOpts},
{"emqtt metrics", emqtt_metrics},
{"emqtt metrics", emqtt_metrics, MetricOpts},
{"emqtt monitor", emqtt_monitor}
]).

View File

@ -73,7 +73,7 @@ uptime() ->
init([Options]) ->
SysInterval = proplists:get_value(sys_interval, Options, 60),
% Create $SYS Topics
[emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER],
[{atomic, _} = emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER],
ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]),
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
{ok, tick(State)}.

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqtt_metrics).
-include("emqtt_packet.hrl").
-include("emqtt_topic.hrl").
-behaviour(gen_server).
@ -58,32 +60,81 @@
start_link(Options) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
%%------------------------------------------------------------------------------
%% @doc
%% Get all metrics.
%%
%% @end
%%------------------------------------------------------------------------------
-spec all() -> [{atom(), non_neg_integer()}].
all() ->
maps:to_list(
lists:foldl(
ets:foldl(
fun({{Metric, _N}, Val}, Map) ->
case maps:find(Metric, Map) of
{ok, Count} -> maps:put(Metric, Count+Val);
error -> maps:put(Metric, 0)
{ok, Count} -> maps:put(Metric, Count+Val, Map);
error -> maps:put(Metric, 0, Map)
end
end, #{}, ets:tab2list(?TABLE))).
end, #{}, ?TABLE)).
%%------------------------------------------------------------------------------
%% @doc
%% Get metric value
%%
%% @end
%%------------------------------------------------------------------------------
-spec value(atom()) -> non_neg_integer().
value(Metric) ->
lists:sum(ets:select(?TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
%%------------------------------------------------------------------------------
%% @doc
%% Increase metric value
%%
%% @end
%%------------------------------------------------------------------------------
-spec inc(atom()) -> non_neg_integer().
inc(Metric) ->
inc(Metric, 1).
%%------------------------------------------------------------------------------
%% @doc
%% Increase metric value
%%
%% @end
%%------------------------------------------------------------------------------
-spec inc(atom(), pos_integer()) -> pos_integer().
inc(Metric, Val) ->
ets:update_counter(?TABLE, key(Metric), {2, Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Decrease metric value
%%
%% @end
%%------------------------------------------------------------------------------
-spec dec(atom()) -> integer().
dec(Metric) ->
dec(Metric, 1).
%%------------------------------------------------------------------------------
%% @doc
%% Decrease metric value
%%
%% @end
%%------------------------------------------------------------------------------
-spec dec(atom(), pos_integer()) -> integer().
dec(Metric, Val) ->
%TODO: ok?
ets:update_counter(?TABLE, key(Metric), {2, -Val}).
%%------------------------------------------------------------------------------
%% @doc
%% @private
%% Metric Key
%%
%% @end
%%------------------------------------------------------------------------------
key(Metric) ->
{Metric, erlang:system_info(scheduler_id)}.
@ -92,13 +143,13 @@ key(Metric) ->
%% ------------------------------------------------------------------
init(Options) ->
% $SYS Topics for metrics
[ok = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS],
[{atomic, _} = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS],
% Create metrics table
ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]),
% Init metrics
[new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS],
PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(#state{pub_interval = PubInterval})}.
{ok, tick(#state{pub_interval = PubInterval}), hibernate}.
handle_call(get_metrics, _From, State) ->
{reply, [], State};
@ -110,9 +161,14 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(tick, State) ->
%TODO:...
% publish metric message
{noreply, tick(State)};
lists:foreach(
fun({Metric, Val}) ->
Topic = list_to_binary(atom_to_list(Metric)),
Payload = list_to_binary(integer_to_list(Val)),
publish(<<"$SYS/broker/", Topic/binary>>, Payload)
end, all()),
{noreply, tick(State), hibernate};
handle_info(_Info, State) ->
{noreply, State}.
@ -128,10 +184,12 @@ code_change(_OldVsn, State, _Extra) ->
%% ------------------------------------------------------------------
new(Metric) ->
Key = list_to_tuple([list_to_atom(binary_to_list(Token))
|| Token <- binary:split(Metric, <<"/">>, [global])]),
Key = list_to_atom(binary_to_list(Metric)),
[ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))].
tick(State = #state{pub_interval = PubInterval}) ->
State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}.
publish(Topic, Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}).

View File

@ -95,9 +95,11 @@ start_link() ->
topics() ->
mnesia:dirty_all_keys(topic).
%TODO
%%
%% @doc Create static topic.
%%
create(Topic) ->
ok.
gen_server:call(?SERVER, {create, Topic}).
%%
%% @doc Subscribe Topic or Topics
@ -171,6 +173,10 @@ init([]) ->
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
{ok, #state{}}.
handle_call({create, Topic}, _From, State) ->
Result = mnesia:transaction(fun trie_add/1, [Topic]),
{reply, Result , State};
handle_call({subscribe, Topics, SubPid}, _From, State) ->
Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
Reply =

View File

@ -125,6 +125,9 @@ serialise_variable(PubAck, #mqtt_packet_puback { packet_id = PacketId }, _Payloa
serialise_variable(?PINGREQ, undefined, undefined) ->
{<<>>, <<>>};
serialise_variable(?PINGRESP, undefined, undefined) ->
{<<>>, <<>>};
serialise_variable(?DISCONNECT, undefined, undefined) ->
{<<>>, <<>>}.