From d35be39df745b33e9ccb89f7d6f85b4e6acfb910 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 10 Aug 2015 00:25:11 +0800 Subject: [PATCH] session statistics --- src/emqttd_mqueue.erl | 5 ++++- src/emqttd_session.erl | 22 +++++++++++++++++++-- src/emqttd_sm.erl | 45 +++++++++++++++++++++++++++++++++++++++++- src/emqttd_sm_sup.erl | 6 ++++++ 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index d9f3c3c58..803104c5e 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -58,7 +58,8 @@ -export([new/3, name/1, is_empty/1, is_full/1, - len/1, in/2, out/1]). + len/1, max_len/1, + in/2, out/1]). -define(LOW_WM, 0.2). @@ -108,6 +109,8 @@ is_full(_MQ) -> false. len(#mqueue{len = Len}) -> Len. +max_len(#mqueue{max_len= MaxLen}) -> MaxLen. + %%------------------------------------------------------------------------------ %% @doc Queue one message. %% @end diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f4f6d1c2d..dbae32bfb 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -232,6 +232,7 @@ init([CleanSess, ClientId, ClientPid]) -> max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, timestamp = os:timestamp()}, + emqttd_sm:register_session(CleanSess, ClientId, info(Session)), {ok, Session, hibernate}. handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, @@ -510,8 +511,8 @@ handle_info(Info, Session = #session{client_id = ClientId}) -> lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), {noreply, Session}. -terminate(_Reason, _Session) -> - ok. +terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> + emqttd_sm:unregister_session(CleanSess, ClientId). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. @@ -629,3 +630,20 @@ cancel_timer(Ref) -> noreply(State) -> {noreply, State, hibernate}. +info(#session{subscriptions = Subscriptions, + inflight_queue = InflightQueue, + max_inflight = MaxInflight, + message_queue = MessageQueue, + awaiting_rel = AwaitingRel, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + timestamp = CreatedAt}) -> + [{pid, self()}, {subscriptions, Subscriptions}, + {max_inflight, MaxInflight}, + {inflight_queue, lists:length(InflightQueue)}, + {message_queue, emqttd_mqueue:len(MessageQueue)}, + {awaiting_rel, maps:size(AwaitingRel)}, + {awaiting_ack, maps:size(AwaitingAck)}, + {awaiting_comp, maps:size(AwaitingComp)}, + {created_at, CreatedAt}]. + diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 0d7ddf9c8..6c79182a9 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -42,6 +42,8 @@ -export([start_session/2, lookup_session/1]). +-export([register_session/3, unregister_session/2]). + -behaviour(gen_server). %% gen_server Function Exports @@ -108,6 +110,29 @@ lookup_session(ClientId) -> [] -> undefined end. +%%------------------------------------------------------------------------------ +%% @doc Register a session with info. +%% @end +%%------------------------------------------------------------------------------ +-spec register_session(CleanSess, ClientId, Info) -> ok when + CleanSess :: boolean(), + ClientId :: binary(), + Info :: [tuple()]. +register_session(CleanSess, ClientId, Info) -> + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:cast(SM, {register, CleanSess, ClientId, Info}). + +%%------------------------------------------------------------------------------ +%% @doc Unregister a session. +%% @end +%%------------------------------------------------------------------------------ +-spec unregister_session(CleanSess, ClientId) -> ok when + CleanSess :: boolean(), + ClientId :: binary(). +unregister_session(CleanSess, ClientId) -> + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + gen_server:cast(SM, {unregister, CleanSess, ClientId}). + call(SM, Req) -> gen_server:call(SM, Req, infinity). %%%============================================================================= @@ -144,7 +169,25 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast(_Msg, State) -> +%% transient session +handle_cast({register, true, ClientId, Info}, State) -> + ets:insert(mqtt_transient_session, {ClientId, Info}), + {noreply, State}; + +handle_cast({register, false, ClientId, Info}, State) -> + ets:insert(mqtt_persistent_session, {ClientId, Info}), + {noreply, setstats(State)}; + +handle_cast({unregister, true, ClientId}, State) -> + ets:delete(mqtt_transient_session, ClientId), + {noreply, State}; + +handle_cast({unregister, false, ClientId}, State) -> + ets:delete(mqtt_persistent_session, ClientId), + {noreply, State}; + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index a597d6dad..468016795 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -43,6 +43,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + init_session_ets(), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), @@ -55,3 +56,8 @@ init([]) -> end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. +init_session_ets() -> + Tables = [mqtt_transient_session, mqtt_persistent_session], + Attrs = [ordered_set, named_table, public, {write_concurrency, true}], + lists:foreach(fun(Tab) -> ets:new(Tab, Attrs) end, Tables). +