diff --git a/src/emqx_app.erl b/src/emqx_app.erl index d5ca8f6ae..4a39e46aa 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -50,8 +50,9 @@ print_banner() -> io:format("Starting ~s on node ~s~n", [?APP, node()]). print_vsn() -> + {ok, Descr} = application:get_key(description), {ok, Vsn} = application:get_key(vsn), - io:format("~s ~s is running now!~n", [?APP, Vsn]). + io:format("~s ~s is running now!~n", [Descr, Vsn]). %%-------------------------------------------------------------------- %% Autocluster diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index f05d63a29..5e921a1c0 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,90 +20,98 @@ -export([start_link/0]). --export([lookup_client/1]). --export([register_client/1, register_client/2, unregister_client/1]). - --export([get_client_attrs/1, lookup_client_pid/1]). --export([get_client_stats/1, set_client_stats/2]). +-export([lookup_connection/1]). +-export([register_connection/1, register_connection/2]). +-export([unregister_connection/1]). +-export([get_conn_attrs/1, set_conn_attrs/2]). +-export([get_conn_stats/1, set_conn_stats/2]). +-export([lookup_conn_pid/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pmon}). - -define(CM, ?MODULE). -%% ETS Tables. --define(CLIENT, emqx_client). --define(CLIENT_ATTRS, emqx_client_attrs). --define(CLIENT_STATS, emqx_client_stats). -%% @doc Start the client manager. --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +%% ETS Tables. +-define(CONN_TAB, emqx_conn). +-define(CONN_ATTRS_TAB, emqx_conn_attrs). +-define(CONN_STATS_TAB, emqx_conn_stats). + +%% @doc Start the connection manager. +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?CM}, ?MODULE, [], []). -%% @doc Lookup a client. --spec(lookup_client(client_id()) -> list({client_id(), pid()})). -lookup_client(ClientId) when is_binary(ClientId) -> - ets:lookup(?CLIENT, ClientId). +%% @doc Lookup a connection. +-spec(lookup_connection(client_id()) -> list({client_id(), pid()})). +lookup_connection(ClientId) when is_binary(ClientId) -> + ets:lookup(?CONN_TAB, ClientId). -%% @doc Register a client. --spec(register_client(client_id() | {client_id(), pid()}) -> ok). -register_client(ClientId) when is_binary(ClientId) -> - register_client({ClientId, self()}); +%% @doc Register a connection. +-spec(register_connection(client_id() | {client_id(), pid()}) -> ok). +register_connection(ClientId) when is_binary(ClientId) -> + register_connection({ClientId, self()}); -register_client({ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) -> - register_client({ClientId, ClientPid}, []). +register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> + _ = ets:insert(?CONN_TAB, Conn), + notify({registered, ClientId, ConnPid}). --spec(register_client({client_id(), pid()}, list()) -> ok). -register_client(CObj = {ClientId, ClientPid}, Attrs) when is_binary(ClientId), is_pid(ClientPid) -> - _ = ets:insert(?CLIENT, CObj), - _ = ets:insert(?CLIENT_ATTRS, {CObj, Attrs}), - notify({registered, ClientId, ClientPid}). +-spec(register_connection(client_id() | {client_id(), pid()}, list()) -> ok). +register_connection(ClientId, Attrs) when is_binary(ClientId) -> + register_connection({ClientId, self()}, Attrs); +register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> + set_conn_attrs(Conn, Attrs), + register_connection(Conn). -%% @doc Get client attrs --spec(get_client_attrs({client_id(), pid()}) -> list()). -get_client_attrs(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) -> +%% @doc Get conn attrs +-spec(get_conn_attrs({client_id(), pid()}) -> list()). +get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> try - ets:lookup_element(?CLIENT_ATTRS, CObj, 2) + ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2) catch error:badarg -> [] end. -%% @doc Unregister a client. --spec(unregister_client(client_id() | {client_id(), pid()}) -> ok). -unregister_client(ClientId) when is_binary(ClientId) -> - unregister_client({ClientId, self()}); +%% @doc Set conn attrs +set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) -> + set_conn_attrs({ClientId, self()}, Attrs); +set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) -> + ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}). -unregister_client(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) -> - _ = ets:delete(?CLIENT_STATS, CObj), - _ = ets:delete(?CLIENT_ATTRS, CObj), - _ = ets:delete_object(?CLIENT, CObj), - notify({unregistered, ClientId, ClientPid}). +%% @doc Unregister a conn. +-spec(unregister_connection(client_id() | {client_id(), pid()}) -> ok). +unregister_connection(ClientId) when is_binary(ClientId) -> + unregister_connection({ClientId, self()}); -%% @doc Lookup client pid --spec(lookup_client_pid(client_id()) -> pid() | undefined). -lookup_client_pid(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CLIENT, ClientId) of +unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> + _ = ets:delete(?CONN_STATS_TAB, Conn), + _ = ets:delete(?CONN_ATTRS_TAB, Conn), + _ = ets:delete_object(?CONN_TAB, Conn), + notify({unregistered, ClientId, ConnPid}). + +%% @doc Lookup connection pid +-spec(lookup_conn_pid(client_id()) -> pid() | undefined). +lookup_conn_pid(ClientId) when is_binary(ClientId) -> + case ets:lookup(?CONN_TAB, ClientId) of [] -> undefined; [{_, Pid}] -> Pid end. -%% @doc Get client stats --spec(get_client_stats({client_id(), pid()}) -> list(emqx_stats:stats())). -get_client_stats(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) -> - try ets:lookup_element(?CLIENT_STATS, CObj, 2) +%% @doc Get conn stats +-spec(get_conn_stats({client_id(), pid()}) -> list(emqx_stats:stats())). +get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) -> + try ets:lookup_element(?CONN_STATS_TAB, Conn, 2) catch error:badarg -> [] end. -%% @doc Set client stats. --spec(set_client_stats(client_id(), list(emqx_stats:stats())) -> boolean()). -set_client_stats(ClientId, Stats) when is_binary(ClientId) -> - set_client_stats({ClientId, self()}, Stats); +%% @doc Set conn stats. +-spec(set_conn_stats(client_id(), list(emqx_stats:stats())) -> boolean()). +set_conn_stats(ClientId, Stats) when is_binary(ClientId) -> + set_conn_stats({ClientId, self()}, Stats); -set_client_stats(CObj = {ClientId, ClientPid}, Stats) when is_binary(ClientId), is_pid(ClientPid) -> - ets:insert(?CLIENT_STATS, {CObj, Stats}). +set_conn_stats(Conn = {ClientId, ConnPid}, Stats) when is_binary(ClientId), is_pid(ConnPid) -> + ets:insert(?CONN_STATS_TAB, {Conn, Stats}). notify(Msg) -> gen_server:cast(?CM, {notify, Msg}). @@ -114,52 +122,52 @@ notify(Msg) -> init([]) -> TabOpts = [public, set, {write_concurrency, true}], - _ = emqx_tables:new(?CLIENT, [{read_concurrency, true} | TabOpts]), - _ = emqx_tables:new(?CLIENT_ATTRS, TabOpts), - _ = emqx_tables:new(?CLIENT_STATS, TabOpts), - ok = emqx_stats:update_interval(cm_stats, fun update_client_stats/0), - {ok, #state{client_pmon = emqx_pmon:new()}}. + _ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), + _ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), + _ = emqx_tables:new(?CONN_STATS_TAB, TabOpts), + ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0), + {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[CM] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({notify, {registered, ClientId, Pid}}, State = #state{client_pmon = PMon}) -> - {noreply, State#state{client_pmon = emqx_pmon:monitor(Pid, ClientId, PMon)}}; +handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> + {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}}; -handle_cast({notify, {unregistered, _ClientId, Pid}}, State = #state{client_pmon = PMon}) -> - {noreply, State#state{client_pmon = emqx_pmon:demonitor(Pid, PMon)}}; +handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> + {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[CM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{client_pmon = PMon}) -> - case emqx_pmon:find(DownPid, PMon) of - undefined -> {noreply, State}; - ClientId -> - unregister_client({ClientId, DownPid}), - {noreply, State#state{client_pmon = emqx_pmon:erase(DownPid, PMon)}} +handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := PMon}) -> + case emqx_pmon:find(ConnPid, PMon) of + undefined -> + {noreply, State}; + ClientId -> + unregister_connection({ClientId, ConnPid}), + {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}} end; handle_info(Info, State) -> emqx_logger:error("[CM] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State = #state{}) -> +terminate(_Reason, _State) -> emqx_stats:cancel_update(cm_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -update_client_stats() -> - case ets:info(?CLIENT, size) of +update_conn_stats() -> + case ets:info(?CONN_TAB, size) of undefined -> ok; - Size -> - emqx_stats:setstat('clients/count', 'clients/max', Size) + Size -> emqx_stats:setstat('connections/count', 'connections/max', Size) end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index d0d0bc90b..c5299d638 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -96,16 +96,16 @@ init([Transport, RawSocket, Options]) -> sendfun => SendFun}, Options), ParserState = emqx_protocol:parser(ProtoState), State = run_socket(#state{transport = Transport, - socket = Socket, - peername = Peername, - await_recv = false, - conn_state = running, - rate_limit = RateLimit, - publish_limit = PubLimit, - proto_state = ProtoState, - parser_state = ParserState, - enable_stats = EnableStats, - idle_timeout = IdleTimout}), + socket = Socket, + peername = Peername, + await_recv = false, + conn_state = running, + rate_limit = RateLimit, + publish_limit = PubLimit, + proto_state = ProtoState, + parser_state = ParserState, + enable_stats = EnableStats, + idle_timeout = IdleTimout}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -187,7 +187,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> Stats = element(2, handle_call(stats, undefined, State)), - emqx_cm:set_client_stats(emqx_protocol:client_id(ProtoState), Stats), + emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7b4c80754..2945954b1 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -237,7 +237,7 @@ process(?CONNECT_PACKET( case try_open_session(PState3) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid}, - ok = emqx_cm:register_client({client_id(PState4), self()}, info(PState4)), + ok = emqx_cm:register_connection(client_id(PState4), info(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), %% TODO: 'Run hooks' before open_session? @@ -580,10 +580,10 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> shutdown(_Error, #pstate{client_id = undefined}) -> ignore; shutdown(conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_client(ClientId), + emqx_cm:unregister_connection(ClientId), ignore; shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_client(ClientId), + emqx_cm:unregister_connection(ClientId), ignore; shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], PState), @@ -593,7 +593,7 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> false -> send_willmsg(WillMsg) end, emqx_hooks:run('client.disconnected', [Error], client(PState)), - emqx_cm:unregister_client(ClientId). + emqx_cm:unregister_connection(ClientId). willmsg(Packet, PState = #pstate{client_id = ClientId}) when is_record(Packet, mqtt_packet_connect) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 2dac00263..e31adb141 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -24,7 +24,8 @@ -export([lookup_session/1, lookup_session_pid/1]). -export([resume_session/1, resume_session/2]). -export([discard_session/1, discard_session/2]). --export([register_session/2, get_session_attrs/1, unregister_session/1]). +-export([register_session/2, unregister_session/1]). +-export([get_session_attrs/1, set_session_attrs/2]). -export([get_session_stats/1, set_session_stats/2]). %% Internal functions for rpc @@ -39,10 +40,10 @@ -define(SM, ?MODULE). %% ETS Tables --define(SESSION, emqx_session). --define(SESSION_P, emqx_persistent_session). --define(SESSION_ATTRS, emqx_session_attrs). --define(SESSION_STATS, emqx_session_stats). +-define(SESSION_TAB, emqx_session). +-define(SESSION_P_TAB, emqx_persistent_session). +-define(SESSION_ATTRS_TAB, emqx_session_attrs). +-define(SESSION_STATS_TAB, emqx_session_stats). -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> @@ -125,41 +126,44 @@ register_session(ClientId, Attrs) when is_binary(ClientId) -> register_session(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION, Session), - ets:insert(?SESSION_ATTRS, {Session, Attrs}), + ets:insert(?SESSION_TAB, Session), + ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}), case proplists:get_value(clean_start, Attrs, true) of true -> ok; - false -> ets:insert(?SESSION_P, Session) + false -> ets:insert(?SESSION_P_TAB, Session) end, emqx_sm_registry:register_session(Session), notify({registered, ClientId, SPid}). %% @doc Get session attrs --spec(get_session_attrs({client_id(), pid()}) - -> list(emqx_session:attribute())). -get_session_attrs(Session = {ClientId, SPid}) - when is_binary(ClientId), is_pid(SPid) -> - safe_lookup_element(?SESSION_ATTRS, Session, []). +-spec(get_session_attrs({client_id(), pid()}) -> list(emqx_session:attribute())). +get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> + safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). + +%% @doc Set session attrs +set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> + set_session_attrs({ClientId, self()}, Attrs); +set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) -> + ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}). %% @doc Unregister a session -spec(unregister_session(client_id() | {client_id(), pid()}) -> ok). unregister_session(ClientId) when is_binary(ClientId) -> unregister_session({ClientId, self()}); -unregister_session(Session = {ClientId, SPid}) - when is_binary(ClientId), is_pid(SPid) -> +unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> emqx_sm_registry:unregister_session(Session), - ets:delete(?SESSION_STATS, Session), - ets:delete(?SESSION_ATTRS, Session), - ets:delete_object(?SESSION_P, Session), - ets:delete_object(?SESSION, Session), + ets:delete(?SESSION_STATS_TAB, Session), + ets:delete(?SESSION_ATTRS_TAB, Session), + ets:delete_object(?SESSION_P_TAB, Session), + ets:delete_object(?SESSION_TAB, Session), notify({unregistered, ClientId, SPid}). %% @doc Get session stats -spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())). get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> - safe_lookup_element(?SESSION_STATS, Session, []). + safe_lookup_element(?SESSION_STATS_TAB, Session, []). %% @doc Set session stats -spec(set_session_stats(client_id() | {client_id(), pid()}, @@ -169,14 +173,14 @@ set_session_stats(ClientId, Stats) when is_binary(ClientId) -> set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_STATS, {Session, Stats}). + ets:insert(?SESSION_STATS_TAB, {Session, Stats}). %% @doc Lookup a session from registry -spec(lookup_session(client_id()) -> list({client_id(), pid()})). lookup_session(ClientId) -> case emqx_sm_registry:is_enabled() of true -> emqx_sm_registry:lookup_session(ClientId); - false -> ets:lookup(?SESSION, ClientId) + false -> ets:lookup(?SESSION_TAB, ClientId) end. %% @doc Dispatch a message to the session. @@ -192,7 +196,7 @@ dispatch(ClientId, Topic, Msg) -> %% @doc Lookup session pid. -spec(lookup_session_pid(client_id()) -> pid() | undefined). lookup_session_pid(ClientId) -> - safe_lookup_element(?SESSION, ClientId, undefined). + safe_lookup_element(?SESSION_TAB, ClientId, undefined). safe_lookup_element(Tab, Key, Default) -> try ets:lookup_element(Tab, Key, 2) @@ -209,12 +213,12 @@ notify(Event) -> init([]) -> TabOpts = [public, set, {write_concurrency, true}], - _ = emqx_tables:new(?SESSION, [{read_concurrency, true} | TabOpts]), - _ = emqx_tables:new(?SESSION_P, TabOpts), - _ = emqx_tables:new(?SESSION_ATTRS, TabOpts), - _ = emqx_tables:new(?SESSION_STATS, TabOpts), + _ = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]), + _ = emqx_tables:new(?SESSION_P_TAB, TabOpts), + _ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), + _ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), emqx_stats:update_interval(sm_stats, stats_fun()), - {ok, #state{session_pmon = emqx_pmon:new()}}. + {ok, #{session_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[SM] unexpected call: ~p", [Req]), @@ -230,12 +234,12 @@ handle_cast(Msg, State) -> emqx_logger:error("[SM] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{session_pmon = PMon}) -> +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) -> case emqx_pmon:find(DownPid, PMon) of undefined -> {noreply, State}; ClientId -> unregister_session({ClientId, DownPid}), - {noreply, State#state{session_pmon = emqx_pmon:erase(DownPid, PMon)}} + {noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}} end; handle_info(Info, State) -> @@ -254,8 +258,8 @@ code_change(_OldVsn, State, _Extra) -> stats_fun() -> fun() -> - safe_update_stats(?SESSION, 'sessions/count', 'sessions/max'), - safe_update_stats(?SESSION_P, 'sessions/persistent/count', 'sessions/persistent/max') + safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), + safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max') end. safe_update_stats(Tab, Stat, MaxStat) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 479021bf0..8c74e05bc 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -20,11 +20,10 @@ -export([start_link/0]). -%% Get all stats --export([all/0]). - %% Stats API. --export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]). +-export([getstats/0, getstat/1]). +-export([setstat/2, setstat/3]). +-export([statsfun/1, statsfun/2]). -export([update_interval/2, update_interval/3, cancel_update/1]). %% gen_server callbacks @@ -35,13 +34,12 @@ -record(state, {timer, updates :: #update{}}). -type(stats() :: list({atom(), non_neg_integer()})). - -export_type([stats/0]). -%% Client stats --define(CLIENT_STATS, [ - 'clients/count', % clients connected current - 'clients/max' % maximum clients connected +%% Connection stats +-define(CONNECTION_STATS, [ + 'connections/count', % current connections + 'connections/max' % maximum connections connected ]). %% Session stats @@ -77,14 +75,10 @@ -define(SERVER, ?MODULE). %% @doc Start stats server --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% Get all stats. --spec(all() -> stats()). -all() -> getstats(). - %% @doc Generate stats fun -spec(statsfun(Stat :: atom()) -> fun()). statsfun(Stat) -> @@ -139,13 +133,13 @@ rec(Name, Secs, UpFun) -> cast(Msg) -> gen_server:cast(?SERVER, Msg). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), - Stats = lists:append([?CLIENT_STATS, ?SESSION_STATS, ?PUBSUB_STATS, + Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS, ?ROUTE_STATS, ?RETAINED_STATS]), true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]), {ok, start_timer(#state{updates = []}), hibernate}. @@ -185,19 +179,19 @@ handle_cast(Msg, State) -> emqx_logger:error("[Stats] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, tick}, State = #state{timer= TRef, updates = Updates}) -> - lists:foldl( - fun(Update = #update{name = Name, countdown = C, interval = I, - func = UpFun}, Acc) when C =< 0 -> - try UpFun() - catch _:Error -> - emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error]) - end, - [Update#update{countdown = I} | Acc]; - (Update = #update{countdown = C}, Acc) -> - [Update#update{countdown = C - 1} | Acc] - end, [], Updates), - {noreply, start_timer(State), hibernate}; +handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> + Updates1 = lists:foldl( + fun(Update = #update{name = Name, countdown = C, interval = I, + func = UpFun}, Acc) when C =< 0 -> + try UpFun() + catch _:Error -> + emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error]) + end, + [Update#update{countdown = I} | Acc]; + (Update = #update{countdown = C}, Acc) -> + [Update#update{countdown = C - 1} | Acc] + end, [], Updates), + {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> emqx_logger:error("[Stats] unexpected info: ~p", [Info]), @@ -209,9 +203,9 @@ terminate(_Reason, #state{timer = TRef}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ safe_update_element(Key, Val) -> try ets:update_element(?TAB, Key, {2, Val}) diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 57dc41703..5001f733b 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -114,7 +114,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi publish(version, Version), publish(sysdescr, Descr), publish(brokers, ekka_mnesia:running_nodes()), - publish(stats, emqx_stats:all()), + publish(stats, emqx_stats:getstats()), publish(metrics, emqx_metrics:all()), {noreply, tick(State), hibernate}; diff --git a/src/emqx_types.erl b/src/emqx_types.erl index eeca513a6..32654b121 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -16,11 +16,14 @@ %%-include("emqx.hrl"). +-export_type([startlink_ret/0]). -export_type([zone/0, client_id/0, username/0, password/0, peername/0, protocol/0, credentials/0]). -export_type([payload/0]). %%-export_type([payload/0, message/0, delivery/0]). +-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). + -type(zone() :: atom()). -type(client_id() :: binary() | atom()). -type(username() :: binary() | undefined). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index d488097bd..0a29d298c 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -47,7 +47,7 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(WSLOG(Level, Format, Args, State), - lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). %%------------------------------------------------------------------------------ %% API @@ -84,7 +84,6 @@ call(WSPid, Req) -> %%------------------------------------------------------------------------------ init(Req, Opts) -> - io:format("Opts: ~p~n", [Opts]), case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> {cowboy_websocket, Req, #state{}}; @@ -200,7 +199,7 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), - emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats), + emqx_cm:set_conn_stats(emqx_protocol:clientid(ProtoState), Stats), {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) ->