From cc468dca4ef1879797625d3445d2b38424a171c5 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 26 Oct 2022 11:49:22 +0300 Subject: [PATCH] chore(cm): sync emqx_cm with ee --- src/emqx_cm.erl | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 09169b160..90adc86a2 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -63,8 +63,8 @@ ]). -export([all_channels/0, - channel_with_session_table/0, - live_connection_table/0]). + channel_with_session_table/1, + live_connection_table/1]). %% gen_server callbacks -export([ init/1 @@ -433,7 +433,7 @@ all_channels() -> ets:select(?CHAN_TAB, Pat). %% @doc Get clientinfo for all clients with sessions -channel_with_session_table() -> +channel_with_session_table(ConnModules) -> Ms = ets:fun2ms( fun({{ClientId, _ChanPid}, Info, @@ -441,22 +441,25 @@ channel_with_session_table() -> {ClientId, Info} end), Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]), + ConnModuleMap = maps:from_list([{Mod, true} || Mod <- ConnModules]), qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo} || {ClientId, #{conn_state := ConnState, clientinfo := ClientInfo, - conninfo := #{clean_start := false} = ConnInfo}} <- Table + conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo}} + <- Table, + maps:is_key(ConnModule, ConnModuleMap) ]). %% @doc Get all local connection query handle -live_connection_table() -> - Ms = ets:fun2ms( - fun({{ClientId, ChanPid}, _}) -> - {ClientId, ChanPid} - end), +live_connection_table(ConnModules) -> + Ms = lists:map(fun live_connection_ms/1, ConnModules), Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]), qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]). +live_connection_ms(ConnModule) -> + {{{'$1','$2'},ConnModule},[],[{{'$1','$2'}}]}. + is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() -> case get_chan_info(ClientId, ChanPid) of #{conn_state := disconnected} -> false;