diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 86dbb69af..eddd25853 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, %% format funcs format_channel_info({_, Infos, Stats} = R) -> + Node = maps:get(node, Infos, node()), ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), SessInfo = maps:get(session, Infos, #{}), - FetchX = [ {node, ClientInfo, node()} + FetchX = [ {node, ClientInfo, Node} , {clientid, ClientInfo} , {username, ClientInfo} , {proto_name, ConnInfo} diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 647d3a6a7..d977b0933 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -36,6 +36,7 @@ , register_channel/4 , unregister_channel/2 , insert_channel_info/4 + , lookup_by_clientid/2 , set_chan_info/3 , set_chan_info/4 , get_chan_info/2 @@ -63,6 +64,20 @@ , code_change/3 ]). +%% RPC targets +-export([ do_lookup_by_clientid/2 + , do_get_chan_info/3 + , do_set_chan_info/4 + , do_get_chan_stats/3 + , do_set_chan_stats/4 + , do_discard_session/3 + , do_kick_session/3 + , do_get_chann_conn_mod/3 + ]). + +-export_type([ gateway_name/0 + ]). + -record(state, { gwname :: gateway_name(), %% Gateway Name locker :: pid(), %% ClientId Locker for CM @@ -146,16 +161,38 @@ get_chan_info(GwName, ClientId) -> get_chan_info(GwName, ClientId, ChanPid) end). --spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) +-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +do_lookup_by_clientid(GwName, ClientId) -> + ChanTab = emqx_gateway_cm:tabname(chan, GwName), + [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. + +-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined. -get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(tabname(info, GwName), Chan, 2) + try + Info = ets:lookup_element(tabname(info, GwName), Chan, 2), + Info#{node => node()} catch error:badarg -> undefined - end; + end. + +-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). + +-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +lookup_by_clientid(GwName, ClientId) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of + {Pids, []} -> + lists:append(Pids); + {_, _BadNodes} -> + error(badrpc) + end. %% @doc Update infos of the channel. -spec set_chan_info(gateway_name(), @@ -164,18 +201,23 @@ get_chan_info(GwName, ClientId, ChanPid) -> set_chan_info(GwName, ClientId, Infos) -> set_chan_info(GwName, ClientId, self(), Infos). --spec set_chan_info(gateway_name(), - emqx_types:clientid(), - pid(), - emqx_types:infos()) -> boolean(). -set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> +-spec do_set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). +do_set_chan_info(GwName, ClientId, ChanPid, Infos) -> Chan = {ClientId, ChanPid}, try ets:update_element(tabname(info, GwName), Chan, {2, Infos}) catch error:badarg -> false - end; + end. + +-spec set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). set_chan_info(GwName, ClientId, ChanPid, Infos) -> - rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)). %% @doc Get channel's stats. -spec get_chan_stats(gateway_name(), emqx_types:clientid()) @@ -186,16 +228,19 @@ get_chan_stats(GwName, ClientId) -> get_chan_stats(GwName, ClientId, ChanPid) end). --spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) +-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:stats() | undefined. -get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_stats(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(tabname(info, GwName), Chan, 3) catch error:badarg -> undefined - end; + end. + +-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined. get_chan_stats(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)). -spec set_chan_stats(gateway_name(), emqx_types:clientid(), @@ -203,19 +248,23 @@ get_chan_stats(GwName, ClientId, ChanPid) -> set_chan_stats(GwName, ClientId, Stats) -> set_chan_stats(GwName, ClientId, self(), Stats). +-spec do_set_chan_stats(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean(). +do_set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + Chan = {ClientId, ChanPid}, + try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) + catch + error:badarg -> false + end. + -spec set_chan_stats(gateway_name(), emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) - when node(ChanPid) == node() -> - Chan = {ClientId, self()}, - try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) - catch - error:badarg -> false - end; set_chan_stats(GwName, ClientId, ChanPid, Stats) -> - rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)). -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true. connection_closed(GwName, ClientId) -> @@ -297,11 +346,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> discard_session(GwName, ClientId) when is_binary(ClientId) -> case lookup_channels(GwName, ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids) end. %% @private -do_discard_session(GwName, ClientId, Pid) -> +safe_discard_session(GwName, ClientId, Pid) -> try discard_session(GwName, ClientId, Pid) catch @@ -315,17 +364,20 @@ do_discard_session(GwName, ClientId, Pid) -> ok end. -%% @private -discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_discard_session(GwName, ClientId, ChanPid) -> case get_chann_conn_mod(GwName, ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; + end. %% @private +-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. discard_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)). -spec kick_session(gateway_name(), emqx_types:clientid()) -> {error, any()} @@ -346,16 +398,20 @@ kick_session(GwName, ClientId) -> kick_session(GwName, ClientId, ChanPid) end. -kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_kick_session(GwName, ClientId, ChanPid) -> case get_chan_info(GwName, ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} - end; + end. +-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. kick_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)). with_channel(GwName, ClientId, Fun) -> case lookup_channels(GwName, ClientId) of @@ -369,14 +425,17 @@ with_channel(GwName, ClientId, Fun) -> lookup_channels(GwName, ClientId) -> emqx_gateway_cm_registry:lookup_channels(GwName, ClientId). -get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). +do_get_chann_conn_mod(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod catch error:badarg -> undefined - end; + end. + +-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). get_chann_conn_mod(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). %% Locker @@ -398,8 +457,8 @@ locker_unlock(Locker, ClientId) -> ekka_locker:release(Locker, ClientId, quorum). %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of +wrap_rpc(Ret) -> + case Ret of {badrpc, Reason} -> error(Reason); Res -> Res end. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 641f29932..bb4bfa7f9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -47,9 +47,7 @@ %% Mgmt APIs - clients -export([ lookup_client/3 - , lookup_client/4 , kickout_client/2 - , kickout_client/3 , list_client_subscriptions/2 , client_subscribe/4 , client_unsubscribe/3 @@ -231,41 +229,28 @@ confexp({error, Reason}) -> error(Reason). %%-------------------------------------------------------------------- -spec lookup_client(gateway_name(), - emqx_types:clientid(), {atom(), atom()}) -> list(). -lookup_client(GwName, ClientId, FormatFun) -> - lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- mria_mnesia:running_nodes()]). - -lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> - ChanTab = emqx_gateway_cm:tabname(chan, GwName), - InfoTab = emqx_gateway_cm:tabname(info, GwName), - - lists:append(lists:map( - fun(Key) -> - lists:map(fun M:F/1, ets:lookup(InfoTab, Key)) - end, ets:lookup(ChanTab, ClientId))); - -lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> - rpc_call(Node, lookup_client, - [Node, GwName, {clientid, ClientId}, FormatFun]). + emqx_types:clientid(), {module(), atom()}) -> list(). +lookup_client(GwName, ClientId, {M, F}) -> + [begin + Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid), + Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid), + M:F({{ClientId, Pid}, Info, Stats}) + end + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)]. -spec kickout_client(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. kickout_client(GwName, ClientId) -> - Results = [kickout_client(Node, GwName, ClientId) - || Node <- mria_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) + Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid) + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)], + IsOk = lists:any(fun(Item) -> Item =:= ok end, Results), + case {IsOk, Results} of + {true , _ } -> ok; + {_ , []} -> {error, not_found}; + {false, _ } -> lists:last(Results) end. -kickout_client(Node, GwName, ClientId) when Node =:= node() -> - emqx_gateway_cm:kick_session(GwName, ClientId); - -kickout_client(Node, GwName, ClientId) -> - rpc_call(Node, kickout_client, [Node, GwName, ClientId]). - -spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) -> {error, any()} | {ok, list()}. @@ -459,9 +444,3 @@ to_list(B) when is_binary(B) -> %%-------------------------------------------------------------------- %% Internal funcs - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl new file mode 100644 index 000000000..6aa5521c5 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , get_chan_info/3 + , set_chan_info/4 + , get_chan_stats/3 + , set_chan_stats/4 + , discard_session/3 + , kick_session/3 + , get_chann_conn_mod/3 + , lookup_by_clientid/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) -> + emqx_rpc:multicall_return([pid()]). +lookup_by_clientid(Nodes, GwName, ClientId) -> + rpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]). + +-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined | {badrpc, _}. +get_chan_info(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]). + +-spec set_chan_info(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean() | {badrpc, _}. +set_chan_info(GwName, ClientId, ChanPid, Infos) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]). + +-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined | {badrpc, _}. +get_chan_stats(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]). + +-spec set_chan_stats(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean() | {badrpc, _}. +set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + +-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +discard_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]). + +-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +kick_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]). + +-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}. +get_chann_conn_mod(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]). diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl index 82d97a166..558e90bd3 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl @@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) -> #{clientinfo => clientinfo(), conninfo => conninfo()}, []), %% Info: get/set - NInfo = #{newinfo => true}, + NInfo = #{newinfo => true, node => node()}, emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo), ?assertEqual( NInfo, @@ -200,6 +200,7 @@ t_kick_session(_) -> 100 -> ?assert(false, "waiting kick msg timeout") end, + ?assertMatch({error, not_found}, emqx_gateway_http:kickout_client(?GWNAME, <<"i-dont-exist">>)), meck:unload(emqx_gateway_cm_registry). t_unexpected_handle(Conf) ->