From fcbf2539bcaeb34e28eb889af146362270a442bc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Feb 2021 16:32:39 +0800 Subject: [PATCH] refactor(mgmt): move the fomating codes to emqx_mgmt_api_clients.erl --- .../emqx_management/src/emqx_mgmt.erl | 44 +---------- .../src/emqx_mgmt_api_clients.erl | 75 ++++++++++++------- 2 files changed, 49 insertions(+), 70 deletions(-) diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index d4f75ca94..aebb43f69 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -302,14 +302,14 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - M:F(ets:lookup(emqx_channel, ClientId)); + lists:map(fun(E) -> M:F(E) end, ets:lookup(emqx_channel, ClientId)); lookup_client(Node, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}], - M:F(ets:select(emqx_channel_info, MatchSpec)); + lists:map(fun(E) -> M:F(E) end, ets:select(emqx_channel_info, MatchSpec)); lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). @@ -917,46 +917,6 @@ get_telemetry_data() -> %% Common Table API %%-------------------------------------------------------------------- -item(client, {ClientId, ChanPid}) -> - Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> throw(gone); - Attrs0 -> Attrs0 - end, - Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of - undefined -> #{}; - Stats0 -> maps:from_list(Stats0) - end, - ClientInfo = maps:get(clientinfo, Attrs, #{}), - ConnInfo = maps:get(conninfo, Attrs, #{}), - Session = case maps:get(session, Attrs, #{}) of - undefined -> #{}; - _Sess -> _Sess - end, - SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), - Connected = case maps:get(conn_state, Attrs, connected) of - connected -> true; - _ -> false - end, - NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), - max_inflight => maps:get(inflight_max, Stats, 0), - max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), - max_mqueue => maps:get(mqueue_max, Stats, 0), - inflight => maps:get(inflight_cnt, Stats, 0), - awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, - lists:foldl(fun(Items, Acc) -> - maps:merge(Items, Acc) - end, #{connected => Connected}, - [maps:with([ subscriptions_cnt, max_subscriptions, - inflight, max_inflight, awaiting_rel, - max_awaiting_rel, mqueue_len, mqueue_dropped, - max_mqueue, heap_size, reductions, mailbox_len, - recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, - send_msg, send_oct, send_pkt], NStats), - maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), - maps:with([clean_start, keepalive, expiry_interval, proto_name, - proto_ver, peername, connected_at, disconnected_at], ConnInfo), - #{created_at => SessCreated}]); - item(subscription, {{Topic, ClientId}, Options}) -> #{topic => Topic, clientid => ClientId, options => Options}; diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl index e9cd83139..18420cfb4 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl @@ -139,11 +139,11 @@ ]). -export([ query/3 - , format/1 + , format_channel_info/1 ]). -define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format}). +-define(format_fun, {?MODULE, format_channel_info}). list(Bindings, Params) when map_size(Bindings) == 0 -> return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); @@ -249,18 +249,44 @@ parse_ratelimit_str(S) -> %%-------------------------------------------------------------------- %% Format -format(Items) when is_list(Items) -> - lists:foldr( - fun(Item, Acc) -> - try - [format(Item) | Acc] - catch - throw:gone:_Stk -> - Acc - end - end, [], Items); -format(Key) when is_tuple(Key) -> - format(emqx_mgmt:item(client, Key)); +format_channel_info(Key = {_ClientId, _Pid}) -> + [E] = ets:lookup(emqx_channel_info, Key), + format_channel_info(E); + +format_channel_info({_Key, Info, Stats0}) -> + Stats = maps:from_list(Stats0), + ClientInfo = maps:get(clientinfo, Info, #{}), + ConnInfo = maps:get(conninfo, Info, #{}), + Session = case maps:get(session, Info, #{}) of + undefined -> #{}; + _Sess -> _Sess + end, + SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), + Connected = case maps:get(conn_state, Info, connected) of + connected -> true; + _ -> false + end, + NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), + max_inflight => maps:get(inflight_max, Stats, 0), + max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), + max_mqueue => maps:get(mqueue_max, Stats, 0), + inflight => maps:get(inflight_cnt, Stats, 0), + awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, + format( + lists:foldl(fun(Items, Acc) -> + maps:merge(Items, Acc) + end, #{connected => Connected}, + [maps:with([ subscriptions_cnt, max_subscriptions, + inflight, max_inflight, awaiting_rel, + max_awaiting_rel, mqueue_len, mqueue_dropped, + max_mqueue, heap_size, reductions, mailbox_len, + recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, + send_msg, send_oct, send_pkt], NStats), + maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), + maps:with([clean_start, keepalive, expiry_interval, proto_name, + proto_ver, peername, connected_at, disconnected_at], ConnInfo), + #{created_at => SessCreated}])). + format(Data) when is_map(Data)-> {IpAddr, Port} = maps:get(peername, Data), ConnectedAt = maps:get(connected_at, Data), @@ -287,13 +313,13 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) -> %%-------------------------------------------------------------------- query({Qs, []}, Start, Limit) -> - Ms = qs2ms_k(Qs), - emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format/1); + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1); query({Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(Qs), MatchFun = match_fun(Ms, Fuzzy), - emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format/1). + emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1). %%-------------------------------------------------------------------- %% Match funcs @@ -308,11 +334,9 @@ match_fun(Ms, Fuzzy) -> case ets:match_spec_run(Rows, MsC) of [] -> []; Ls -> - lists:filtermap(fun(E) -> - case run_fuzzy_match(E, REFuzzy) of - false -> false; - true -> {true, element(1, E)} - end end, Ls) + lists:filter(fun(E) -> + run_fuzzy_match(E, REFuzzy) + end, Ls) end end. @@ -333,10 +357,6 @@ qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. -qs2ms_k(Qs) -> - {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), - [{{'$1', MtchHead, '_'}, Conds, ['$1']}]. - qs2ms([], _, {MtchHead, Conds}) -> {MtchHead, lists:reverse(Conds)}; @@ -421,7 +441,6 @@ params2qs_test() -> ?assertEqual(ExpectedMtchHead, MtchHead), ?assertEqual(ExpectedCondi, Condi), - [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]), - [{{'$1', #{}, '_'}, [], ['$1']}] = qs2ms_k([]). + [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]). -endif.