From db39ea7745fdaf9bb89824aafb409a34e1121cd8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 7 Mar 2015 22:47:06 +0800 Subject: [PATCH] metrics, broker test --- apps/emqtt/include/emqtt_topic.hrl | 9 ++-- apps/emqtt/src/emqtt_app.erl | 3 +- apps/emqtt/src/emqtt_broker.erl | 2 +- apps/emqtt/src/emqtt_metrics.erl | 78 +++++++++++++++++++++++++---- apps/emqtt/src/emqtt_pubsub.erl | 10 +++- apps/emqtt/src/emqtt_serialiser.erl | 3 ++ 6 files changed, 86 insertions(+), 19 deletions(-) diff --git a/apps/emqtt/include/emqtt_topic.hrl b/apps/emqtt/include/emqtt_topic.hrl index 364606c58..7c3e9eddf 100644 --- a/apps/emqtt/include/emqtt_topic.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -24,9 +24,9 @@ %% Core PubSub Topic %%------------------------------------------------------------------------------ -record(topic, { - name :: binary(), - type :: static | dynamic | bridge, - node :: node() + name :: binary(), + %type = dynamic :: static | dynamic | bridge, + node :: node() }). -type topic() :: #topic{}. @@ -40,8 +40,7 @@ -record(topic_trie_node, { node_id :: binary() | atom(), edge_count = 0 :: non_neg_integer(), - topic :: binary(), - type = dynamic :: dynamic | static + topic :: binary() }). -record(topic_trie_edge, { diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index e167c1324..d94347c62 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -62,6 +62,7 @@ start_servers(Sup) -> {ok, SessOpts} = application:get_env(session), {ok, RetainOpts} = application:get_env(retain), {ok, BrokerOpts} = application:get_env(broker), + {ok, MetricOpts} = application:get_env(metrics), lists:foreach( fun({Name, F}) when is_function(F) -> ?PRINT("~s is starting...", [Name]), @@ -85,7 +86,7 @@ start_servers(Sup) -> {"emqtt pubsub", emqtt_pubsub}, {"emqtt router", emqtt_router}, {"emqtt broker", emqtt_broker, BrokerOpts}, - {"emqtt metrics", emqtt_metrics}, + {"emqtt metrics", emqtt_metrics, MetricOpts}, {"emqtt monitor", emqtt_monitor} ]). diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index 04b44d2ef..c195f1a4d 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -73,7 +73,7 @@ uptime() -> init([Options]) -> SysInterval = proplists:get_value(sys_interval, Options, 60), % Create $SYS Topics - [emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER], + [{atomic, _} = emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER], ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, {ok, tick(State)}. diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index 2a24faa95..53e08f7e1 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqtt_metrics). +-include("emqtt_packet.hrl"). + -include("emqtt_topic.hrl"). -behaviour(gen_server). @@ -58,32 +60,81 @@ start_link(Options) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). +%%------------------------------------------------------------------------------ +%% @doc +%% Get all metrics. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec all() -> [{atom(), non_neg_integer()}]. all() -> maps:to_list( - lists:foldl( + ets:foldl( fun({{Metric, _N}, Val}, Map) -> case maps:find(Metric, Map) of - {ok, Count} -> maps:put(Metric, Count+Val); - error -> maps:put(Metric, 0) + {ok, Count} -> maps:put(Metric, Count+Val, Map); + error -> maps:put(Metric, 0, Map) end - end, #{}, ets:tab2list(?TABLE))). + end, #{}, ?TABLE)). +%%------------------------------------------------------------------------------ +%% @doc +%% Get metric value +%% +%% @end +%%------------------------------------------------------------------------------ +-spec value(atom()) -> non_neg_integer(). value(Metric) -> lists:sum(ets:select(?TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). +%%------------------------------------------------------------------------------ +%% @doc +%% Increase metric value +%% +%% @end +%%------------------------------------------------------------------------------ +-spec inc(atom()) -> non_neg_integer(). inc(Metric) -> inc(Metric, 1). +%%------------------------------------------------------------------------------ +%% @doc +%% Increase metric value +%% +%% @end +%%------------------------------------------------------------------------------ +-spec inc(atom(), pos_integer()) -> pos_integer(). inc(Metric, Val) -> ets:update_counter(?TABLE, key(Metric), {2, Val}). +%%------------------------------------------------------------------------------ +%% @doc +%% Decrease metric value +%% +%% @end +%%------------------------------------------------------------------------------ +-spec dec(atom()) -> integer(). dec(Metric) -> dec(Metric, 1). +%%------------------------------------------------------------------------------ +%% @doc +%% Decrease metric value +%% +%% @end +%%------------------------------------------------------------------------------ +-spec dec(atom(), pos_integer()) -> integer(). dec(Metric, Val) -> %TODO: ok? ets:update_counter(?TABLE, key(Metric), {2, -Val}). +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Metric Key +%% +%% @end +%%------------------------------------------------------------------------------ key(Metric) -> {Metric, erlang:system_info(scheduler_id)}. @@ -92,13 +143,13 @@ key(Metric) -> %% ------------------------------------------------------------------ init(Options) -> % $SYS Topics for metrics - [ok = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS], + [{atomic, _} = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS], % Create metrics table ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]), % Init metrics [new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS], PubInterval = proplists:get_value(pub_interval, Options, 60), - {ok, tick(#state{pub_interval = PubInterval})}. + {ok, tick(#state{pub_interval = PubInterval}), hibernate}. handle_call(get_metrics, _From, State) -> {reply, [], State}; @@ -110,9 +161,14 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(tick, State) -> - %TODO:... % publish metric message - {noreply, tick(State)}; + lists:foreach( + fun({Metric, Val}) -> + Topic = list_to_binary(atom_to_list(Metric)), + Payload = list_to_binary(integer_to_list(Val)), + publish(<<"$SYS/broker/", Topic/binary>>, Payload) + end, all()), + {noreply, tick(State), hibernate}; handle_info(_Info, State) -> {noreply, State}. @@ -128,10 +184,12 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ new(Metric) -> - Key = list_to_tuple([list_to_atom(binary_to_list(Token)) - || Token <- binary:split(Metric, <<"/">>, [global])]), + Key = list_to_atom(binary_to_list(Metric)), [ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))]. tick(State = #state{pub_interval = PubInterval}) -> State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}. +publish(Topic, Payload) -> + emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}). + diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index ec80235cf..fd5afe7d8 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -95,9 +95,11 @@ start_link() -> topics() -> mnesia:dirty_all_keys(topic). -%TODO +%% +%% @doc Create static topic. +%% create(Topic) -> - ok. + gen_server:call(?SERVER, {create, Topic}). %% %% @doc Subscribe Topic or Topics @@ -171,6 +173,10 @@ init([]) -> ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), {ok, #state{}}. +handle_call({create, Topic}, _From, State) -> + Result = mnesia:transaction(fun trie_add/1, [Topic]), + {reply, Result , State}; + handle_call({subscribe, Topics, SubPid}, _From, State) -> Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], Reply = diff --git a/apps/emqtt/src/emqtt_serialiser.erl b/apps/emqtt/src/emqtt_serialiser.erl index ac953830f..c8dadb57f 100644 --- a/apps/emqtt/src/emqtt_serialiser.erl +++ b/apps/emqtt/src/emqtt_serialiser.erl @@ -125,6 +125,9 @@ serialise_variable(PubAck, #mqtt_packet_puback { packet_id = PacketId }, _Payloa serialise_variable(?PINGREQ, undefined, undefined) -> {<<>>, <<>>}; +serialise_variable(?PINGRESP, undefined, undefined) -> + {<<>>, <<>>}; + serialise_variable(?DISCONNECT, undefined, undefined) -> {<<>>, <<>>}.