From 925654978e0dc73171859fc4d8fafac4eef87a4c Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 20 Jan 2022 18:12:17 +0100 Subject: [PATCH] refactor(emqx_gateway): Reuse emqx_gateway_cm APIs --- .../src/emqx_gateway_api_clients.erl | 3 +- apps/emqx_gateway/src/emqx_gateway_cm.erl | 21 ++++++++- apps/emqx_gateway/src/emqx_gateway_http.erl | 45 +++++-------------- .../src/proto/emqx_gateway_cm_proto_v1.erl | 6 +++ .../test/emqx_gateway_cm_SUITE.erl | 2 +- 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 86dbb69af..eddd25853 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, %% format funcs format_channel_info({_, Infos, Stats} = R) -> + Node = maps:get(node, Infos, node()), ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), SessInfo = maps:get(session, Infos, #{}), - FetchX = [ {node, ClientInfo, node()} + FetchX = [ {node, ClientInfo, Node} , {clientid, ClientInfo} , {username, ClientInfo} , {proto_name, ConnInfo} diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index d1c0dc89a..b9493349a 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -36,6 +36,7 @@ , register_channel/4 , unregister_channel/2 , insert_channel_info/4 + , lookup_by_clientid/2 , set_chan_info/3 , set_chan_info/4 , get_chan_info/2 @@ -64,7 +65,8 @@ ]). %% RPC targets --export([ do_get_chan_info/3 +-export([ do_lookup_by_clientid/2 + , do_get_chan_info/3 , do_set_chan_info/4 , do_get_chan_stats/3 , do_set_chan_stats/4 @@ -159,11 +161,19 @@ get_chan_info(GwName, ClientId) -> get_chan_info(GwName, ClientId, ChanPid) end). +-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +do_lookup_by_clientid(GwName, ClientId) -> + ChanTab = emqx_gateway_cm:tabname(chan, GwName), + [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. + -spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined. do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(tabname(info, GwName), Chan, 2) + try + Info = ets:lookup_element(tabname(info, GwName), Chan, 2), + Info#{node => node()} catch error:badarg -> undefined end. @@ -173,6 +183,13 @@ do_get_chan_info(GwName, ClientId, ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) -> wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). +-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +lookup_by_clientid(GwName, ClientId) -> + Nodes = mria_mnesia:running_nodes(), + Pids = emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId), + lists:append([Pid || {ok, Pid} <- Pids]). + %% @doc Update infos of the channel. -spec set_chan_info(gateway_name(), emqx_types:clientid(), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 641f29932..0db2e1b3d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -47,9 +47,7 @@ %% Mgmt APIs - clients -export([ lookup_client/3 - , lookup_client/4 , kickout_client/2 - , kickout_client/3 , list_client_subscriptions/2 , client_subscribe/4 , client_unsubscribe/3 @@ -231,41 +229,26 @@ confexp({error, Reason}) -> error(Reason). %%-------------------------------------------------------------------- -spec lookup_client(gateway_name(), - emqx_types:clientid(), {atom(), atom()}) -> list(). -lookup_client(GwName, ClientId, FormatFun) -> - lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- mria_mnesia:running_nodes()]). - -lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> - ChanTab = emqx_gateway_cm:tabname(chan, GwName), - InfoTab = emqx_gateway_cm:tabname(info, GwName), - - lists:append(lists:map( - fun(Key) -> - lists:map(fun M:F/1, ets:lookup(InfoTab, Key)) - end, ets:lookup(ChanTab, ClientId))); - -lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> - rpc_call(Node, lookup_client, - [Node, GwName, {clientid, ClientId}, FormatFun]). + emqx_types:clientid(), {module(), atom()}) -> list(). +lookup_client(GwName, ClientId, {M, F}) -> + [begin + Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid), + Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid), + M:F({{ClientId, Pid}, Info, Stats}) + end + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)]. -spec kickout_client(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. kickout_client(GwName, ClientId) -> - Results = [kickout_client(Node, GwName, ClientId) - || Node <- mria_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of + Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid) + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)], + case Results =:= [] orelse lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> lists:last(Results) end. -kickout_client(Node, GwName, ClientId) when Node =:= node() -> - emqx_gateway_cm:kick_session(GwName, ClientId); - -kickout_client(Node, GwName, ClientId) -> - rpc_call(Node, kickout_client, [Node, GwName, ClientId]). - -spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) -> {error, any()} | {ok, list()}. @@ -459,9 +442,3 @@ to_list(B) when is_binary(B) -> %%-------------------------------------------------------------------- %% Internal funcs - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl index 49db5d311..5862d5567 100644 --- a/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl +++ b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl @@ -27,6 +27,7 @@ , discard_session/3 , kick_session/3 , get_chann_conn_mod/3 + , lookup_by_clientid/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -34,6 +35,11 @@ introduced_in() -> "5.0.0". +-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) -> + emqx_rpc:multicall_return([pid()]). +lookup_by_clientid(Nodes, GwName, ClientId) -> + erpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]). + -spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined | {badrpc, _}. get_chan_info(GwName, ClientId, ChanPid) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl index 82d97a166..7e5fbefb9 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl @@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) -> #{clientinfo => clientinfo(), conninfo => conninfo()}, []), %% Info: get/set - NInfo = #{newinfo => true}, + NInfo = #{newinfo => true, node => node()}, emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo), ?assertEqual( NInfo,