From 0a7a14f4cd05f2917f7c86c355ead4b63146945b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 31 Aug 2021 12:13:24 +0800 Subject: [PATCH] chore(mgmt): callback query function with table name param --- apps/emqx_management/src/emqx_mgmt_api.erl | 31 +-- .../src/emqx_mgmt_api_alarms.erl | 23 +- .../src/emqx_mgmt_api_clients.erl | 244 +++++++++--------- .../src/emqx_mgmt_api_subscriptions.erl | 17 +- 4 files changed, 162 insertions(+), 153 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index e9aaa2725..5a7b020d7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -22,13 +22,13 @@ %% first_next query APIs -export([ params2qs/2 - , node_query/4 - , cluster_query/3 + , node_query/5 + , cluster_query/4 , traverse_table/5 , select_table/5 ]). --export([do_query/5]). +-export([do_query/6]). paginate(Tables, Params, RowFun) -> Qh = query_handle(Tables), @@ -78,14 +78,14 @@ limit(Params) -> %% Node Query %%-------------------------------------------------------------------- -node_query(Node, Params, {Tab, QsSchema}, QueryFun) -> +node_query(Node, Params, Tab, QsSchema, QueryFun) -> {CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), Start = if Page > 1 -> (Page-1) * Limit; true -> 0 end, - {_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1), + {_, Rows} = do_query(Node, Tab, Qs, QueryFun, Start, Limit+1), Meta = #{page => Page, limit => Limit}, NMeta = case CodCnt =:= 0 of true -> Meta#{count => count(Tab)}; @@ -94,10 +94,11 @@ node_query(Node, Params, {Tab, QsSchema}, QueryFun) -> #{meta => NMeta, data => lists:sublist(Rows, Limit)}. %% @private -do_query(Node, Qs, {M,F}, Start, Limit) when Node =:= node() -> - M:F(Qs, Start, Limit); -do_query(Node, Qs, QueryFun, Start, Limit) -> - rpc_call(Node, ?MODULE, do_query, [Node, Qs, QueryFun, Start, Limit], 50000). +do_query(Node, Tab, Qs, {M,F}, Start, Limit) when Node =:= node() -> + M:F(Tab, Qs, Start, Limit); +do_query(Node, Tab, Qs, QueryFun, Start, Limit) -> + rpc_call(Node, ?MODULE, do_query, + [Node, Tab, Qs, QueryFun, Start, Limit], 50000). %% @private rpc_call(Node, M, F, A, T) -> @@ -110,7 +111,7 @@ rpc_call(Node, M, F, A, T) -> %% Cluster Query %%-------------------------------------------------------------------- -cluster_query(Params, {Tab, QsSchema}, QueryFun) -> +cluster_query(Params, Tab, QsSchema, QueryFun) -> {CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), @@ -118,7 +119,7 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) -> true -> 0 end, Nodes = ekka_mnesia:running_nodes(), - Rows = do_cluster_query(Nodes, Qs, QueryFun, Start, Limit+1, []), + Rows = do_cluster_query(Nodes, Tab, Qs, QueryFun, Start, Limit+1, []), Meta = #{page => Page, limit => Limit}, NMeta = case CodCnt =:= 0 of true -> Meta#{count => count(Tab, Nodes)}; @@ -127,13 +128,13 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) -> #{meta => NMeta, data => lists:sublist(Rows, Limit)}. %% @private -do_cluster_query([], _, _, _, _, Acc) -> +do_cluster_query([], _, _, _, _, _, Acc) -> lists:append(lists:reverse(Acc)); -do_cluster_query([Node|Nodes], Qs, QueryFun, Start, Limit, Acc) -> - {NStart, Rows} = do_query(Node, Qs, QueryFun, Start, Limit), +do_cluster_query([Node|Nodes], Tab, Qs, QueryFun, Start, Limit, Acc) -> + {NStart, Rows} = do_query(Node, Tab, Qs, QueryFun, Start, Limit), case Limit - length(Rows) of Rest when Rest > 0 -> - do_cluster_query(Nodes, Qs, QueryFun, NStart, Limit, [Rows|Acc]); + do_cluster_query(Nodes, Tab, Qs, QueryFun, NStart, Limit, [Rows|Acc]); 0 -> lists:append(lists:reverse([Rows|Acc])) end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index 1adb5fce3..db6484060 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -22,8 +22,10 @@ -export([alarms/2]). --export([ query_activated/3 - , query_deactivated/3]). +%% internal export (for query) +-export([ query/4 + ]). + %% notice: from emqx_alarms -define(ACTIVATED_ALARM, emqx_activated_alarm). -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). @@ -71,14 +73,12 @@ alarms_api() -> %%%============================================================================================== %% parameters trans alarms(get, #{query_string := Qs}) -> - {Table, Function} = + Table = case maps:get(<<"activated">>, Qs, <<"true">>) of - <<"true">> -> - {?ACTIVATED_ALARM, query_activated}; - <<"false">> -> - {?DEACTIVATED_ALARM, query_deactivated} + <<"true">> -> ?ACTIVATED_ALARM; + <<"false">> -> ?DEACTIVATED_ALARM end, - Response = emqx_mgmt_api:cluster_query(Qs, {Table, []}, {?MODULE, Function}), + Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}), {200, Response}; alarms(delete, _Params) -> @@ -87,13 +87,8 @@ alarms(delete, _Params) -> %%%============================================================================================== %% internal -query_activated(_, Start, Limit) -> - query(?ACTIVATED_ALARM, Start, Limit). -query_deactivated(_, Start, Limit) -> - query(?DEACTIVATED_ALARM, Start, Limit). - -query(Table, Start, Limit) -> +query(Table, _QsSpec, Start, Limit) -> Ms = [{'$1',[],['$1']}], emqx_mgmt_api:select_table(Table, Ms, Start, Limit, fun format_alarm/1). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index a4e307114..dd4df58a5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -35,7 +35,7 @@ , unsubscribe/2 , subscribe_batch/2]). --export([ query/3 +-export([ query/4 , format_channel_info/1]). %% for batch operation @@ -420,14 +420,17 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> %% api apply list(Params) -> + {Tab, QuerySchema} = ?CLIENT_QS_SCHEMA, case maps:get(<<"node">>, Params, undefined) of undefined -> - Response = emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun), + Response = emqx_mgmt_api:cluster_query(Params, Tab, + QuerySchema, ?query_fun), {200, Response}; Node1 -> Node = binary_to_atom(Node1, utf8), ParamsWithoutNode = maps:without([<<"node">>], Params), - Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, ?CLIENT_QS_SCHEMA, ?query_fun), + Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, + Tab, QuerySchema, ?query_fun), {200, Response} end. @@ -492,8 +495,123 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) -> ArgList = [[ClientID, Topic, Qos]|| #{topic := Topic, qos := Qos} <- Topics], emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). -%%%============================================================================================== +%%-------------------------------------------------------------------- %% internal function + +do_subscribe(ClientID, Topic0, Qos) -> + {Topic, Opts} = emqx_topic:parse(Topic0), + TopicTable = [{Topic, Opts#{qos => Qos}}], + case emqx_mgmt:subscribe(ClientID, TopicTable) of + {error, Reason} -> + {error, Reason}; + {subscribe, Subscriptions} -> + case proplists:is_defined(Topic, Subscriptions) of + true -> + ok; + false -> + {error, unknow_error} + end + end. + +do_unsubscribe(ClientID, Topic) -> + case emqx_mgmt:unsubscribe(ClientID, Topic) of + {error, Reason} -> + {error, Reason}; + Res -> + Res + end. +%%-------------------------------------------------------------------- +%% Query Functions + +query(Tab, {Qs, []}, Start, Limit) -> + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(Tab, Ms, Start, Limit, + fun format_channel_info/1); + +query(Tab, {Qs, Fuzzy}, Start, Limit) -> + Ms = qs2ms(Qs), + MatchFun = match_fun(Ms, Fuzzy), + emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit, + fun format_channel_info/1). + +%%-------------------------------------------------------------------- +%% QueryString to Match Spec + +-spec qs2ms(list()) -> ets:match_spec(). +qs2ms(Qs) -> + {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), + [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. + +qs2ms([], _, {MtchHead, Conds}) -> + {MtchHead, lists:reverse(Conds)}; + +qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) -> + NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), + qs2ms(Rest, N, {NMtchHead, Conds}); +qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> + Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), + NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), + NConds = put_conds(Qs, Holder, Conds), + qs2ms(Rest, N+1, {NMtchHead, NConds}). + +put_conds({_, Op, V}, Holder, Conds) -> + [{Op, Holder, V} | Conds]; +put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) -> + [{Op2, Holder, V2}, + {Op1, Holder, V1} | Conds]. + +ms(clientid, X) -> + #{clientinfo => #{clientid => X}}; +ms(username, X) -> + #{clientinfo => #{username => X}}; +ms(zone, X) -> + #{clientinfo => #{zone => X}}; +ms(ip_address, X) -> + #{clientinfo => #{peerhost => X}}; +ms(conn_state, X) -> + #{conn_state => X}; +ms(clean_start, X) -> + #{conninfo => #{clean_start => X}}; +ms(proto_name, X) -> + #{conninfo => #{proto_name => X}}; +ms(proto_ver, X) -> + #{conninfo => #{proto_ver => X}}; +ms(connected_at, X) -> + #{conninfo => #{connected_at => X}}; +ms(created_at, X) -> + #{session => #{created_at => X}}. + +%%-------------------------------------------------------------------- +%% Match funcs + +match_fun(Ms, Fuzzy) -> + MsC = ets:match_spec_compile(Ms), + REFuzzy = lists:map(fun({K, like, S}) -> + {ok, RE} = re:compile(S), + {K, like, RE} + end, Fuzzy), + fun(Rows) -> + case ets:match_spec_run(Rows, MsC) of + [] -> []; + Ls -> + lists:filter(fun(E) -> + run_fuzzy_match(E, REFuzzy) + end, Ls) + end + end. + +run_fuzzy_match(_, []) -> + true; +run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) -> + Val = case maps:get(Key, ClientInfo, "") of + undefined -> ""; + V -> V + end, + re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy). + +%%-------------------------------------------------------------------- +%% format funcs + format_channel_info({_, ClientInfo, ClientStats}) -> Fun = fun @@ -546,116 +664,8 @@ peer_to_binary(Addr) -> list_to_binary(inet:ntoa(Addr)). format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) -> - #{ - access => PubSub, - topic => Topic, - result => AuthzResult, - updated_time => Timestamp - }. - -do_subscribe(ClientID, Topic0, Qos) -> - {Topic, Opts} = emqx_topic:parse(Topic0), - TopicTable = [{Topic, Opts#{qos => Qos}}], - case emqx_mgmt:subscribe(ClientID, TopicTable) of - {error, Reason} -> - {error, Reason}; - {subscribe, Subscriptions} -> - case proplists:is_defined(Topic, Subscriptions) of - true -> - ok; - false -> - {error, unknow_error} - end - end. - -do_unsubscribe(ClientID, Topic) -> - case emqx_mgmt:unsubscribe(ClientID, Topic) of - {error, Reason} -> - {error, Reason}; - Res -> - Res - end. -%%%============================================================================================== -%% Query Functions - -query({Qs, []}, Start, Limit) -> - Ms = qs2ms(Qs), - emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1); - -query({Qs, Fuzzy}, Start, Limit) -> - Ms = qs2ms(Qs), - MatchFun = match_fun(Ms, Fuzzy), - emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1). - -%%%============================================================================================== -%% QueryString to Match Spec --spec qs2ms(list()) -> ets:match_spec(). -qs2ms(Qs) -> - {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), - [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. - -qs2ms([], _, {MtchHead, Conds}) -> - {MtchHead, lists:reverse(Conds)}; - -qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) -> - NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), - qs2ms(Rest, N, {NMtchHead, Conds}); -qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> - Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), - NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), - NConds = put_conds(Qs, Holder, Conds), - qs2ms(Rest, N+1, {NMtchHead, NConds}). - -put_conds({_, Op, V}, Holder, Conds) -> - [{Op, Holder, V} | Conds]; -put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) -> - [{Op2, Holder, V2}, - {Op1, Holder, V1} | Conds]. - -ms(clientid, X) -> - #{clientinfo => #{clientid => X}}; -ms(username, X) -> - #{clientinfo => #{username => X}}; -ms(zone, X) -> - #{clientinfo => #{zone => X}}; -ms(ip_address, X) -> - #{clientinfo => #{peerhost => X}}; -ms(conn_state, X) -> - #{conn_state => X}; -ms(clean_start, X) -> - #{conninfo => #{clean_start => X}}; -ms(proto_name, X) -> - #{conninfo => #{proto_name => X}}; -ms(proto_ver, X) -> - #{conninfo => #{proto_ver => X}}; -ms(connected_at, X) -> - #{conninfo => #{connected_at => X}}; -ms(created_at, X) -> - #{session => #{created_at => X}}. - -%%%============================================================================================== -%% Match funcs -match_fun(Ms, Fuzzy) -> - MsC = ets:match_spec_compile(Ms), - REFuzzy = lists:map(fun({K, like, S}) -> - {ok, RE} = re:compile(S), - {K, like, RE} - end, Fuzzy), - fun(Rows) -> - case ets:match_spec_run(Rows, MsC) of - [] -> []; - Ls -> - lists:filter(fun(E) -> - run_fuzzy_match(E, REFuzzy) - end, Ls) - end - end. - -run_fuzzy_match(_, []) -> - true; -run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) -> - Val = case maps:get(Key, ClientInfo, "") of - undefined -> ""; - V -> V - end, - re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy). + #{ access => PubSub, + topic => Topic, + result => AuthzResult, + updated_time => Timestamp + }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 058d824ac..5c2475e95 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -29,7 +29,7 @@ -export([subscriptions/2]). --export([ query/3 +-export([ query/4 , format/1 ]). @@ -111,11 +111,14 @@ subscriptions(get, #{query_string := Params}) -> list(Params). list(Params) -> + {Tab, QuerySchema} = ?SUBS_QS_SCHEMA, case maps:get(<<"node">>, Params, undefined) of undefined -> - {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}; + {200, emqx_mgmt_api:cluster_query(Params, Tab, + QuerySchema, ?query_fun)}; Node -> - {200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)} + {200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, + Tab, QuerySchema, ?query_fun)} end. format(Items) when is_list(Items) -> @@ -145,14 +148,14 @@ format({_Subscriber, Topic, Options}) -> %% Query Function %%-------------------------------------------------------------------- -query({Qs, []}, Start, Limit) -> +query(Tab, {Qs, []}, Start, Limit) -> Ms = qs2ms(Qs), - emqx_mgmt_api:select_table(emqx_suboption, Ms, Start, Limit, fun format/1); + emqx_mgmt_api:select_table(Tab, Ms, Start, Limit, fun format/1); -query({Qs, Fuzzy}, Start, Limit) -> +query(Tab, {Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(Qs), MatchFun = match_fun(Ms, Fuzzy), - emqx_mgmt_api:traverse_table(emqx_suboption, MatchFun, Start, Limit, fun format/1). + emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit, fun format/1). match_fun(Ms, Fuzzy) -> MsC = ets:match_spec_compile(Ms),