Update connection, session, stats modules
This commit is contained in:
parent
a19daee353
commit
1607e576de
|
@ -50,8 +50,9 @@ print_banner() ->
|
||||||
io:format("Starting ~s on node ~s~n", [?APP, node()]).
|
io:format("Starting ~s on node ~s~n", [?APP, node()]).
|
||||||
|
|
||||||
print_vsn() ->
|
print_vsn() ->
|
||||||
|
{ok, Descr} = application:get_key(description),
|
||||||
{ok, Vsn} = application:get_key(vsn),
|
{ok, Vsn} = application:get_key(vsn),
|
||||||
io:format("~s ~s is running now!~n", [?APP, Vsn]).
|
io:format("~s ~s is running now!~n", [Descr, Vsn]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Autocluster
|
%% Autocluster
|
||||||
|
|
162
src/emqx_cm.erl
162
src/emqx_cm.erl
|
@ -20,90 +20,98 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([lookup_client/1]).
|
-export([lookup_connection/1]).
|
||||||
-export([register_client/1, register_client/2, unregister_client/1]).
|
-export([register_connection/1, register_connection/2]).
|
||||||
|
-export([unregister_connection/1]).
|
||||||
-export([get_client_attrs/1, lookup_client_pid/1]).
|
-export([get_conn_attrs/1, set_conn_attrs/2]).
|
||||||
-export([get_client_stats/1, set_client_stats/2]).
|
-export([get_conn_stats/1, set_conn_stats/2]).
|
||||||
|
-export([lookup_conn_pid/1]).
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
-record(state, {client_pmon}).
|
|
||||||
|
|
||||||
-define(CM, ?MODULE).
|
-define(CM, ?MODULE).
|
||||||
%% ETS Tables.
|
|
||||||
-define(CLIENT, emqx_client).
|
|
||||||
-define(CLIENT_ATTRS, emqx_client_attrs).
|
|
||||||
-define(CLIENT_STATS, emqx_client_stats).
|
|
||||||
|
|
||||||
%% @doc Start the client manager.
|
%% ETS Tables.
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
-define(CONN_TAB, emqx_conn).
|
||||||
|
-define(CONN_ATTRS_TAB, emqx_conn_attrs).
|
||||||
|
-define(CONN_STATS_TAB, emqx_conn_stats).
|
||||||
|
|
||||||
|
%% @doc Start the connection manager.
|
||||||
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% @doc Lookup a client.
|
%% @doc Lookup a connection.
|
||||||
-spec(lookup_client(client_id()) -> list({client_id(), pid()})).
|
-spec(lookup_connection(client_id()) -> list({client_id(), pid()})).
|
||||||
lookup_client(ClientId) when is_binary(ClientId) ->
|
lookup_connection(ClientId) when is_binary(ClientId) ->
|
||||||
ets:lookup(?CLIENT, ClientId).
|
ets:lookup(?CONN_TAB, ClientId).
|
||||||
|
|
||||||
%% @doc Register a client.
|
%% @doc Register a connection.
|
||||||
-spec(register_client(client_id() | {client_id(), pid()}) -> ok).
|
-spec(register_connection(client_id() | {client_id(), pid()}) -> ok).
|
||||||
register_client(ClientId) when is_binary(ClientId) ->
|
register_connection(ClientId) when is_binary(ClientId) ->
|
||||||
register_client({ClientId, self()});
|
register_connection({ClientId, self()});
|
||||||
|
|
||||||
register_client({ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
register_client({ClientId, ClientPid}, []).
|
_ = ets:insert(?CONN_TAB, Conn),
|
||||||
|
notify({registered, ClientId, ConnPid}).
|
||||||
|
|
||||||
-spec(register_client({client_id(), pid()}, list()) -> ok).
|
-spec(register_connection(client_id() | {client_id(), pid()}, list()) -> ok).
|
||||||
register_client(CObj = {ClientId, ClientPid}, Attrs) when is_binary(ClientId), is_pid(ClientPid) ->
|
register_connection(ClientId, Attrs) when is_binary(ClientId) ->
|
||||||
_ = ets:insert(?CLIENT, CObj),
|
register_connection({ClientId, self()}, Attrs);
|
||||||
_ = ets:insert(?CLIENT_ATTRS, {CObj, Attrs}),
|
register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
notify({registered, ClientId, ClientPid}).
|
set_conn_attrs(Conn, Attrs),
|
||||||
|
register_connection(Conn).
|
||||||
|
|
||||||
%% @doc Get client attrs
|
%% @doc Get conn attrs
|
||||||
-spec(get_client_attrs({client_id(), pid()}) -> list()).
|
-spec(get_conn_attrs({client_id(), pid()}) -> list()).
|
||||||
get_client_attrs(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
try
|
try
|
||||||
ets:lookup_element(?CLIENT_ATTRS, CObj, 2)
|
ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2)
|
||||||
catch
|
catch
|
||||||
error:badarg -> []
|
error:badarg -> []
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Unregister a client.
|
%% @doc Set conn attrs
|
||||||
-spec(unregister_client(client_id() | {client_id(), pid()}) -> ok).
|
set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
|
||||||
unregister_client(ClientId) when is_binary(ClientId) ->
|
set_conn_attrs({ClientId, self()}, Attrs);
|
||||||
unregister_client({ClientId, self()});
|
set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
|
ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
|
||||||
|
|
||||||
unregister_client(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
%% @doc Unregister a conn.
|
||||||
_ = ets:delete(?CLIENT_STATS, CObj),
|
-spec(unregister_connection(client_id() | {client_id(), pid()}) -> ok).
|
||||||
_ = ets:delete(?CLIENT_ATTRS, CObj),
|
unregister_connection(ClientId) when is_binary(ClientId) ->
|
||||||
_ = ets:delete_object(?CLIENT, CObj),
|
unregister_connection({ClientId, self()});
|
||||||
notify({unregistered, ClientId, ClientPid}).
|
|
||||||
|
|
||||||
%% @doc Lookup client pid
|
unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
-spec(lookup_client_pid(client_id()) -> pid() | undefined).
|
_ = ets:delete(?CONN_STATS_TAB, Conn),
|
||||||
lookup_client_pid(ClientId) when is_binary(ClientId) ->
|
_ = ets:delete(?CONN_ATTRS_TAB, Conn),
|
||||||
case ets:lookup(?CLIENT, ClientId) of
|
_ = ets:delete_object(?CONN_TAB, Conn),
|
||||||
|
notify({unregistered, ClientId, ConnPid}).
|
||||||
|
|
||||||
|
%% @doc Lookup connection pid
|
||||||
|
-spec(lookup_conn_pid(client_id()) -> pid() | undefined).
|
||||||
|
lookup_conn_pid(ClientId) when is_binary(ClientId) ->
|
||||||
|
case ets:lookup(?CONN_TAB, ClientId) of
|
||||||
[] -> undefined;
|
[] -> undefined;
|
||||||
[{_, Pid}] -> Pid
|
[{_, Pid}] -> Pid
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Get client stats
|
%% @doc Get conn stats
|
||||||
-spec(get_client_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
-spec(get_conn_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
||||||
get_client_stats(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid(ClientPid) ->
|
get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
try ets:lookup_element(?CLIENT_STATS, CObj, 2)
|
try ets:lookup_element(?CONN_STATS_TAB, Conn, 2)
|
||||||
catch
|
catch
|
||||||
error:badarg -> []
|
error:badarg -> []
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Set client stats.
|
%% @doc Set conn stats.
|
||||||
-spec(set_client_stats(client_id(), list(emqx_stats:stats())) -> boolean()).
|
-spec(set_conn_stats(client_id(), list(emqx_stats:stats())) -> boolean()).
|
||||||
set_client_stats(ClientId, Stats) when is_binary(ClientId) ->
|
set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
|
||||||
set_client_stats({ClientId, self()}, Stats);
|
set_conn_stats({ClientId, self()}, Stats);
|
||||||
|
|
||||||
set_client_stats(CObj = {ClientId, ClientPid}, Stats) when is_binary(ClientId), is_pid(ClientPid) ->
|
set_conn_stats(Conn = {ClientId, ConnPid}, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
ets:insert(?CLIENT_STATS, {CObj, Stats}).
|
ets:insert(?CONN_STATS_TAB, {Conn, Stats}).
|
||||||
|
|
||||||
notify(Msg) ->
|
notify(Msg) ->
|
||||||
gen_server:cast(?CM, {notify, Msg}).
|
gen_server:cast(?CM, {notify, Msg}).
|
||||||
|
@ -114,52 +122,52 @@ notify(Msg) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
TabOpts = [public, set, {write_concurrency, true}],
|
TabOpts = [public, set, {write_concurrency, true}],
|
||||||
_ = emqx_tables:new(?CLIENT, [{read_concurrency, true} | TabOpts]),
|
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
||||||
_ = emqx_tables:new(?CLIENT_ATTRS, TabOpts),
|
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?CLIENT_STATS, TabOpts),
|
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
||||||
ok = emqx_stats:update_interval(cm_stats, fun update_client_stats/0),
|
ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0),
|
||||||
{ok, #state{client_pmon = emqx_pmon:new()}}.
|
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[CM] unexpected call: ~p", [Req]),
|
emqx_logger:error("[CM] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({notify, {registered, ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
||||||
{noreply, State#state{client_pmon = emqx_pmon:monitor(Pid, ClientId, PMon)}};
|
{noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
|
||||||
|
|
||||||
handle_cast({notify, {unregistered, _ClientId, Pid}}, State = #state{client_pmon = PMon}) ->
|
handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
||||||
{noreply, State#state{client_pmon = emqx_pmon:demonitor(Pid, PMon)}};
|
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{client_pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := PMon}) ->
|
||||||
case emqx_pmon:find(DownPid, PMon) of
|
case emqx_pmon:find(ConnPid, PMon) of
|
||||||
undefined -> {noreply, State};
|
undefined ->
|
||||||
ClientId ->
|
{noreply, State};
|
||||||
unregister_client({ClientId, DownPid}),
|
ClientId ->
|
||||||
{noreply, State#state{client_pmon = emqx_pmon:erase(DownPid, PMon)}}
|
unregister_connection({ClientId, ConnPid}),
|
||||||
|
{noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[CM] unexpected info: ~p", [Info]),
|
emqx_logger:error("[CM] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State = #state{}) ->
|
terminate(_Reason, _State) ->
|
||||||
emqx_stats:cancel_update(cm_stats).
|
emqx_stats:cancel_update(cm_stats).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
update_client_stats() ->
|
update_conn_stats() ->
|
||||||
case ets:info(?CLIENT, size) of
|
case ets:info(?CONN_TAB, size) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
Size ->
|
Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
|
||||||
emqx_stats:setstat('clients/count', 'clients/max', Size)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -96,16 +96,16 @@ init([Transport, RawSocket, Options]) ->
|
||||||
sendfun => SendFun}, Options),
|
sendfun => SendFun}, Options),
|
||||||
ParserState = emqx_protocol:parser(ProtoState),
|
ParserState = emqx_protocol:parser(ProtoState),
|
||||||
State = run_socket(#state{transport = Transport,
|
State = run_socket(#state{transport = Transport,
|
||||||
socket = Socket,
|
socket = Socket,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
await_recv = false,
|
await_recv = false,
|
||||||
conn_state = running,
|
conn_state = running,
|
||||||
rate_limit = RateLimit,
|
rate_limit = RateLimit,
|
||||||
publish_limit = PubLimit,
|
publish_limit = PubLimit,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
parser_state = ParserState,
|
parser_state = ParserState,
|
||||||
enable_stats = EnableStats,
|
enable_stats = EnableStats,
|
||||||
idle_timeout = IdleTimout}),
|
idle_timeout = IdleTimout}),
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
|
||||||
State, self(), IdleTimout);
|
State, self(), IdleTimout);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -187,7 +187,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
|
||||||
|
|
||||||
handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
|
handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
|
||||||
Stats = element(2, handle_call(stats, undefined, State)),
|
Stats = element(2, handle_call(stats, undefined, State)),
|
||||||
emqx_cm:set_client_stats(emqx_protocol:client_id(ProtoState), Stats),
|
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
|
||||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
|
|
|
@ -237,7 +237,7 @@ process(?CONNECT_PACKET(
|
||||||
case try_open_session(PState3) of
|
case try_open_session(PState3) of
|
||||||
{ok, SPid, SP} ->
|
{ok, SPid, SP} ->
|
||||||
PState4 = PState3#pstate{session = SPid},
|
PState4 = PState3#pstate{session = SPid},
|
||||||
ok = emqx_cm:register_client({client_id(PState4), self()}, info(PState4)),
|
ok = emqx_cm:register_connection(client_id(PState4), info(PState4)),
|
||||||
%% Start keepalive
|
%% Start keepalive
|
||||||
start_keepalive(Keepalive, PState4),
|
start_keepalive(Keepalive, PState4),
|
||||||
%% TODO: 'Run hooks' before open_session?
|
%% TODO: 'Run hooks' before open_session?
|
||||||
|
@ -580,10 +580,10 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
|
||||||
shutdown(_Error, #pstate{client_id = undefined}) ->
|
shutdown(_Error, #pstate{client_id = undefined}) ->
|
||||||
ignore;
|
ignore;
|
||||||
shutdown(conflict, #pstate{client_id = ClientId}) ->
|
shutdown(conflict, #pstate{client_id = ClientId}) ->
|
||||||
emqx_cm:unregister_client(ClientId),
|
emqx_cm:unregister_connection(ClientId),
|
||||||
ignore;
|
ignore;
|
||||||
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) ->
|
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) ->
|
||||||
emqx_cm:unregister_client(ClientId),
|
emqx_cm:unregister_connection(ClientId),
|
||||||
ignore;
|
ignore;
|
||||||
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
|
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
|
||||||
?LOG(info, "Shutdown for ~p", [Error], PState),
|
?LOG(info, "Shutdown for ~p", [Error], PState),
|
||||||
|
@ -593,7 +593,7 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
|
||||||
false -> send_willmsg(WillMsg)
|
false -> send_willmsg(WillMsg)
|
||||||
end,
|
end,
|
||||||
emqx_hooks:run('client.disconnected', [Error], client(PState)),
|
emqx_hooks:run('client.disconnected', [Error], client(PState)),
|
||||||
emqx_cm:unregister_client(ClientId).
|
emqx_cm:unregister_connection(ClientId).
|
||||||
|
|
||||||
willmsg(Packet, PState = #pstate{client_id = ClientId})
|
willmsg(Packet, PState = #pstate{client_id = ClientId})
|
||||||
when is_record(Packet, mqtt_packet_connect) ->
|
when is_record(Packet, mqtt_packet_connect) ->
|
||||||
|
|
|
@ -24,7 +24,8 @@
|
||||||
-export([lookup_session/1, lookup_session_pid/1]).
|
-export([lookup_session/1, lookup_session_pid/1]).
|
||||||
-export([resume_session/1, resume_session/2]).
|
-export([resume_session/1, resume_session/2]).
|
||||||
-export([discard_session/1, discard_session/2]).
|
-export([discard_session/1, discard_session/2]).
|
||||||
-export([register_session/2, get_session_attrs/1, unregister_session/1]).
|
-export([register_session/2, unregister_session/1]).
|
||||||
|
-export([get_session_attrs/1, set_session_attrs/2]).
|
||||||
-export([get_session_stats/1, set_session_stats/2]).
|
-export([get_session_stats/1, set_session_stats/2]).
|
||||||
|
|
||||||
%% Internal functions for rpc
|
%% Internal functions for rpc
|
||||||
|
@ -39,10 +40,10 @@
|
||||||
-define(SM, ?MODULE).
|
-define(SM, ?MODULE).
|
||||||
|
|
||||||
%% ETS Tables
|
%% ETS Tables
|
||||||
-define(SESSION, emqx_session).
|
-define(SESSION_TAB, emqx_session).
|
||||||
-define(SESSION_P, emqx_persistent_session).
|
-define(SESSION_P_TAB, emqx_persistent_session).
|
||||||
-define(SESSION_ATTRS, emqx_session_attrs).
|
-define(SESSION_ATTRS_TAB, emqx_session_attrs).
|
||||||
-define(SESSION_STATS, emqx_session_stats).
|
-define(SESSION_STATS_TAB, emqx_session_stats).
|
||||||
|
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -125,41 +126,44 @@ register_session(ClientId, Attrs) when is_binary(ClientId) ->
|
||||||
|
|
||||||
register_session(Session = {ClientId, SPid}, Attrs)
|
register_session(Session = {ClientId, SPid}, Attrs)
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
when is_binary(ClientId), is_pid(SPid) ->
|
||||||
ets:insert(?SESSION, Session),
|
ets:insert(?SESSION_TAB, Session),
|
||||||
ets:insert(?SESSION_ATTRS, {Session, Attrs}),
|
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}),
|
||||||
case proplists:get_value(clean_start, Attrs, true) of
|
case proplists:get_value(clean_start, Attrs, true) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> ets:insert(?SESSION_P, Session)
|
false -> ets:insert(?SESSION_P_TAB, Session)
|
||||||
end,
|
end,
|
||||||
emqx_sm_registry:register_session(Session),
|
emqx_sm_registry:register_session(Session),
|
||||||
notify({registered, ClientId, SPid}).
|
notify({registered, ClientId, SPid}).
|
||||||
|
|
||||||
%% @doc Get session attrs
|
%% @doc Get session attrs
|
||||||
-spec(get_session_attrs({client_id(), pid()})
|
-spec(get_session_attrs({client_id(), pid()}) -> list(emqx_session:attribute())).
|
||||||
-> list(emqx_session:attribute())).
|
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
get_session_attrs(Session = {ClientId, SPid})
|
safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
|
||||||
safe_lookup_element(?SESSION_ATTRS, Session, []).
|
%% @doc Set session attrs
|
||||||
|
set_session_attrs(ClientId, Attrs) when is_binary(ClientId) ->
|
||||||
|
set_session_attrs({ClientId, self()}, Attrs);
|
||||||
|
set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
|
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}).
|
||||||
|
|
||||||
%% @doc Unregister a session
|
%% @doc Unregister a session
|
||||||
-spec(unregister_session(client_id() | {client_id(), pid()}) -> ok).
|
-spec(unregister_session(client_id() | {client_id(), pid()}) -> ok).
|
||||||
unregister_session(ClientId) when is_binary(ClientId) ->
|
unregister_session(ClientId) when is_binary(ClientId) ->
|
||||||
unregister_session({ClientId, self()});
|
unregister_session({ClientId, self()});
|
||||||
|
|
||||||
unregister_session(Session = {ClientId, SPid})
|
unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
|
||||||
emqx_sm_registry:unregister_session(Session),
|
emqx_sm_registry:unregister_session(Session),
|
||||||
ets:delete(?SESSION_STATS, Session),
|
ets:delete(?SESSION_STATS_TAB, Session),
|
||||||
ets:delete(?SESSION_ATTRS, Session),
|
ets:delete(?SESSION_ATTRS_TAB, Session),
|
||||||
ets:delete_object(?SESSION_P, Session),
|
ets:delete_object(?SESSION_P_TAB, Session),
|
||||||
ets:delete_object(?SESSION, Session),
|
ets:delete_object(?SESSION_TAB, Session),
|
||||||
notify({unregistered, ClientId, SPid}).
|
notify({unregistered, ClientId, SPid}).
|
||||||
|
|
||||||
%% @doc Get session stats
|
%% @doc Get session stats
|
||||||
-spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
-spec(get_session_stats({client_id(), pid()}) -> list(emqx_stats:stats())).
|
||||||
get_session_stats(Session = {ClientId, SPid})
|
get_session_stats(Session = {ClientId, SPid})
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
when is_binary(ClientId), is_pid(SPid) ->
|
||||||
safe_lookup_element(?SESSION_STATS, Session, []).
|
safe_lookup_element(?SESSION_STATS_TAB, Session, []).
|
||||||
|
|
||||||
%% @doc Set session stats
|
%% @doc Set session stats
|
||||||
-spec(set_session_stats(client_id() | {client_id(), pid()},
|
-spec(set_session_stats(client_id() | {client_id(), pid()},
|
||||||
|
@ -169,14 +173,14 @@ set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
|
||||||
|
|
||||||
set_session_stats(Session = {ClientId, SPid}, Stats)
|
set_session_stats(Session = {ClientId, SPid}, Stats)
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
when is_binary(ClientId), is_pid(SPid) ->
|
||||||
ets:insert(?SESSION_STATS, {Session, Stats}).
|
ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
|
||||||
|
|
||||||
%% @doc Lookup a session from registry
|
%% @doc Lookup a session from registry
|
||||||
-spec(lookup_session(client_id()) -> list({client_id(), pid()})).
|
-spec(lookup_session(client_id()) -> list({client_id(), pid()})).
|
||||||
lookup_session(ClientId) ->
|
lookup_session(ClientId) ->
|
||||||
case emqx_sm_registry:is_enabled() of
|
case emqx_sm_registry:is_enabled() of
|
||||||
true -> emqx_sm_registry:lookup_session(ClientId);
|
true -> emqx_sm_registry:lookup_session(ClientId);
|
||||||
false -> ets:lookup(?SESSION, ClientId)
|
false -> ets:lookup(?SESSION_TAB, ClientId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Dispatch a message to the session.
|
%% @doc Dispatch a message to the session.
|
||||||
|
@ -192,7 +196,7 @@ dispatch(ClientId, Topic, Msg) ->
|
||||||
%% @doc Lookup session pid.
|
%% @doc Lookup session pid.
|
||||||
-spec(lookup_session_pid(client_id()) -> pid() | undefined).
|
-spec(lookup_session_pid(client_id()) -> pid() | undefined).
|
||||||
lookup_session_pid(ClientId) ->
|
lookup_session_pid(ClientId) ->
|
||||||
safe_lookup_element(?SESSION, ClientId, undefined).
|
safe_lookup_element(?SESSION_TAB, ClientId, undefined).
|
||||||
|
|
||||||
safe_lookup_element(Tab, Key, Default) ->
|
safe_lookup_element(Tab, Key, Default) ->
|
||||||
try ets:lookup_element(Tab, Key, 2)
|
try ets:lookup_element(Tab, Key, 2)
|
||||||
|
@ -209,12 +213,12 @@ notify(Event) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
TabOpts = [public, set, {write_concurrency, true}],
|
TabOpts = [public, set, {write_concurrency, true}],
|
||||||
_ = emqx_tables:new(?SESSION, [{read_concurrency, true} | TabOpts]),
|
_ = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]),
|
||||||
_ = emqx_tables:new(?SESSION_P, TabOpts),
|
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_ATTRS, TabOpts),
|
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_STATS, TabOpts),
|
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
||||||
emqx_stats:update_interval(sm_stats, stats_fun()),
|
emqx_stats:update_interval(sm_stats, stats_fun()),
|
||||||
{ok, #state{session_pmon = emqx_pmon:new()}}.
|
{ok, #{session_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
|
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
|
||||||
|
@ -230,12 +234,12 @@ handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{session_pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) ->
|
||||||
case emqx_pmon:find(DownPid, PMon) of
|
case emqx_pmon:find(DownPid, PMon) of
|
||||||
undefined -> {noreply, State};
|
undefined -> {noreply, State};
|
||||||
ClientId ->
|
ClientId ->
|
||||||
unregister_session({ClientId, DownPid}),
|
unregister_session({ClientId, DownPid}),
|
||||||
{noreply, State#state{session_pmon = emqx_pmon:erase(DownPid, PMon)}}
|
{noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -254,8 +258,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
fun() ->
|
fun() ->
|
||||||
safe_update_stats(?SESSION, 'sessions/count', 'sessions/max'),
|
safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
|
||||||
safe_update_stats(?SESSION_P, 'sessions/persistent/count', 'sessions/persistent/max')
|
safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max')
|
||||||
end.
|
end.
|
||||||
|
|
||||||
safe_update_stats(Tab, Stat, MaxStat) ->
|
safe_update_stats(Tab, Stat, MaxStat) ->
|
||||||
|
|
|
@ -20,11 +20,10 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
%% Get all stats
|
|
||||||
-export([all/0]).
|
|
||||||
|
|
||||||
%% Stats API.
|
%% Stats API.
|
||||||
-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]).
|
-export([getstats/0, getstat/1]).
|
||||||
|
-export([setstat/2, setstat/3]).
|
||||||
|
-export([statsfun/1, statsfun/2]).
|
||||||
-export([update_interval/2, update_interval/3, cancel_update/1]).
|
-export([update_interval/2, update_interval/3, cancel_update/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -35,13 +34,12 @@
|
||||||
-record(state, {timer, updates :: #update{}}).
|
-record(state, {timer, updates :: #update{}}).
|
||||||
|
|
||||||
-type(stats() :: list({atom(), non_neg_integer()})).
|
-type(stats() :: list({atom(), non_neg_integer()})).
|
||||||
|
|
||||||
-export_type([stats/0]).
|
-export_type([stats/0]).
|
||||||
|
|
||||||
%% Client stats
|
%% Connection stats
|
||||||
-define(CLIENT_STATS, [
|
-define(CONNECTION_STATS, [
|
||||||
'clients/count', % clients connected current
|
'connections/count', % current connections
|
||||||
'clients/max' % maximum clients connected
|
'connections/max' % maximum connections connected
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Session stats
|
%% Session stats
|
||||||
|
@ -77,14 +75,10 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% @doc Start stats server
|
%% @doc Start stats server
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% Get all stats.
|
|
||||||
-spec(all() -> stats()).
|
|
||||||
all() -> getstats().
|
|
||||||
|
|
||||||
%% @doc Generate stats fun
|
%% @doc Generate stats fun
|
||||||
-spec(statsfun(Stat :: atom()) -> fun()).
|
-spec(statsfun(Stat :: atom()) -> fun()).
|
||||||
statsfun(Stat) ->
|
statsfun(Stat) ->
|
||||||
|
@ -139,13 +133,13 @@ rec(Name, Secs, UpFun) ->
|
||||||
cast(Msg) ->
|
cast(Msg) ->
|
||||||
gen_server:cast(?SERVER, Msg).
|
gen_server:cast(?SERVER, Msg).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
||||||
Stats = lists:append([?CLIENT_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
||||||
?ROUTE_STATS, ?RETAINED_STATS]),
|
?ROUTE_STATS, ?RETAINED_STATS]),
|
||||||
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
||||||
{ok, start_timer(#state{updates = []}), hibernate}.
|
{ok, start_timer(#state{updates = []}), hibernate}.
|
||||||
|
@ -185,19 +179,19 @@ handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Stats] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Stats] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, tick}, State = #state{timer= TRef, updates = Updates}) ->
|
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
|
||||||
lists:foldl(
|
Updates1 = lists:foldl(
|
||||||
fun(Update = #update{name = Name, countdown = C, interval = I,
|
fun(Update = #update{name = Name, countdown = C, interval = I,
|
||||||
func = UpFun}, Acc) when C =< 0 ->
|
func = UpFun}, Acc) when C =< 0 ->
|
||||||
try UpFun()
|
try UpFun()
|
||||||
catch _:Error ->
|
catch _:Error ->
|
||||||
emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error])
|
emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error])
|
||||||
end,
|
end,
|
||||||
[Update#update{countdown = I} | Acc];
|
[Update#update{countdown = I} | Acc];
|
||||||
(Update = #update{countdown = C}, Acc) ->
|
(Update = #update{countdown = C}, Acc) ->
|
||||||
[Update#update{countdown = C - 1} | Acc]
|
[Update#update{countdown = C - 1} | Acc]
|
||||||
end, [], Updates),
|
end, [], Updates),
|
||||||
{noreply, start_timer(State), hibernate};
|
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Stats] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Stats] unexpected info: ~p", [Info]),
|
||||||
|
@ -209,9 +203,9 @@ terminate(_Reason, #state{timer = TRef}) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
safe_update_element(Key, Val) ->
|
safe_update_element(Key, Val) ->
|
||||||
try ets:update_element(?TAB, Key, {2, Val})
|
try ets:update_element(?TAB, Key, {2, Val})
|
||||||
|
|
|
@ -114,7 +114,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi
|
||||||
publish(version, Version),
|
publish(version, Version),
|
||||||
publish(sysdescr, Descr),
|
publish(sysdescr, Descr),
|
||||||
publish(brokers, ekka_mnesia:running_nodes()),
|
publish(brokers, ekka_mnesia:running_nodes()),
|
||||||
publish(stats, emqx_stats:all()),
|
publish(stats, emqx_stats:getstats()),
|
||||||
publish(metrics, emqx_metrics:all()),
|
publish(metrics, emqx_metrics:all()),
|
||||||
{noreply, tick(State), hibernate};
|
{noreply, tick(State), hibernate};
|
||||||
|
|
||||||
|
|
|
@ -16,11 +16,14 @@
|
||||||
|
|
||||||
%%-include("emqx.hrl").
|
%%-include("emqx.hrl").
|
||||||
|
|
||||||
|
-export_type([startlink_ret/0]).
|
||||||
-export_type([zone/0, client_id/0, username/0, password/0, peername/0,
|
-export_type([zone/0, client_id/0, username/0, password/0, peername/0,
|
||||||
protocol/0, credentials/0]).
|
protocol/0, credentials/0]).
|
||||||
-export_type([payload/0]).
|
-export_type([payload/0]).
|
||||||
%%-export_type([payload/0, message/0, delivery/0]).
|
%%-export_type([payload/0, message/0, delivery/0]).
|
||||||
|
|
||||||
|
-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}).
|
||||||
|
|
||||||
-type(zone() :: atom()).
|
-type(zone() :: atom()).
|
||||||
-type(client_id() :: binary() | atom()).
|
-type(client_id() :: binary() | atom()).
|
||||||
-type(username() :: binary() | undefined).
|
-type(username() :: binary() | undefined).
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||||
|
|
||||||
-define(WSLOG(Level, Format, Args, State),
|
-define(WSLOG(Level, Format, Args, State),
|
||||||
lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
|
emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -84,7 +84,6 @@ call(WSPid, Req) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init(Req, Opts) ->
|
init(Req, Opts) ->
|
||||||
io:format("Opts: ~p~n", [Opts]),
|
|
||||||
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{cowboy_websocket, Req, #state{}};
|
{cowboy_websocket, Req, #state{}};
|
||||||
|
@ -200,7 +199,7 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
|
||||||
websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
|
websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
|
||||||
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
|
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
|
||||||
emqx_protocol:stats(ProtoState)]),
|
emqx_protocol:stats(ProtoState)]),
|
||||||
emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats),
|
emqx_cm:set_conn_stats(emqx_protocol:clientid(ProtoState), Stats),
|
||||||
{ok, State#state{stats_timer = undefined}, hibernate};
|
{ok, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
websocket_info({keepalive, start, Interval}, State) ->
|
websocket_info({keepalive, start, Interval}, State) ->
|
||||||
|
|
Loading…
Reference in New Issue