diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 647d3a6a7..92ca70e10 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -63,6 +63,19 @@ , code_change/3 ]). +%% RPC targets +-export([ do_get_chan_info/3 + , do_set_chan_info/4 + , do_get_chan_stats/3 + , do_set_chan_stats/4 + , do_discard_session/3 + , do_kick_session/3 + , do_get_chann_conn_mod/3 + ]). + +-export_type([ gateway_name/0 + ]). + -record(state, { gwname :: gateway_name(), %% Gateway Name locker :: pid(), %% ClientId Locker for CM @@ -146,16 +159,19 @@ get_chan_info(GwName, ClientId) -> get_chan_info(GwName, ClientId, ChanPid) end). --spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) +-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined. -get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(tabname(info, GwName), Chan, 2) catch error:badarg -> undefined - end; + end. + +-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). %% @doc Update infos of the channel. -spec set_chan_info(gateway_name(), @@ -164,18 +180,23 @@ get_chan_info(GwName, ClientId, ChanPid) -> set_chan_info(GwName, ClientId, Infos) -> set_chan_info(GwName, ClientId, self(), Infos). --spec set_chan_info(gateway_name(), - emqx_types:clientid(), - pid(), - emqx_types:infos()) -> boolean(). -set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> +-spec do_set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). +do_set_chan_info(GwName, ClientId, ChanPid, Infos) -> Chan = {ClientId, ChanPid}, try ets:update_element(tabname(info, GwName), Chan, {2, Infos}) catch error:badarg -> false - end; + end. + +-spec set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). set_chan_info(GwName, ClientId, ChanPid, Infos) -> - rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)). %% @doc Get channel's stats. -spec get_chan_stats(gateway_name(), emqx_types:clientid()) @@ -186,16 +207,19 @@ get_chan_stats(GwName, ClientId) -> get_chan_stats(GwName, ClientId, ChanPid) end). --spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) +-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:stats() | undefined. -get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_stats(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(tabname(info, GwName), Chan, 3) catch error:badarg -> undefined - end; + end. + +-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined. get_chan_stats(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)). -spec set_chan_stats(gateway_name(), emqx_types:clientid(), @@ -203,19 +227,23 @@ get_chan_stats(GwName, ClientId, ChanPid) -> set_chan_stats(GwName, ClientId, Stats) -> set_chan_stats(GwName, ClientId, self(), Stats). --spec set_chan_stats(gateway_name(), - emqx_types:clientid(), - pid(), - emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) - when node(ChanPid) == node() -> +-spec do_set_chan_stats(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean(). +do_set_chan_stats(GwName, ClientId, ChanPid, Stats) -> Chan = {ClientId, self()}, try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) catch error:badarg -> false - end; + end. + +-spec set_chan_stats(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean(). set_chan_stats(GwName, ClientId, ChanPid, Stats) -> - rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)). -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true. connection_closed(GwName, ClientId) -> @@ -297,11 +325,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> discard_session(GwName, ClientId) when is_binary(ClientId) -> case lookup_channels(GwName, ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids) end. %% @private -do_discard_session(GwName, ClientId, Pid) -> +safe_discard_session(GwName, ClientId, Pid) -> try discard_session(GwName, ClientId, Pid) catch @@ -315,17 +343,20 @@ do_discard_session(GwName, ClientId, Pid) -> ok end. -%% @private -discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_discard_session(GwName, ClientId, ChanPid) -> case get_chann_conn_mod(GwName, ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; + end. %% @private +-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. discard_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)). -spec kick_session(gateway_name(), emqx_types:clientid()) -> {error, any()} @@ -346,16 +377,20 @@ kick_session(GwName, ClientId) -> kick_session(GwName, ClientId, ChanPid) end. -kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_kick_session(GwName, ClientId, ChanPid) -> case get_chan_info(GwName, ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} - end; + end. +-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. kick_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)). with_channel(GwName, ClientId, Fun) -> case lookup_channels(GwName, ClientId) of @@ -369,14 +404,17 @@ with_channel(GwName, ClientId, Fun) -> lookup_channels(GwName, ClientId) -> emqx_gateway_cm_registry:lookup_channels(GwName, ClientId). -get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). +do_get_chann_conn_mod(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod catch error:badarg -> undefined - end; + end. + +-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). get_chann_conn_mod(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). %% Locker @@ -398,8 +436,8 @@ locker_unlock(Locker, ClientId) -> ekka_locker:release(Locker, ClientId, quorum). %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of +wrap_rpc(Ret) -> + case Ret of {badrpc, Reason} -> error(Reason); Res -> Res end. diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl new file mode 100644 index 000000000..49db5d311 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , get_chan_info/3 + , set_chan_info/4 + , get_chan_stats/3 + , set_chan_stats/4 + , discard_session/3 + , kick_session/3 + , get_chann_conn_mod/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined | {badrpc, _}. +get_chan_info(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]). + +-spec set_chan_info(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean() | {badrpc, _}. +set_chan_info(GwName, ClientId, ChanPid, Infos) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]). + +-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined | {badrpc, _}. +get_chan_stats(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]). + +-spec set_chan_stats(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean() | {badrpc, _}. +set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + +-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +discard_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]). + +-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +kick_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]). + +-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}. +get_chann_conn_mod(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]).