From 97078002f244f1c0b3cd268f74e391170b8d77a0 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Wed, 19 Jan 2022 10:04:34 +0100 Subject: [PATCH] refactor(emqx_cm): Decorate RPCs --- apps/emqx/src/emqx_cm.erl | 65 ++++++++++-------- apps/emqx/src/emqx_cm.hrl | 23 +++++++ apps/emqx/src/proto/emqx_broker_proto_v1.erl | 12 ---- apps/emqx/src/proto/emqx_cm_proto_v1.erl | 71 ++++++++++++++++++++ apps/emqx_management/src/emqx_mgmt.erl | 4 +- 5 files changed, 133 insertions(+), 42 deletions(-) create mode 100644 apps/emqx/src/emqx_cm.hrl create mode 100644 apps/emqx/src/proto/emqx_cm_proto_v1.erl diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 6c889e000..3c46c6b54 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -80,9 +80,15 @@ , mark_channel_connected/1 , mark_channel_disconnected/1 , get_connected_client_count/0 + + , do_kick_session/3 + , do_get_chan_stats/2 + , do_get_chan_info/2 + , do_get_chann_conn_mod/2 ]). -export_type([ channel_info/0 + , chan_pid/0 ]). -type(chan_pid() :: pid()). @@ -92,6 +98,8 @@ , _Stats :: emqx_types:stats() }). +-include("emqx_cm.hrl"). + %% Tables for channel management. -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). @@ -111,10 +119,6 @@ %% Server name -define(CM, ?MODULE). --define(T_KICK, 5_000). --define(T_GET_INFO, 5_000). --define(T_TAKEOVER, 15_000). - %% linting overrides -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}} , {elvis_style, god_modules, #{ignore => [emqx_cm]}} @@ -181,16 +185,19 @@ connection_closed(ClientId, ChanPid) -> get_chan_info(ClientId) -> with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end). --spec(get_chan_info(emqx_types:clientid(), chan_pid()) +-spec(do_get_chan_info(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:infos())). -get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_info(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2) catch error:badarg -> undefined - end; + end. + +-spec(get_chan_info(emqx_types:clientid(), chan_pid()) + -> maybe(emqx_types:infos())). get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -206,16 +213,19 @@ set_chan_info(ClientId, Info) when is_binary(ClientId) -> get_chan_stats(ClientId) -> with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end). --spec(get_chan_stats(emqx_types:clientid(), chan_pid()) +-spec(do_get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats())). -get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_stats(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3) catch error:badarg -> undefined - end; + end. + +-spec(get_chan_stats(emqx_types:clientid(), chan_pid()) + -> maybe(emqx_types:stats())). get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -368,7 +378,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> {living, ConnMod, ChanPid, Session} end; do_takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). + wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). @@ -422,24 +432,20 @@ discard_session(ClientId, ChanPid) -> kick_session(ClientId, ChanPid) -> kick_session(kick, ClientId, ChanPid). -%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). -kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok. +do_kick_session(Action, ClientId, ChanPid) -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> ok = kick_or_kill(Action, ConnMod, ChanPid) - end; + end. + +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). kick_session(Action, ClientId, ChanPid) -> - %% call remote node on the old APIs because we do not know if they have upgraded - %% to have kick_session/3 - Function = case Action of - discard -> discard_session; - kick -> kick_session - end, try - rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid)) catch Error : Reason -> %% This should mostly be RPC failures. @@ -525,8 +531,8 @@ lookup_client({clientid, ClientId}) -> , Rec <- ets:lookup(emqx_channel_info, Key)]. %% @private -rpc_call(Node, Fun, Args, Timeout) -> - case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of +wrap_rpc(Result) -> + case Result of {badrpc, Reason} -> %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler %% should catch all exceptions and always return 'ok'. @@ -599,14 +605,17 @@ update_stats({Tab, Stat, MaxStat}) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) -> + module() | undefined. +do_get_chann_conn_mod(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod catch error:badarg -> undefined - end; + end. + get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)). mark_channel_connected(ChanPid) -> ?tp(emqx_cm_connected_client_count_inc, #{}), diff --git a/apps/emqx/src/emqx_cm.hrl b/apps/emqx/src/emqx_cm.hrl new file mode 100644 index 000000000..ec23e21e9 --- /dev/null +++ b/apps/emqx/src/emqx_cm.hrl @@ -0,0 +1,23 @@ +%%------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_CM_HRL). +-define(EMQX_CM_HRL, true). + +-define(T_KICK, 5_000). +-define(T_GET_INFO, 5_000). +-define(T_TAKEOVER, 15_000). + +-endif. diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index d55fef88f..8bf777935 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -25,9 +25,6 @@ , list_client_subscriptions/2 , list_subscriptions_via_topic/2 - , lookup_client/2 - , kickout_client/2 - , start_listener/2 , stop_listener/2 , restart_listener/2 @@ -48,15 +45,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. -kickout_client(Node, ClientId) -> - rpc:call(Node, emqx_cm, kick_session, [ClientId]). - --spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> - [emqx_cm:channel_info()] | {badrpc, _}. -lookup_client(Node, Key) -> - rpc:call(Node, emqx_cm, lookup_client, [Key]). - -spec list_client_subscriptions(node(), emqx_types:clientid()) -> [{emqx_types:topic(), emqx_types:subopts()}] | emqx_rpc:badrpc(). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v1.erl b/apps/emqx/src/proto/emqx_cm_proto_v1.erl new file mode 100644 index 000000000..e8f0115cb --- /dev/null +++ b/apps/emqx/src/proto/emqx_cm_proto_v1.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , lookup_client/2 + , kickout_client/2 + + , get_chan_stats/2 + , get_chan_info/2 + , get_chann_conn_mod/2 + + , takeover_session/2 + , kick_session/3 + ]). + +-include("bpapi.hrl"). +-include("emqx_cm.hrl"). + +introduced_in() -> + "5.0.0". + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}. +get_chan_stats(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}. +get_chan_info(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}. +get_chann_conn_mod(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + none + | {expired | persistent, emqx_session:session()} + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + | {badrpc, _}. +takeover_session(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). + +-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}. +kick_session(Action, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index b58767039..8148e4487 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -243,7 +243,7 @@ lookup_client({username, Username}, FormatFun) -> || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, Key, {M, F}) -> - case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of + case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of {error, Err} -> {error, Err}; L -> lists:map(fun({Chan, Info0, Stats}) -> Info = Info0#{node => Node}, @@ -262,7 +262,7 @@ kickout_client({ClientID, FormatFun}) -> end. kickout_client(Node, ClientId) -> - wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)). + wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache).