210 lines
7.1 KiB
Erlang
210 lines
7.1 KiB
Erlang
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
|
|
-module(emqx_cm).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("emqx.hrl").
|
|
-include("logger.hrl").
|
|
-include("types.hrl").
|
|
|
|
-export([start_link/0]).
|
|
|
|
-export([ register_connection/1
|
|
, register_connection/2
|
|
, unregister_connection/1
|
|
, unregister_connection/2
|
|
]).
|
|
|
|
-export([ get_conn_attrs/1
|
|
, get_conn_attrs/2
|
|
, set_conn_attrs/2
|
|
, set_conn_attrs/3
|
|
]).
|
|
|
|
-export([ get_conn_stats/1
|
|
, get_conn_stats/2
|
|
, set_conn_stats/2
|
|
, set_conn_stats/3
|
|
]).
|
|
|
|
-export([lookup_conn_pid/1]).
|
|
|
|
%% gen_server callbacks
|
|
-export([ init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
, code_change/3
|
|
]).
|
|
|
|
%% internal export
|
|
-export([stats_fun/0]).
|
|
|
|
-define(CM, ?MODULE).
|
|
|
|
%% ETS tables for connection management.
|
|
-define(CONN_TAB, emqx_conn).
|
|
-define(CONN_ATTRS_TAB, emqx_conn_attrs).
|
|
-define(CONN_STATS_TAB, emqx_conn_stats).
|
|
|
|
-define(BATCH_SIZE, 100000).
|
|
|
|
%% @doc Start the connection manager.
|
|
-spec(start_link() -> startlink_ret()).
|
|
start_link() ->
|
|
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Register a connection.
|
|
-spec(register_connection(emqx_types:client_id()) -> ok).
|
|
register_connection(ClientId) when is_binary(ClientId) ->
|
|
register_connection(ClientId, self()).
|
|
|
|
-spec(register_connection(emqx_types:client_id(), pid()) -> ok).
|
|
register_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
true = ets:insert(?CONN_TAB, {ClientId, ConnPid}),
|
|
notify({registered, ClientId, ConnPid}).
|
|
|
|
%% @doc Unregister a connection.
|
|
-spec(unregister_connection(emqx_types:client_id()) -> ok).
|
|
unregister_connection(ClientId) when is_binary(ClientId) ->
|
|
unregister_connection(ClientId, self()).
|
|
|
|
-spec(unregister_connection(emqx_types:client_id(), pid()) -> ok).
|
|
unregister_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
true = do_unregister_connection({ClientId, ConnPid}),
|
|
notify({unregistered, ConnPid}).
|
|
|
|
do_unregister_connection(Conn) ->
|
|
true = ets:delete(?CONN_STATS_TAB, Conn),
|
|
true = ets:delete(?CONN_ATTRS_TAB, Conn),
|
|
true = ets:delete_object(?CONN_TAB, Conn).
|
|
|
|
%% @doc Get conn attrs
|
|
-spec(get_conn_attrs(emqx_types:client_id()) -> list()).
|
|
get_conn_attrs(ClientId) when is_binary(ClientId) ->
|
|
ConnPid = lookup_conn_pid(ClientId),
|
|
get_conn_attrs(ClientId, ConnPid).
|
|
|
|
-spec(get_conn_attrs(emqx_types:client_id(), pid()) -> list()).
|
|
get_conn_attrs(ClientId, ConnPid) when is_binary(ClientId) ->
|
|
emqx_tables:lookup_value(?CONN_ATTRS_TAB, {ClientId, ConnPid}, []).
|
|
|
|
%% @doc Set conn attrs
|
|
-spec(set_conn_attrs(emqx_types:client_id(), list()) -> true).
|
|
set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
|
|
set_conn_attrs(ClientId, self(), Attrs).
|
|
|
|
-spec(set_conn_attrs(emqx_types:client_id(), pid(), list()) -> true).
|
|
set_conn_attrs(ClientId, ConnPid, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
Conn = {ClientId, ConnPid},
|
|
ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
|
|
|
|
%% @doc Get conn stats
|
|
-spec(get_conn_stats(emqx_types:client_id()) -> list(emqx_stats:stats())).
|
|
get_conn_stats(ClientId) when is_binary(ClientId) ->
|
|
ConnPid = lookup_conn_pid(ClientId),
|
|
get_conn_stats(ClientId, ConnPid).
|
|
|
|
-spec(get_conn_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())).
|
|
get_conn_stats(ClientId, ConnPid) when is_binary(ClientId) ->
|
|
Conn = {ClientId, ConnPid},
|
|
emqx_tables:lookup_value(?CONN_STATS_TAB, Conn, []).
|
|
|
|
%% @doc Set conn stats.
|
|
-spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> true).
|
|
set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
|
|
set_conn_stats(ClientId, self(), Stats).
|
|
|
|
-spec(set_conn_stats(emqx_types:client_id(), pid(), list(emqx_stats:stats())) -> true).
|
|
set_conn_stats(ClientId, ConnPid, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
|
|
Conn = {ClientId, ConnPid},
|
|
ets:insert(?CONN_STATS_TAB, {Conn, Stats}).
|
|
|
|
%% @doc Lookup connection pid.
|
|
-spec(lookup_conn_pid(emqx_types:client_id()) -> maybe(pid())).
|
|
lookup_conn_pid(ClientId) when is_binary(ClientId) ->
|
|
emqx_tables:lookup_value(?CONN_TAB, ClientId).
|
|
|
|
notify(Msg) ->
|
|
gen_server:cast(?CM, {notify, Msg}).
|
|
|
|
%%-----------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%-----------------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
TabOpts = [public, set, {write_concurrency, true}],
|
|
ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
|
ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
|
ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
|
ok = emqx_stats:update_interval(conn_stats, fun ?MODULE:stats_fun/0),
|
|
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
|
|
|
handle_call(Req, _From, State) ->
|
|
?LOG(error, "[CM] Unexpected call: ~p", [Req]),
|
|
{reply, ignored, State}.
|
|
|
|
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
|
{noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
|
|
|
|
handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
|
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
|
|
|
handle_cast(Msg, State) ->
|
|
?LOG(error, "[CM] Unexpected cast: ~p", [Msg]),
|
|
{noreply, State}.
|
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
|
|
ConnPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
|
{Items, PMon1} = emqx_pmon:erase_all(ConnPids, PMon),
|
|
ok = emqx_pool:async_submit(
|
|
fun lists:foreach/2, [fun clean_down/1, Items]),
|
|
{noreply, State#{conn_pmon := PMon1}};
|
|
|
|
handle_info(Info, State) ->
|
|
?LOG(error, "[CM] Unexpected info: ~p", [Info]),
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
emqx_stats:cancel_update(conn_stats).
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
clean_down({Pid, ClientId}) ->
|
|
Conn = {ClientId, Pid},
|
|
case ets:member(?CONN_TAB, ClientId)
|
|
orelse ets:member(?CONN_ATTRS_TAB, Conn) of
|
|
true ->
|
|
do_unregister_connection(Conn);
|
|
false -> false
|
|
end.
|
|
|
|
stats_fun() ->
|
|
case ets:info(?CONN_TAB, size) of
|
|
undefined -> ok;
|
|
Size -> emqx_stats:setstat('connections.count', 'connections.max', Size)
|
|
end.
|