emqx/apps/emqx_gateway/src/emqx_gateway_cm.erl

844 lines
26 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The Gateway Channel Manager
%%
%% For a certain type of protocol, this is a single instance of the manager.
%% It means that no matter how many instances of the stomp gateway are created,
%% they all share a single this Connection-Manager
-module(emqx_gateway_cm).
-behaviour(gen_server).
-include("include/emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
-export([start_link/1]).
-export([
open_session/5,
open_session/6,
discard_session/2,
kick_session/2,
kick_session/3,
takeover_session/2,
register_channel/4,
unregister_channel/2,
insert_channel_info/4,
lookup_by_clientid/2,
set_chan_info/3,
set_chan_info/4,
get_chan_info/2,
get_chan_info/3,
set_chan_stats/3,
set_chan_stats/4,
get_chan_stats/2,
get_chan_stats/3,
connection_closed/2
]).
-export([
call/3,
call/4,
cast/3
]).
-export([
with_channel/3,
lookup_channels/2
]).
%% Internal funcs for getting tabname by GatewayId
-export([cmtabs/1, tabname/2]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% RPC targets
-export([
do_lookup_by_clientid/2,
do_get_chan_info/3,
do_set_chan_info/4,
do_get_chan_stats/3,
do_set_chan_stats/4,
do_kick_session/4,
do_takeover_session/3,
do_get_chann_conn_mod/3,
do_call/4,
do_call/5,
do_cast/4
]).
-export_type([gateway_name/0]).
-record(state, {
%% Gateway Name
gwname :: gateway_name(),
%% ClientId Locker for CM
locker :: pid(),
%% ClientId Registry server
registry :: pid(),
chan_pmon :: emqx_pmon:pmon()
}).
-type option() :: {gwname, gateway_name()}.
-type options() :: list(option()).
-define(T_KICK, 5000).
-define(T_GET_INFO, 5000).
-define(T_TAKEOVER, 15000).
-define(DEFAULT_BATCH_SIZE, 10000).
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec start_link(options()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Options) ->
GwName = proplists:get_value(gwname, Options),
gen_server:start_link({local, procname(GwName)}, ?MODULE, Options, []).
procname(GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_cm'])).
-spec cmtabs(GwName :: gateway_name()) ->
{ChanTab :: atom(), ConnTab :: atom(), ChannInfoTab :: atom()}.
cmtabs(GwName) ->
%% Record: {ClientId, Pid}
{
tabname(chan, GwName),
%% Record: {{ClientId, Pid}, ConnMod}
tabname(conn, GwName),
%% Record: {{ClientId, Pid}, Info, Stats}
tabname(info, GwName)
}.
tabname(chan, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel']));
tabname(conn, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_conn']));
tabname(info, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_info'])).
lockername(GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_locker'])).
-spec register_channel(
gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:conninfo()
) -> ok.
register_channel(GwName, ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(tabname(chan, GwName), Chan),
true = ets:insert(tabname(conn, GwName), {Chan, ConnMod}),
ok = emqx_gateway_cm_registry:register_channel(GwName, Chan),
cast(procname(GwName), {registered, Chan}).
%% @doc Unregister a channel.
-spec unregister_channel(gateway_name(), emqx_types:clientid()) -> ok.
unregister_channel(GwName, ClientId) when is_binary(ClientId) ->
true = do_unregister_channel(GwName, {ClientId, self()}, cmtabs(GwName)),
ok.
%% @doc Insert/Update the channel info and stats
-spec insert_channel_info(
gateway_name(),
emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()
) -> ok.
insert_channel_info(GwName, ClientId, Info, Stats) ->
Chan = {ClientId, self()},
true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}),
ok.
%% @doc Get info of a channel.
-spec get_chan_info(gateway_name(), emqx_types:clientid()) ->
emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId) ->
with_channel(
GwName,
ClientId,
fun(ChanPid) ->
get_chan_info(GwName, ClientId, ChanPid)
end
).
-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
do_lookup_by_clientid(GwName, ClientId) ->
ChanTab = emqx_gateway_cm:tabname(chan, GwName),
[Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) ->
emqx_types:infos() | undefined.
do_get_chan_info(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try
Info = ets:lookup_element(tabname(info, GwName), Chan, 2),
Info#{node => node()}
catch
error:badarg -> undefined
end.
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) ->
emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId, ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)
).
-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
lookup_by_clientid(GwName, ClientId) ->
Nodes = mria:running_nodes(),
case
emqx_gateway_cm_proto_v1:lookup_by_clientid(
Nodes, GwName, ClientId
)
of
{Pids, []} ->
lists:append(Pids);
{_, _BadNodes} ->
error(badrpc)
end.
%% @doc Update infos of the channel.
-spec set_chan_info(
gateway_name(),
emqx_types:clientid(),
emqx_types:infos()
) -> boolean().
set_chan_info(GwName, ClientId, Infos) ->
set_chan_info(GwName, ClientId, self(), Infos).
-spec do_set_chan_info(
gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:infos()
) -> boolean().
do_set_chan_info(GwName, ClientId, ChanPid, Infos) ->
Chan = {ClientId, ChanPid},
try
ets:update_element(tabname(info, GwName), Chan, {2, Infos})
catch
error:badarg -> false
end.
-spec set_chan_info(
gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:infos()
) -> boolean().
set_chan_info(GwName, ClientId, ChanPid, Infos) ->
wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)).
%% @doc Get channel's stats.
-spec get_chan_stats(gateway_name(), emqx_types:clientid()) ->
emqx_types:stats() | undefined.
get_chan_stats(GwName, ClientId) ->
with_channel(
GwName,
ClientId,
fun(ChanPid) ->
get_chan_stats(GwName, ClientId, ChanPid)
end
).
-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) ->
emqx_types:stats() | undefined.
do_get_chan_stats(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try
ets:lookup_element(tabname(info, GwName), Chan, 3)
catch
error:badarg -> undefined
end.
-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) ->
emqx_types:stats() | undefined.
get_chan_stats(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)).
-spec set_chan_stats(
gateway_name(),
emqx_types:clientid(),
emqx_types:stats()
) -> boolean().
set_chan_stats(GwName, ClientId, Stats) ->
set_chan_stats(GwName, ClientId, self(), Stats).
-spec do_set_chan_stats(
gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:stats()
) -> boolean().
do_set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
Chan = {ClientId, ChanPid},
try
ets:update_element(tabname(info, GwName), Chan, {3, Stats})
catch
error:badarg -> false
end.
-spec set_chan_stats(
gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:stats()
) -> boolean().
set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)).
-spec connection_closed(gateway_name(), emqx_types:clientid()) -> true.
connection_closed(GwName, ClientId) ->
%% XXX: Why we need to delete conn_mod tab ???
Chan = {ClientId, self()},
ets:delete_object(tabname(conn, GwName), Chan).
-spec open_session(
GwName :: gateway_name(),
CleanStart :: boolean(),
ClientInfo :: emqx_types:clientinfo(),
ConnInfo :: emqx_types:conninfo(),
CreateSessionFun :: fun(
(
emqx_types:clientinfo(),
emqx_types:conninfo()
) -> Session
)
) ->
{ok, #{
session := Session,
present := boolean(),
pendings => list()
}}
| {error, any()}.
open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
Self = self(),
ClientId = maps:get(clientid, ClientInfo),
Fun = fun(_) ->
_ = discard_session(GwName, ClientId),
Session = create_session(
GwName,
ClientInfo,
ConnInfo,
CreateSessionFun,
SessionMod
),
register_channel(GwName, ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
locker_trans(GwName, ClientId, Fun);
open_session(
GwName,
false = _CleanStart,
ClientInfo = #{clientid := ClientId},
ConnInfo,
CreateSessionFun,
SessionMod
) ->
Self = self(),
ResumeStart =
fun(_) ->
CreateSess =
fun() ->
Session = create_session(
GwName,
ClientInfo,
ConnInfo,
CreateSessionFun,
SessionMod
),
register_channel(
GwName, ClientId, Self, ConnInfo
),
{ok, #{session => Session, present => false}}
end,
case takeover_session(GwName, ClientId) of
{ok, ConnMod, ChanPid, SessionIn} ->
Session = SessionMod:resume(ClientInfo, SessionIn),
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} ->
register_channel(
GwName, ClientId, Self, ConnInfo
),
{ok, #{
session => Session,
present => true,
pendings => Pendings
}};
{error, _} ->
CreateSess()
end;
{error, _Reason} ->
CreateSess()
end
end,
locker_trans(GwName, ClientId, ResumeStart).
%% @private
create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
try
Session = emqx_gateway_utils:apply(
CreateSessionFun,
[ClientInfo, ConnInfo]
),
ok = emqx_gateway_metrics:inc(GwName, 'session.created'),
SessionInfo =
case
is_tuple(Session) andalso
element(1, Session) == session
of
true ->
SessionMod:info(Session);
_ ->
case is_map(Session) of
false ->
throw(session_structure_should_be_map);
_ ->
Session
end
end,
ok = emqx_hooks:run('session.created', [ClientInfo, SessionInfo]),
Session
catch
Class:Reason:Stk ->
?SLOG(error, #{
msg => "failed_create_session",
clientid => maps:get(clientid, ClientInfo, undefined),
username => maps:get(username, ClientInfo, undefined),
reason => {Class, Reason},
stacktrace => Stk
}),
throw(Reason)
end.
%% @doc Try to takeover a session.
-spec takeover_session(gateway_name(), emqx_types:clientid()) ->
{error, term()}
| {ok, atom(), pid(), emqx_session:session()}.
takeover_session(GwName, ClientId) ->
case lookup_channels(GwName, ClientId) of
[] ->
{error, not_found};
[ChanPid] ->
do_takeover_session(GwName, ClientId, ChanPid);
ChanPids ->
[ChanPid | StalePids] = lists:reverse(ChanPids),
?SLOG(warning, #{
msg => "more_than_one_channel_found",
chan_pids => ChanPids
}),
lists:foreach(
fun(StalePid) ->
catch discard_session(GwName, ClientId, StalePid)
end,
StalePids
),
do_takeover_session(GwName, ClientId, ChanPid)
end.
do_takeover_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} ->
{ok, ConnMod, ChanPid, Session};
{error, Reason} ->
{error, Reason}
end
end;
do_takeover_session(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:takeover_session(GwName, ClientId, ChanPid)).
%% @doc Discard all the sessions identified by the ClientId.
-spec discard_session(GwName :: gateway_name(), binary()) -> ok | {error, not_found}.
discard_session(GwName, ClientId) when is_binary(ClientId) ->
case lookup_channels(GwName, ClientId) of
[] -> {error, not_found};
ChanPids -> lists:foreach(fun(Pid) -> discard_session(GwName, ClientId, Pid) end, ChanPids)
end.
discard_session(GwName, ClientId, ChanPid) ->
kick_session(GwName, discard, ClientId, ChanPid).
-spec kick_session(gateway_name(), emqx_types:clientid()) -> ok | {error, not_found}.
kick_session(GwName, ClientId) ->
case lookup_channels(GwName, ClientId) of
[] ->
{error, not_found};
ChanPids ->
ChanPids > 1 andalso
begin
?SLOG(
warning,
#{
msg => "more_than_one_channel_found",
chan_pids => ChanPids
},
#{clientid => ClientId}
)
end,
lists:foreach(
fun(Pid) ->
_ = kick_session(GwName, ClientId, Pid)
end,
ChanPids
)
end.
kick_session(GwName, ClientId, ChanPid) ->
kick_session(GwName, kick, ClientId, ChanPid).
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(GwName, Action, ClientId, ChanPid) ->
try
wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, Action, ClientId, ChanPid))
catch
Error:Reason ->
%% This should mostly be RPC failures.
%% However, if the node is still running the old version
%% code (prior to emqx app 4.3.10) some of the RPC handler
%% exceptions may get propagated to a new version node
?SLOG(
error,
#{
msg => "failed_to_kick_session_on_remote_node",
node => node(ChanPid),
action => Action,
error => Error,
reason => Reason
},
#{clientid => ClientId}
)
end.
-spec do_kick_session(
gateway_name(),
kick | discard,
emqx_types:clientid(),
pid()
) -> ok.
do_kick_session(GwName, Action, ClientId, ChanPid) ->
case get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined ->
ok;
ConnMod when is_atom(ConnMod) ->
ok = request_stepdown(Action, ConnMod, ChanPid)
end.
%% @private call a local stale session to execute an Action.
%% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody.
-spec request_stepdown(Action, module(), pid()) ->
ok
| {ok, emqx_session:session() | list(emqx_types:deliver())}
| {error, term()}
when
Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
request_stepdown(Action, ConnMod, Pid) ->
Timeout =
case Action == kick orelse Action == discard of
true -> ?T_KICK;
_ -> ?T_TAKEOVER
end,
Return =
%% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
try apply(ConnMod, call, [Pid, Action, Timeout]) of
ok -> ok;
Reply -> {ok, Reply}
catch
% emqx_ws_connection: call
_:noproc ->
ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
{error, noproc};
% emqx_connection: gen_server:call
_:{noproc, _} ->
ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
{error, noproc};
_:Reason = {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
{error, Reason};
_:Reason = {{shutdown, _}, _} ->
ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
{error, Reason};
_:{timeout, {gen_server, call, _}} ->
?tp(
warning,
"session_stepdown_request_timeout",
#{
stale_pid => Pid,
action => Action,
stale_channel => stale_channel_info(Pid)
}
),
ok = force_kill(Pid),
{error, timeout};
_:Error:St ->
?tp(
error,
"session_stepdown_request_exception",
#{
stale_pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}
),
ok = force_kill(Pid),
{error, Error}
end,
case Action == kick orelse Action == discard of
true -> ok;
_ -> Return
end.
force_kill(Pid) ->
exit(Pid, kill),
ok.
stale_channel_info(Pid) ->
process_info(Pid, [status, message_queue_len, current_stacktrace]).
with_channel(GwName, ClientId, Fun) ->
case lookup_channels(GwName, ClientId) of
[] -> undefined;
[Pid] -> Fun(Pid);
Pids -> Fun(lists:last(Pids))
end.
%% @doc Lookup channels.
-spec lookup_channels(gateway_name(), emqx_types:clientid()) -> list(pid()).
lookup_channels(GwName, ClientId) ->
emqx_gateway_cm_registry:lookup_channels(GwName, ClientId).
-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try
[ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2),
ConnMod
catch
error:badarg -> undefined
end.
-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
get_chann_conn_mod(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
-spec call(gateway_name(), emqx_types:clientid(), term()) ->
undefined | term().
call(GwName, ClientId, Req) ->
with_channel(
GwName,
ClientId,
fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)
)
end
).
-spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) ->
undefined | term().
call(GwName, ClientId, Req, Timeout) ->
with_channel(
GwName,
ClientId,
fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:call(
GwName, ClientId, ChanPid, Req, Timeout
)
)
end
).
do_call(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> undefined;
ConnMod -> ConnMod:call(ChanPid, Req)
end.
do_call(GwName, ClientId, ChanPid, Req, Timeout) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> undefined;
ConnMod -> ConnMod:call(ChanPid, Req, Timeout)
end.
-spec cast(gateway_name(), emqx_types:clientid(), term()) -> undefined | ok.
cast(GwName, ClientId, Req) ->
with_channel(
GwName,
ClientId,
fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)
)
end
),
ok.
do_cast(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> undefined;
ConnMod -> ConnMod:cast(ChanPid, Req)
end.
%% Locker
locker_trans(_GwName, undefined, Fun) ->
Fun([]);
locker_trans(GwName, ClientId, Fun) ->
Locker = lockername(GwName),
case locker_lock(Locker, ClientId) of
{true, Nodes} ->
try
Fun(Nodes)
after
locker_unlock(Locker, ClientId)
end;
{false, _Nodes} ->
{error, client_id_unavailable}
end.
locker_lock(Locker, ClientId) ->
ekka_locker:acquire(Locker, ClientId, quorum).
locker_unlock(Locker, ClientId) ->
ekka_locker:release(Locker, ClientId, quorum).
%% @private
wrap_rpc(Ret) ->
case Ret of
{badrpc, Reason} -> throw({badrpc, Reason});
Res -> Res
end.
cast(Name, Msg) ->
gen_server:cast(Name, Msg).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init(Options) ->
GwName = proplists:get_value(gwname, Options),
TabOpts = [public, {write_concurrency, true}],
{ChanTab, ConnTab, InfoTab} = cmtabs(GwName),
ok = emqx_utils_ets:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_utils_ets:new(ConnTab, [bag | TabOpts]),
ok = emqx_utils_ets:new(InfoTab, [ordered_set, compressed | TabOpts]),
%% Start link cm-registry process
%% XXX: Should I hang it under a higher level supervisor?
{ok, Registry} = emqx_gateway_cm_registry:start_link(GwName),
%% Start locker process
{ok, Locker} = ekka_locker:start_link(lockername(GwName)),
%% Interval update stats
%% TODO: v0.2
%ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #state{
gwname = GwName,
locker = Locker,
registry = Registry,
chan_pmon = emqx_pmon:new()
}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast({registered, {ClientId, ChanPid}}, State = #state{chan_pmon = PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
{noreply, State#state{chan_pmon = PMon1}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(
{'DOWN', _MRef, process, Pid, _Reason},
State = #state{gwname = GwName, chan_pmon = PMon}
) ->
ChanPids = [Pid | emqx_utils:drain_down(?DEFAULT_BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
CmTabs = cmtabs(GwName),
ok = emqx_pool:async_submit(fun do_unregister_channel_task/3, [Items, GwName, CmTabs]),
{noreply, State#state{chan_pmon = PMon1}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{registry = Registry, locker = Locker}) ->
_ = gen_server:stop(Registry),
_ = ekka_locker:stop(Locker),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
do_unregister_channel_task(Items, GwName, CmTabs) ->
lists:foreach(
fun({ChanPid, ClientId}) ->
try
do_unregister_channel(GwName, {ClientId, ChanPid}, CmTabs)
catch
error:badarg -> ok
end
end,
Items
).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
do_unregister_channel(GwName, Chan, {ChanTab, ConnTab, InfoTab}) ->
ok = emqx_gateway_cm_registry:unregister_channel(GwName, Chan),
true = ets:delete(ConnTab, Chan),
true = ets:delete(InfoTab, Chan),
ets:delete_object(ChanTab, Chan).