fix(mgmt): collect total number in node_query/cluster_query
This commit is contained in:
parent
1fe9c105aa
commit
28d391f26c
|
@ -147,7 +147,7 @@ node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
|
||||||
{error, page_limit_invalid};
|
{error, page_limit_invalid};
|
||||||
Meta ->
|
Meta ->
|
||||||
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
||||||
ResultAcc = #{cursor => 0, count => 0, rows => []},
|
ResultAcc = init_query_result(),
|
||||||
QueryState = init_query_state(Meta),
|
QueryState = init_query_state(Meta),
|
||||||
NResultAcc = do_node_query(
|
NResultAcc = do_node_query(
|
||||||
Node, Tab, NQString, MsFun, QueryState, ResultAcc
|
Node, Tab, NQString, MsFun, QueryState, ResultAcc
|
||||||
|
@ -196,7 +196,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
||||||
Meta ->
|
Meta ->
|
||||||
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
ResultAcc = #{cursor => 0, count => 0, rows => []},
|
ResultAcc = init_query_result(),
|
||||||
QueryState = init_query_state(Meta),
|
QueryState = init_query_state(Meta),
|
||||||
NResultAcc = do_cluster_query(
|
NResultAcc = do_cluster_query(
|
||||||
Nodes, Tab, NQString, MsFun, QueryState, ResultAcc
|
Nodes, Tab, NQString, MsFun, QueryState, ResultAcc
|
||||||
|
@ -205,7 +205,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
do_cluster_query([], _Tab, _QString, _QueryFun, _QueryState, ResultAcc) ->
|
do_cluster_query([], _Tab, _QString, _MsFun, _QueryState, ResultAcc) ->
|
||||||
ResultAcc;
|
ResultAcc;
|
||||||
do_cluster_query(
|
do_cluster_query(
|
||||||
[Node | Tail] = Nodes,
|
[Node | Tail] = Nodes,
|
||||||
|
@ -221,7 +221,7 @@ do_cluster_query(
|
||||||
{Rows, NQueryState} ->
|
{Rows, NQueryState} ->
|
||||||
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
||||||
{enough, NResultAcc} ->
|
{enough, NResultAcc} ->
|
||||||
NResultAcc;
|
maybe_collect_total_from_tail_nodes(Tail, Tab, QString, MsFun, NResultAcc);
|
||||||
{more, NResultAcc} ->
|
{more, NResultAcc} ->
|
||||||
NextNodes =
|
NextNodes =
|
||||||
case NQueryState of
|
case NQueryState of
|
||||||
|
@ -232,6 +232,29 @@ do_cluster_query(
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_collect_total_from_tail_nodes([], _Tab, _QString, _MsFun, ResultAcc) ->
|
||||||
|
ResultAcc;
|
||||||
|
maybe_collect_total_from_tail_nodes(Nodes, Tab, QString, MsFun, ResultAcc = #{total := TotalAcc}) ->
|
||||||
|
{Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
||||||
|
case is_countable_total(Ms, FuzzyFun) of
|
||||||
|
true ->
|
||||||
|
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
|
||||||
|
case rpc:multicall(Nodes, ?MODULE, apply_total_query, [Tab, Ms, FuzzyFun]) of
|
||||||
|
{_, [Node | _]} ->
|
||||||
|
{error, Node, {badrpc, badnode}};
|
||||||
|
{ResL0, []} ->
|
||||||
|
ResL = lists:zip(Nodes, ResL0),
|
||||||
|
case lists:filter(fun({_, I}) -> not is_integer(I) end, ResL) of
|
||||||
|
[{Node, {badrpc, Reason}} | _] ->
|
||||||
|
{error, Node, {badrpc, Reason}};
|
||||||
|
[] ->
|
||||||
|
ResultAcc#{total => ResL ++ TotalAcc}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
ResultAcc
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Do Query (or rpc query)
|
%% Do Query (or rpc query)
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -240,7 +263,7 @@ do_cluster_query(
|
||||||
%% #{continuation := ets:continuation(),
|
%% #{continuation := ets:continuation(),
|
||||||
%% page := pos_integer(),
|
%% page := pos_integer(),
|
||||||
%% limit := pos_integer(),
|
%% limit := pos_integer(),
|
||||||
%% total := #{node() := non_neg_integer()}
|
%% total := [{node(), non_neg_integer()}]
|
||||||
%% }
|
%% }
|
||||||
init_query_state(_Meta = #{page := Page, limit := Limit}) ->
|
init_query_state(_Meta = #{page := Page, limit := Limit}) ->
|
||||||
#{
|
#{
|
||||||
|
@ -253,7 +276,7 @@ init_query_state(_Meta = #{page := Page, limit := Limit}) ->
|
||||||
%% @private This function is exempt from BPAPI
|
%% @private This function is exempt from BPAPI
|
||||||
do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) ->
|
do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) ->
|
||||||
{Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
{Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
||||||
do_select(Tab, Ms, FuzzyFun, QueryState);
|
do_select(Node, Tab, Ms, FuzzyFun, QueryState);
|
||||||
do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
|
do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
|
||||||
case
|
case
|
||||||
rpc:call(
|
rpc:call(
|
||||||
|
@ -269,11 +292,13 @@ do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_select(
|
do_select(
|
||||||
|
Node,
|
||||||
Tab,
|
Tab,
|
||||||
Ms,
|
Ms,
|
||||||
FuzzyFun,
|
FuzzyFun,
|
||||||
QueryState = #{continuation := Continuation, limit := Limit}
|
QueryState0 = #{continuation := Continuation, limit := Limit}
|
||||||
) ->
|
) ->
|
||||||
|
QueryState = maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState0),
|
||||||
Result =
|
Result =
|
||||||
case Continuation of
|
case Continuation of
|
||||||
?FRESH_SELECT ->
|
?FRESH_SELECT ->
|
||||||
|
@ -293,14 +318,48 @@ do_select(
|
||||||
{NRows, QueryState#{continuation => NContinuation}}
|
{NRows, QueryState#{continuation => NContinuation}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState = #{total := TotalAcc}) ->
|
||||||
|
case proplists:get_value(Node, TotalAcc, undefined) of
|
||||||
|
undefined ->
|
||||||
|
Total = apply_total_query(Tab, Ms, FuzzyFun),
|
||||||
|
QueryState#{total := [{Node, Total} | TotalAcc]};
|
||||||
|
_ ->
|
||||||
|
QueryState
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% XXX: Calculating the total number of data that match a certain condition under a large table
|
||||||
|
%% is very expensive because the entire ETS table needs to be scanned.
|
||||||
|
apply_total_query(Tab, Ms, FuzzyFun) ->
|
||||||
|
case is_countable_total(Ms, FuzzyFun) of
|
||||||
|
true ->
|
||||||
|
ets:info(Tab, size);
|
||||||
|
false ->
|
||||||
|
%% return a fake total number if the query have any conditions
|
||||||
|
0
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_countable_total(Ms, FuzzyFun) ->
|
||||||
|
FuzzyFun =:= undefined andalso is_non_conditions_match_spec(Ms).
|
||||||
|
|
||||||
|
is_non_conditions_match_spec([{_MatchHead, _Conds = [], _Return} | More]) ->
|
||||||
|
is_non_conditions_match_spec(More);
|
||||||
|
is_non_conditions_match_spec([{_MatchHead, Conds, _Return} | _More]) when length(Conds) =/= 0 ->
|
||||||
|
false;
|
||||||
|
is_non_conditions_match_spec([]) ->
|
||||||
|
true.
|
||||||
|
|
||||||
%% ResultAcc :: #{count := integer(),
|
%% ResultAcc :: #{count := integer(),
|
||||||
%% cursor := integer(),
|
%% cursor := integer(),
|
||||||
%% rows := [{node(), Rows :: list()}]
|
%% rows := [{node(), Rows :: list()}],
|
||||||
|
%% total := [{node() => integer()}]
|
||||||
%% }
|
%% }
|
||||||
|
init_query_result() ->
|
||||||
|
#{cursor => 0, count => 0, rows => [], total => []}.
|
||||||
|
|
||||||
accumulate_query_rows(
|
accumulate_query_rows(
|
||||||
Node,
|
Node,
|
||||||
Rows,
|
Rows,
|
||||||
_QueryState = #{page := Page, limit := Limit},
|
_QueryState = #{page := Page, limit := Limit, total := TotalAcc},
|
||||||
ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
|
ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
|
||||||
) ->
|
) ->
|
||||||
PageStart = (Page - 1) * Limit + 1,
|
PageStart = (Page - 1) * Limit + 1,
|
||||||
|
@ -308,11 +367,12 @@ accumulate_query_rows(
|
||||||
Len = length(Rows),
|
Len = length(Rows),
|
||||||
case Cursor + Len of
|
case Cursor + Len of
|
||||||
NCursor when NCursor < PageStart ->
|
NCursor when NCursor < PageStart ->
|
||||||
{more, ResultAcc#{cursor => NCursor}};
|
{more, ResultAcc#{cursor => NCursor, total => TotalAcc}};
|
||||||
NCursor when NCursor < PageEnd ->
|
NCursor when NCursor < PageEnd ->
|
||||||
{more, ResultAcc#{
|
{more, ResultAcc#{
|
||||||
cursor => NCursor,
|
cursor => NCursor,
|
||||||
count => Count + length(Rows),
|
count => Count + length(Rows),
|
||||||
|
total => TotalAcc,
|
||||||
rows => [{Node, Rows} | RowsAcc]
|
rows => [{Node, Rows} | RowsAcc]
|
||||||
}};
|
}};
|
||||||
NCursor when NCursor >= PageEnd ->
|
NCursor when NCursor >= PageEnd ->
|
||||||
|
@ -320,6 +380,7 @@ accumulate_query_rows(
|
||||||
{enough, ResultAcc#{
|
{enough, ResultAcc#{
|
||||||
cursor => NCursor,
|
cursor => NCursor,
|
||||||
count => Count + length(SubRows),
|
count => Count + length(SubRows),
|
||||||
|
total => TotalAcc,
|
||||||
rows => [{Node, SubRows} | RowsAcc]
|
rows => [{Node, SubRows} | RowsAcc]
|
||||||
}}
|
}}
|
||||||
end.
|
end.
|
||||||
|
@ -426,10 +487,13 @@ is_fuzzy_key(_) ->
|
||||||
format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
|
format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
|
||||||
Error;
|
Error;
|
||||||
format_query_result(
|
format_query_result(
|
||||||
FmtFun, Meta, _ResultAcc = #{count := _Count, cursor := Cursor, rows := RowsAcc}
|
FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc}
|
||||||
) ->
|
) ->
|
||||||
|
Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc),
|
||||||
#{
|
#{
|
||||||
meta => Meta#{count => Cursor},
|
%% The `count` is used in HTTP API to indicate the total number of
|
||||||
|
%% queries that can be read
|
||||||
|
meta => Meta#{count => Total},
|
||||||
data => lists:flatten(
|
data => lists:flatten(
|
||||||
lists:foldr(
|
lists:foldr(
|
||||||
fun({Node, Rows}, Acc) ->
|
fun({Node, Rows}, Acc) ->
|
||||||
|
@ -500,6 +564,11 @@ to_ip_port(IPAddress) ->
|
||||||
Port = list_to_integer(Port0),
|
Port = list_to_integer(Port0),
|
||||||
{IP, Port}.
|
{IP, Port}.
|
||||||
|
|
||||||
|
b2i(Bin) when is_binary(Bin) ->
|
||||||
|
binary_to_integer(Bin);
|
||||||
|
b2i(Any) ->
|
||||||
|
Any.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% EUnits
|
%% EUnits
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -544,8 +613,3 @@ params2qs_test() ->
|
||||||
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
b2i(Bin) when is_binary(Bin) ->
|
|
||||||
binary_to_integer(Bin);
|
|
||||||
b2i(Any) ->
|
|
||||||
Any.
|
|
||||||
|
|
Loading…
Reference in New Issue