refactor(mgmt): move the fomating codes to emqx_mgmt_api_clients.erl

This commit is contained in:
JianBo He 2021-02-12 16:32:39 +08:00 committed by JianBo He
parent 9b3a6aa635
commit fcbf2539bc
2 changed files with 49 additions and 70 deletions

View File

@ -302,14 +302,14 @@ lookup_client({username, Username}, FormatFun) ->
lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
M:F(ets:lookup(emqx_channel, ClientId)); lists:map(fun(E) -> M:F(E) end, ets:lookup(emqx_channel, ClientId));
lookup_client(Node, {clientid, ClientId}, FormatFun) -> lookup_client(Node, {clientid, ClientId}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}], MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}],
M:F(ets:select(emqx_channel_info, MatchSpec)); lists:map(fun(E) -> M:F(E) end, ets:select(emqx_channel_info, MatchSpec));
lookup_client(Node, {username, Username}, FormatFun) -> lookup_client(Node, {username, Username}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
@ -917,46 +917,6 @@ get_telemetry_data() ->
%% Common Table API %% Common Table API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
item(client, {ClientId, ChanPid}) ->
Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of
undefined -> throw(gone);
Attrs0 -> Attrs0
end,
Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of
undefined -> #{};
Stats0 -> maps:from_list(Stats0)
end,
ClientInfo = maps:get(clientinfo, Attrs, #{}),
ConnInfo = maps:get(conninfo, Attrs, #{}),
Session = case maps:get(session, Attrs, #{}) of
undefined -> #{};
_Sess -> _Sess
end,
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Attrs, connected) of
connected -> true;
_ -> false
end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),
max_inflight => maps:get(inflight_max, Stats, 0),
max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0),
max_mqueue => maps:get(mqueue_max, Stats, 0),
inflight => maps:get(inflight_cnt, Stats, 0),
awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)},
lists:foldl(fun(Items, Acc) ->
maps:merge(Items, Acc)
end, #{connected => Connected},
[maps:with([ subscriptions_cnt, max_subscriptions,
inflight, max_inflight, awaiting_rel,
max_awaiting_rel, mqueue_len, mqueue_dropped,
max_mqueue, heap_size, reductions, mailbox_len,
recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt,
send_msg, send_oct, send_pkt], NStats),
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
maps:with([clean_start, keepalive, expiry_interval, proto_name,
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
#{created_at => SessCreated}]);
item(subscription, {{Topic, ClientId}, Options}) -> item(subscription, {{Topic, ClientId}, Options}) ->
#{topic => Topic, clientid => ClientId, options => Options}; #{topic => Topic, clientid => ClientId, options => Options};

View File

@ -139,11 +139,11 @@
]). ]).
-export([ query/3 -export([ query/3
, format/1 , format_channel_info/1
]). ]).
-define(query_fun, {?MODULE, query}). -define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format}). -define(format_fun, {?MODULE, format_channel_info}).
list(Bindings, Params) when map_size(Bindings) == 0 -> list(Bindings, Params) when map_size(Bindings) == 0 ->
return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)});
@ -249,18 +249,44 @@ parse_ratelimit_str(S) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Format %% Format
format(Items) when is_list(Items) -> format_channel_info(Key = {_ClientId, _Pid}) ->
lists:foldr( [E] = ets:lookup(emqx_channel_info, Key),
fun(Item, Acc) -> format_channel_info(E);
try
[format(Item) | Acc] format_channel_info({_Key, Info, Stats0}) ->
catch Stats = maps:from_list(Stats0),
throw:gone:_Stk -> ClientInfo = maps:get(clientinfo, Info, #{}),
Acc ConnInfo = maps:get(conninfo, Info, #{}),
end Session = case maps:get(session, Info, #{}) of
end, [], Items); undefined -> #{};
format(Key) when is_tuple(Key) -> _Sess -> _Sess
format(emqx_mgmt:item(client, Key)); end,
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Info, connected) of
connected -> true;
_ -> false
end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),
max_inflight => maps:get(inflight_max, Stats, 0),
max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0),
max_mqueue => maps:get(mqueue_max, Stats, 0),
inflight => maps:get(inflight_cnt, Stats, 0),
awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)},
format(
lists:foldl(fun(Items, Acc) ->
maps:merge(Items, Acc)
end, #{connected => Connected},
[maps:with([ subscriptions_cnt, max_subscriptions,
inflight, max_inflight, awaiting_rel,
max_awaiting_rel, mqueue_len, mqueue_dropped,
max_mqueue, heap_size, reductions, mailbox_len,
recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt,
send_msg, send_oct, send_pkt], NStats),
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
maps:with([clean_start, keepalive, expiry_interval, proto_name,
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
#{created_at => SessCreated}])).
format(Data) when is_map(Data)-> format(Data) when is_map(Data)->
{IpAddr, Port} = maps:get(peername, Data), {IpAddr, Port} = maps:get(peername, Data),
ConnectedAt = maps:get(connected_at, Data), ConnectedAt = maps:get(connected_at, Data),
@ -287,13 +313,13 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
query({Qs, []}, Start, Limit) -> query({Qs, []}, Start, Limit) ->
Ms = qs2ms_k(Qs), Ms = qs2ms(Qs),
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format/1); emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
query({Qs, Fuzzy}, Start, Limit) -> query({Qs, Fuzzy}, Start, Limit) ->
Ms = qs2ms(Qs), Ms = qs2ms(Qs),
MatchFun = match_fun(Ms, Fuzzy), MatchFun = match_fun(Ms, Fuzzy),
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format/1). emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Match funcs %% Match funcs
@ -308,11 +334,9 @@ match_fun(Ms, Fuzzy) ->
case ets:match_spec_run(Rows, MsC) of case ets:match_spec_run(Rows, MsC) of
[] -> []; [] -> [];
Ls -> Ls ->
lists:filtermap(fun(E) -> lists:filter(fun(E) ->
case run_fuzzy_match(E, REFuzzy) of run_fuzzy_match(E, REFuzzy)
false -> false; end, Ls)
true -> {true, element(1, E)}
end end, Ls)
end end
end. end.
@ -333,10 +357,6 @@ qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
[{{'$1', MtchHead, '_'}, Conds, ['$_']}]. [{{'$1', MtchHead, '_'}, Conds, ['$_']}].
qs2ms_k(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
[{{'$1', MtchHead, '_'}, Conds, ['$1']}].
qs2ms([], _, {MtchHead, Conds}) -> qs2ms([], _, {MtchHead, Conds}) ->
{MtchHead, lists:reverse(Conds)}; {MtchHead, lists:reverse(Conds)};
@ -421,7 +441,6 @@ params2qs_test() ->
?assertEqual(ExpectedMtchHead, MtchHead), ?assertEqual(ExpectedMtchHead, MtchHead),
?assertEqual(ExpectedCondi, Condi), ?assertEqual(ExpectedCondi, Condi),
[{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]), [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
[{{'$1', #{}, '_'}, [], ['$1']}] = qs2ms_k([]).
-endif. -endif.