From 28d391f26c893f97742fc1140cab6291b70abd3d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Nov 2022 17:16:17 +0800 Subject: [PATCH] fix(mgmt): collect total number in node_query/cluster_query --- apps/emqx_management/src/emqx_mgmt_api.erl | 98 ++++++++++++++++++---- 1 file changed, 81 insertions(+), 17 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 5fd16e2ff..da5a75c62 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -147,7 +147,7 @@ node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) -> {error, page_limit_invalid}; Meta -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), - ResultAcc = #{cursor => 0, count => 0, rows => []}, + ResultAcc = init_query_result(), QueryState = init_query_state(Meta), NResultAcc = do_node_query( Node, Tab, NQString, MsFun, QueryState, ResultAcc @@ -196,7 +196,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> Meta -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), Nodes = mria_mnesia:running_nodes(), - ResultAcc = #{cursor => 0, count => 0, rows => []}, + ResultAcc = init_query_result(), QueryState = init_query_state(Meta), NResultAcc = do_cluster_query( Nodes, Tab, NQString, MsFun, QueryState, ResultAcc @@ -205,7 +205,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> end. %% @private -do_cluster_query([], _Tab, _QString, _QueryFun, _QueryState, ResultAcc) -> +do_cluster_query([], _Tab, _QString, _MsFun, _QueryState, ResultAcc) -> ResultAcc; do_cluster_query( [Node | Tail] = Nodes, @@ -221,7 +221,7 @@ do_cluster_query( {Rows, NQueryState} -> case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of {enough, NResultAcc} -> - NResultAcc; + maybe_collect_total_from_tail_nodes(Tail, Tab, QString, MsFun, NResultAcc); {more, NResultAcc} -> NextNodes = case NQueryState of @@ -232,6 +232,29 @@ do_cluster_query( end end. +maybe_collect_total_from_tail_nodes([], _Tab, _QString, _MsFun, ResultAcc) -> + ResultAcc; +maybe_collect_total_from_tail_nodes(Nodes, Tab, QString, MsFun, ResultAcc = #{total := TotalAcc}) -> + {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), + case is_countable_total(Ms, FuzzyFun) of + true -> + %% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node + case rpc:multicall(Nodes, ?MODULE, apply_total_query, [Tab, Ms, FuzzyFun]) of + {_, [Node | _]} -> + {error, Node, {badrpc, badnode}}; + {ResL0, []} -> + ResL = lists:zip(Nodes, ResL0), + case lists:filter(fun({_, I}) -> not is_integer(I) end, ResL) of + [{Node, {badrpc, Reason}} | _] -> + {error, Node, {badrpc, Reason}}; + [] -> + ResultAcc#{total => ResL ++ TotalAcc} + end + end; + false -> + ResultAcc + end. + %%-------------------------------------------------------------------- %% Do Query (or rpc query) %%-------------------------------------------------------------------- @@ -240,7 +263,7 @@ do_cluster_query( %% #{continuation := ets:continuation(), %% page := pos_integer(), %% limit := pos_integer(), -%% total := #{node() := non_neg_integer()} +%% total := [{node(), non_neg_integer()}] %% } init_query_state(_Meta = #{page := Page, limit := Limit}) -> #{ @@ -253,7 +276,7 @@ init_query_state(_Meta = #{page := Page, limit := Limit}) -> %% @private This function is exempt from BPAPI do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) -> {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), - do_select(Tab, Ms, FuzzyFun, QueryState); + do_select(Node, Tab, Ms, FuzzyFun, QueryState); do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) -> case rpc:call( @@ -269,11 +292,13 @@ do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) -> end. do_select( + Node, Tab, Ms, FuzzyFun, - QueryState = #{continuation := Continuation, limit := Limit} + QueryState0 = #{continuation := Continuation, limit := Limit} ) -> + QueryState = maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState0), Result = case Continuation of ?FRESH_SELECT -> @@ -293,14 +318,48 @@ do_select( {NRows, QueryState#{continuation => NContinuation}} end. +maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState = #{total := TotalAcc}) -> + case proplists:get_value(Node, TotalAcc, undefined) of + undefined -> + Total = apply_total_query(Tab, Ms, FuzzyFun), + QueryState#{total := [{Node, Total} | TotalAcc]}; + _ -> + QueryState + end. + +%% XXX: Calculating the total number of data that match a certain condition under a large table +%% is very expensive because the entire ETS table needs to be scanned. +apply_total_query(Tab, Ms, FuzzyFun) -> + case is_countable_total(Ms, FuzzyFun) of + true -> + ets:info(Tab, size); + false -> + %% return a fake total number if the query have any conditions + 0 + end. + +is_countable_total(Ms, FuzzyFun) -> + FuzzyFun =:= undefined andalso is_non_conditions_match_spec(Ms). + +is_non_conditions_match_spec([{_MatchHead, _Conds = [], _Return} | More]) -> + is_non_conditions_match_spec(More); +is_non_conditions_match_spec([{_MatchHead, Conds, _Return} | _More]) when length(Conds) =/= 0 -> + false; +is_non_conditions_match_spec([]) -> + true. + %% ResultAcc :: #{count := integer(), %% cursor := integer(), -%% rows := [{node(), Rows :: list()}] +%% rows := [{node(), Rows :: list()}], +%% total := [{node() => integer()}] %% } +init_query_result() -> + #{cursor => 0, count => 0, rows => [], total => []}. + accumulate_query_rows( Node, Rows, - _QueryState = #{page := Page, limit := Limit}, + _QueryState = #{page := Page, limit := Limit, total := TotalAcc}, ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc} ) -> PageStart = (Page - 1) * Limit + 1, @@ -308,11 +367,12 @@ accumulate_query_rows( Len = length(Rows), case Cursor + Len of NCursor when NCursor < PageStart -> - {more, ResultAcc#{cursor => NCursor}}; + {more, ResultAcc#{cursor => NCursor, total => TotalAcc}}; NCursor when NCursor < PageEnd -> {more, ResultAcc#{ cursor => NCursor, count => Count + length(Rows), + total => TotalAcc, rows => [{Node, Rows} | RowsAcc] }}; NCursor when NCursor >= PageEnd -> @@ -320,6 +380,7 @@ accumulate_query_rows( {enough, ResultAcc#{ cursor => NCursor, count => Count + length(SubRows), + total => TotalAcc, rows => [{Node, SubRows} | RowsAcc] }} end. @@ -426,10 +487,13 @@ is_fuzzy_key(_) -> format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) -> Error; format_query_result( - FmtFun, Meta, _ResultAcc = #{count := _Count, cursor := Cursor, rows := RowsAcc} + FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc} ) -> + Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc), #{ - meta => Meta#{count => Cursor}, + %% The `count` is used in HTTP API to indicate the total number of + %% queries that can be read + meta => Meta#{count => Total}, data => lists:flatten( lists:foldr( fun({Node, Rows}, Acc) -> @@ -500,6 +564,11 @@ to_ip_port(IPAddress) -> Port = list_to_integer(Port0), {IP, Port}. +b2i(Bin) when is_binary(Bin) -> + binary_to_integer(Bin); +b2i(Any) -> + Any. + %%-------------------------------------------------------------------- %% EUnits %%-------------------------------------------------------------------- @@ -544,8 +613,3 @@ params2qs_test() -> {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). -endif. - -b2i(Bin) when is_binary(Bin) -> - binary_to_integer(Bin); -b2i(Any) -> - Any.