diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index a84fe112c..6c889e000 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -56,6 +56,8 @@ -export([ lookup_channels/1 , lookup_channels/2 + + , lookup_client/1 ]). %% Test/debug interface @@ -80,8 +82,16 @@ , get_connected_client_count/0 ]). +-export_type([ channel_info/0 + ]). + -type(chan_pid() :: pid()). +-type(channel_info() :: { _Chan :: {emqx_types:clientid(), pid()} + , _Info :: emqx_types:infos() + , _Stats :: emqx_types:stats() + }). + %% Tables for channel management. -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). @@ -502,6 +512,18 @@ lookup_channels(global, ClientId) -> lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. +-spec lookup_client({clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [channel_info()]. +lookup_client({username, Username}) -> + MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} + , [{'=:=','$1', Username}] + , ['$_'] + }], + ets:select(emqx_channel_info, MatchSpec); +lookup_client({clientid, ClientId}) -> + [Rec || Key <- ets:lookup(emqx_channel, ClientId) + , Rec <- ets:lookup(emqx_channel_info, Key)]. + %% @private rpc_call(Node, Fun, Args, Timeout) -> case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index 98c4f36c9..539ca9962 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -24,6 +24,7 @@ , forward_async/3 , client_subscriptions/2 + , lookup_client/2 , kickout_client/2 ]). @@ -51,3 +52,8 @@ client_subscriptions(Node, ClientId) -> -spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. kickout_client(Node, ClientId) -> rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index a054a0989..1adc2f61f 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -121,6 +121,12 @@ -elvis([{elvis_style, god_modules, disable}]). +-export_type([listener_manage_op/0]). + +-type listener_manage_op() :: start_listener + | stop_listener + | restart_listener. + %% TODO: remove these function after all api use minirest version 1.X return() -> ok. @@ -241,24 +247,11 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). -lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - lists:append(lists:map( - fun(Key) -> - lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key)) - end, ets:lookup(emqx_channel, ClientId))); - -lookup_client(Node, {clientid, ClientId}, FormatFun) -> - rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); - -lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> - MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} - , [{'=:=','$1', Username}] - , ['$_'] - }], - lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec)); - -lookup_client(Node, {username, Username}, FormatFun) -> - rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). +lookup_client(Node, Key, {M, F}) -> + case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of + {error, Err} -> {error, Err}; + L -> lists:map(fun M:F/1, L) + end. kickout_client({ClientID, FormatFun}) -> case lookup_client({clientid, ClientID}, FormatFun) of @@ -464,10 +457,7 @@ listener_id_filter(Id, Listeners) -> Filter = fun(#{id := Id0}) -> Id0 =:= Id end, lists:filter(Filter, Listeners). - --spec manage_listener( Operation :: start_listener - | stop_listener - | restart_listener +-spec manage_listener( listener_manage_op() , Param :: map()) -> ok | {error, Reason :: term()}. manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->