refactor(emqx_gateway): Decorate RPCs

This commit is contained in:
k32 2022-01-20 10:08:02 +01:00
parent 9fad296385
commit ab8ab21383
2 changed files with 146 additions and 37 deletions

View File

@ -63,6 +63,19 @@
, code_change/3 , code_change/3
]). ]).
%% RPC targets
-export([ do_get_chan_info/3
, do_set_chan_info/4
, do_get_chan_stats/3
, do_set_chan_stats/4
, do_discard_session/3
, do_kick_session/3
, do_get_chann_conn_mod/3
]).
-export_type([ gateway_name/0
]).
-record(state, { -record(state, {
gwname :: gateway_name(), %% Gateway Name gwname :: gateway_name(), %% Gateway Name
locker :: pid(), %% ClientId Locker for CM locker :: pid(), %% ClientId Locker for CM
@ -146,16 +159,19 @@ get_chan_info(GwName, ClientId) ->
get_chan_info(GwName, ClientId, ChanPid) get_chan_info(GwName, ClientId, ChanPid)
end). end).
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined. -> emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> do_get_chan_info(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
try ets:lookup_element(tabname(info, GwName), Chan, 2) try ets:lookup_element(tabname(info, GwName), Chan, 2)
catch catch
error:badarg -> undefined error:badarg -> undefined
end; end.
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId, ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]). wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)).
%% @doc Update infos of the channel. %% @doc Update infos of the channel.
-spec set_chan_info(gateway_name(), -spec set_chan_info(gateway_name(),
@ -164,18 +180,23 @@ get_chan_info(GwName, ClientId, ChanPid) ->
set_chan_info(GwName, ClientId, Infos) -> set_chan_info(GwName, ClientId, Infos) ->
set_chan_info(GwName, ClientId, self(), Infos). set_chan_info(GwName, ClientId, self(), Infos).
-spec set_chan_info(gateway_name(), -spec do_set_chan_info(gateway_name(),
emqx_types:clientid(), emqx_types:clientid(),
pid(), pid(),
emqx_types:infos()) -> boolean(). emqx_types:infos()) -> boolean().
set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> do_set_chan_info(GwName, ClientId, ChanPid, Infos) ->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
try ets:update_element(tabname(info, GwName), Chan, {2, Infos}) try ets:update_element(tabname(info, GwName), Chan, {2, Infos})
catch catch
error:badarg -> false error:badarg -> false
end; end.
-spec set_chan_info(gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:infos()) -> boolean().
set_chan_info(GwName, ClientId, ChanPid, Infos) -> set_chan_info(GwName, ClientId, ChanPid, Infos) ->
rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]). wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)).
%% @doc Get channel's stats. %% @doc Get channel's stats.
-spec get_chan_stats(gateway_name(), emqx_types:clientid()) -spec get_chan_stats(gateway_name(), emqx_types:clientid())
@ -186,16 +207,19 @@ get_chan_stats(GwName, ClientId) ->
get_chan_stats(GwName, ClientId, ChanPid) get_chan_stats(GwName, ClientId, ChanPid)
end). end).
-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) -spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:stats() | undefined. -> emqx_types:stats() | undefined.
get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> do_get_chan_stats(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
try ets:lookup_element(tabname(info, GwName), Chan, 3) try ets:lookup_element(tabname(info, GwName), Chan, 3)
catch catch
error:badarg -> undefined error:badarg -> undefined
end; end.
-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:stats() | undefined.
get_chan_stats(GwName, ClientId, ChanPid) -> get_chan_stats(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]). wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)).
-spec set_chan_stats(gateway_name(), -spec set_chan_stats(gateway_name(),
emqx_types:clientid(), emqx_types:clientid(),
@ -203,19 +227,23 @@ get_chan_stats(GwName, ClientId, ChanPid) ->
set_chan_stats(GwName, ClientId, Stats) -> set_chan_stats(GwName, ClientId, Stats) ->
set_chan_stats(GwName, ClientId, self(), Stats). set_chan_stats(GwName, ClientId, self(), Stats).
-spec set_chan_stats(gateway_name(), -spec do_set_chan_stats(gateway_name(),
emqx_types:clientid(), emqx_types:clientid(),
pid(), pid(),
emqx_types:stats()) -> boolean(). emqx_types:stats()) -> boolean().
set_chan_stats(GwName, ClientId, ChanPid, Stats) do_set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
when node(ChanPid) == node() ->
Chan = {ClientId, self()}, Chan = {ClientId, self()},
try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
catch catch
error:badarg -> false error:badarg -> false
end; end.
-spec set_chan_stats(gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:stats()) -> boolean().
set_chan_stats(GwName, ClientId, ChanPid, Stats) -> set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]). wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)).
-spec connection_closed(gateway_name(), emqx_types:clientid()) -> true. -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true.
connection_closed(GwName, ClientId) -> connection_closed(GwName, ClientId) ->
@ -297,11 +325,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
discard_session(GwName, ClientId) when is_binary(ClientId) -> discard_session(GwName, ClientId) when is_binary(ClientId) ->
case lookup_channels(GwName, ClientId) of case lookup_channels(GwName, ClientId) of
[] -> ok; [] -> ok;
ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids) ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids)
end. end.
%% @private %% @private
do_discard_session(GwName, ClientId, Pid) -> safe_discard_session(GwName, ClientId, Pid) ->
try try
discard_session(GwName, ClientId, Pid) discard_session(GwName, ClientId, Pid)
catch catch
@ -315,17 +343,20 @@ do_discard_session(GwName, ClientId, Pid) ->
ok ok
end. end.
%% @private -spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> _.
do_discard_session(GwName, ClientId, ChanPid) ->
case get_chann_conn_mod(GwName, ClientId, ChanPid) of case get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> ok; undefined -> ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard, ?T_TAKEOVER) ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end; end.
%% @private %% @private
-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
_.
discard_session(GwName, ClientId, ChanPid) -> discard_session(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]). wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)).
-spec kick_session(gateway_name(), emqx_types:clientid()) -spec kick_session(gateway_name(), emqx_types:clientid())
-> {error, any()} -> {error, any()}
@ -346,16 +377,20 @@ kick_session(GwName, ClientId) ->
kick_session(GwName, ClientId, ChanPid) kick_session(GwName, ClientId, ChanPid)
end. end.
kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> -spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
_.
do_kick_session(GwName, ClientId, ChanPid) ->
case get_chan_info(GwName, ClientId, ChanPid) of case get_chan_info(GwName, ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} -> #{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick, ?T_TAKEOVER); ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined -> undefined ->
{error, not_found} {error, not_found}
end; end.
-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
_.
kick_session(GwName, ClientId, ChanPid) -> kick_session(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]). wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)).
with_channel(GwName, ClientId, Fun) -> with_channel(GwName, ClientId, Fun) ->
case lookup_channels(GwName, ClientId) of case lookup_channels(GwName, ClientId) of
@ -369,14 +404,17 @@ with_channel(GwName, ClientId, Fun) ->
lookup_channels(GwName, ClientId) -> lookup_channels(GwName, ClientId) ->
emqx_gateway_cm_registry:lookup_channels(GwName, ClientId). emqx_gateway_cm_registry:lookup_channels(GwName, ClientId).
get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> -spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
Chan = {ClientId, ChanPid}, Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod
catch catch
error:badarg -> undefined error:badarg -> undefined
end; end.
-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
get_chann_conn_mod(GwName, ClientId, ChanPid) -> get_chann_conn_mod(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]). wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
%% Locker %% Locker
@ -398,8 +436,8 @@ locker_unlock(Locker, ClientId) ->
ekka_locker:release(Locker, ClientId, quorum). ekka_locker:release(Locker, ClientId, quorum).
%% @private %% @private
rpc_call(Node, Fun, Args) -> wrap_rpc(Ret) ->
case rpc:call(Node, ?MODULE, Fun, Args) of case Ret of
{badrpc, Reason} -> error(Reason); {badrpc, Reason} -> error(Reason);
Res -> Res Res -> Res
end. end.

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_cm_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, get_chan_info/3
, set_chan_info/4
, get_chan_stats/3
, set_chan_stats/4
, discard_session/3
, kick_session/3
, get_chann_conn_mod/3
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined | {badrpc, _}.
get_chan_info(GwName, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]).
-spec set_chan_info(emqx_gateway_cm:gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:infos()) -> boolean() | {badrpc, _}.
set_chan_info(GwName, ClientId, ChanPid, Infos) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]).
-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:stats() | undefined | {badrpc, _}.
get_chan_stats(GwName, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]).
-spec set_chan_stats(emqx_gateway_cm:gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:stats()) -> boolean() | {badrpc, _}.
set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
discard_session(GwName, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]).
-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
kick_session(GwName, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]).
-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}.
get_chann_conn_mod(GwName, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]).