refactor(emqx_cm): Decorate RPCs

This commit is contained in:
k32 2022-01-19 10:04:34 +01:00
parent 5a9f289f1c
commit 97078002f2
5 changed files with 133 additions and 42 deletions

View File

@ -80,9 +80,15 @@
, mark_channel_connected/1
, mark_channel_disconnected/1
, get_connected_client_count/0
, do_kick_session/3
, do_get_chan_stats/2
, do_get_chan_info/2
, do_get_chann_conn_mod/2
]).
-export_type([ channel_info/0
, chan_pid/0
]).
-type(chan_pid() :: pid()).
@ -92,6 +98,8 @@
, _Stats :: emqx_types:stats()
}).
-include("emqx_cm.hrl").
%% Tables for channel management.
-define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn).
@ -111,10 +119,6 @@
%% Server name
-define(CM, ?MODULE).
-define(T_KICK, 5_000).
-define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000).
%% linting overrides
-elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
, {elvis_style, god_modules, #{ignore => [emqx_cm]}}
@ -181,16 +185,19 @@ connection_closed(ClientId, ChanPid) ->
get_chan_info(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end).
-spec(get_chan_info(emqx_types:clientid(), chan_pid())
-spec(do_get_chan_info(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:infos())).
get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
do_get_chan_info(ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2)
catch
error:badarg -> undefined
end;
end.
-spec(get_chan_info(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:infos())).
get_chan_info(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)).
%% @doc Update infos of the channel.
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@ -206,16 +213,19 @@ set_chan_info(ClientId, Info) when is_binary(ClientId) ->
get_chan_stats(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
-spec(do_get_chan_stats(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:stats())).
get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
do_get_chan_stats(ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3)
catch
error:badarg -> undefined
end;
end.
-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:stats())).
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@ -368,7 +378,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
{living, ConnMod, ChanPid, Session}
end;
do_takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
%% @doc Discard all the sessions identified by the ClientId.
-spec(discard_session(emqx_types:clientid()) -> ok).
@ -422,24 +432,20 @@ discard_session(ClientId, ChanPid) ->
kick_session(ClientId, ChanPid) ->
kick_session(kick, ClientId, ChanPid).
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok.
do_kick_session(Action, ClientId, ChanPid) ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
%% already deregistered
ok;
ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid)
end;
end.
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded
%% to have kick_session/3
Function = case Action of
discard -> discard_session;
kick -> kick_session
end,
try
rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid))
catch
Error : Reason ->
%% This should mostly be RPC failures.
@ -525,8 +531,8 @@ lookup_client({clientid, ClientId}) ->
, Rec <- ets:lookup(emqx_channel_info, Key)].
%% @private
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
wrap_rpc(Result) ->
case Result of
{badrpc, Reason} ->
%% since emqx app 4.3.10, the 'kick' and 'discard' calls handler
%% should catch all exceptions and always return 'ok'.
@ -599,14 +605,17 @@ update_stats({Tab, Stat, MaxStat}) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) ->
module() | undefined.
do_get_chann_conn_mod(ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
catch
error:badarg -> undefined
end;
end.
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)).
mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{}),

23
apps/emqx/src/emqx_cm.hrl Normal file
View File

@ -0,0 +1,23 @@
%%-------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_CM_HRL).
-define(EMQX_CM_HRL, true).
-define(T_KICK, 5_000).
-define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000).
-endif.

View File

@ -25,9 +25,6 @@
, list_client_subscriptions/2
, list_subscriptions_via_topic/2
, lookup_client/2
, kickout_client/2
, start_listener/2
, stop_listener/2
, restart_listener/2
@ -48,15 +45,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
kickout_client(Node, ClientId) ->
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
[emqx_cm:channel_info()] | {badrpc, _}.
lookup_client(Node, Key) ->
rpc:call(Node, emqx_cm, lookup_client, [Key]).
-spec list_client_subscriptions(node(), emqx_types:clientid()) ->
[{emqx_types:topic(), emqx_types:subopts()}]
| emqx_rpc:badrpc().

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cm_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, lookup_client/2
, kickout_client/2
, get_chan_stats/2
, get_chan_info/2
, get_chann_conn_mod/2
, takeover_session/2
, kick_session/3
]).
-include("bpapi.hrl").
-include("emqx_cm.hrl").
introduced_in() ->
"5.0.0".
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
kickout_client(Node, ClientId) ->
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
[emqx_cm:channel_info()] | {badrpc, _}.
lookup_client(Node, Key) ->
rpc:call(Node, emqx_cm, lookup_client, [Key]).
-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}.
get_chan_stats(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}.
get_chan_info(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}.
get_chann_conn_mod(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
none
| {expired | persistent, emqx_session:session()}
| {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
| {badrpc, _}.
takeover_session(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
kick_session(Action, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).

View File

@ -243,7 +243,7 @@ lookup_client({username, Username}, FormatFun) ->
|| Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, Key, {M, F}) ->
case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of
case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
{error, Err} -> {error, Err};
L -> lists:map(fun({Chan, Info0, Stats}) ->
Info = Info0#{node => Node},
@ -262,7 +262,7 @@ kickout_client({ClientID, FormatFun}) ->
end.
kickout_client(Node, ClientId) ->
wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)).
wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache).