fix(api): augment paged search responses with `hasnext` flag

This flag indicates whether there are more results available on the
next pages. It is needed in cases when the total number of search
results is not known in advance.

Also, in such cases there's no `count` field in responses anymore
because responding with `0` was confusing for clients: it's not possible
to differentiate between "there are no results" and "we don't know how
much".

Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
This commit is contained in:
Andrew Mayorov 2023-01-11 16:53:47 +03:00
parent fde8752452
commit e07aa2086b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 180 additions and 109 deletions

View File

@ -139,14 +139,20 @@ fields(limit) ->
[{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
fields(count) ->
Desc = <<
"Total number of records counted.<br/>"
"Note: this field is <code>0</code> when the queryed table is empty, "
"or if the query can not be optimized and requires a full table scan."
"Total number of records matching the query.<br/>"
"Note: this field is present only if the query can be optimized and does "
"not require a full table scan."
>>,
Meta = #{desc => Desc, required => false},
[{count, hoconsc:mk(non_neg_integer(), Meta)}];
fields(hasnext) ->
Desc = <<
"Flag indicating whether there are more results available on next pages."
>>,
Meta = #{desc => Desc, required => true},
[{count, hoconsc:mk(non_neg_integer(), Meta)}];
[{hasnext, hoconsc:mk(boolean(), Meta)}];
fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count).
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema_map().
schema_with_example(Type, Example) ->

View File

@ -20,7 +20,6 @@
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
-define(FRESH_SELECT, fresh_select).
-define(LONG_QUERY_TIMEOUT, 50000).
-export([
@ -174,13 +173,12 @@ do_node_query(
case do_query(Node, QueryState) of
{error, {badrpc, R}} ->
{error, Node, {badrpc, R}};
{Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
{_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
NResultAcc;
{Rows, NQueryState} ->
{Rows, NQueryState = #{complete := Complete}} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
NResultAcc;
finalize_query(NResultAcc, NQueryState);
{_, NResultAcc} when Complete ->
finalize_query(NResultAcc, NQueryState);
{more, NResultAcc} ->
do_node_query(Node, NQueryState, NResultAcc)
end
@ -212,8 +210,6 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
end.
%% @private
do_cluster_query([], _QueryState, ResultAcc) ->
ResultAcc;
do_cluster_query(
[Node | Tail] = Nodes,
QueryState,
@ -222,31 +218,29 @@ do_cluster_query(
case do_query(Node, QueryState) of
{error, {badrpc, R}} ->
{error, Node, {badrpc, R}};
{Rows, NQueryState} ->
{Rows, NQueryState = #{complete := Complete}} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc);
FQueryState = maybe_collect_total_from_tail_nodes(Tail, NQueryState),
FComplete = Complete andalso Tail =:= [],
finalize_query(NResultAcc, mark_complete(FQueryState, FComplete));
{more, NResultAcc} when not Complete ->
do_cluster_query(Nodes, NQueryState, NResultAcc);
{more, NResultAcc} when Tail =/= [] ->
do_cluster_query(Tail, reset_query_state(NQueryState), NResultAcc);
{more, NResultAcc} ->
NextNodes =
case NQueryState of
#{continuation := ?FRESH_SELECT} -> Tail;
_ -> Nodes
end,
do_cluster_query(NextNodes, NQueryState, NResultAcc)
finalize_query(NResultAcc, NQueryState)
end
end.
maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) ->
ResultAcc;
maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) ->
case counting_total_fun(QueryState) of
false ->
ResultAcc;
_Fun ->
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
end.
maybe_collect_total_from_tail_nodes([], QueryState) ->
QueryState;
maybe_collect_total_from_tail_nodes(Nodes, QueryState = #{total := _}) ->
collect_total_from_tail_nodes(Nodes, QueryState);
maybe_collect_total_from_tail_nodes(_Nodes, QueryState) ->
QueryState.
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) ->
collect_total_from_tail_nodes(Nodes, QueryState = #{total := TotalAcc}) ->
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
{_, [Node | _]} ->
@ -257,7 +251,8 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
[{Node, {badrpc, Reason}} | _] ->
{error, Node, {badrpc, Reason}};
[] ->
ResultAcc#{total => ResL ++ TotalAcc}
NTotalAcc = maps:merge(TotalAcc, maps:from_list(ResL)),
QueryState#{total := NTotalAcc}
end
end.
@ -266,13 +261,14 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
%%--------------------------------------------------------------------
%% QueryState ::
%% #{continuation := ets:continuation(),
%% #{continuation => ets:continuation(),
%% page := pos_integer(),
%% limit := pos_integer(),
%% total := [{node(), non_neg_integer()}],
%% total => #{node() => non_neg_integer()},
%% table := atom(),
%% qs := {Qs, Fuzzy} %% parsed query params
%% msfun := query_to_match_spec_fun()
%% qs := {Qs, Fuzzy}, %% parsed query params
%% msfun := query_to_match_spec_fun(),
%% complete := boolean()
%% }
init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
#{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
@ -285,17 +281,31 @@ init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -
true = is_list(Args),
{type, external} = erlang:fun_info(NamedFun, type)
end,
#{
QueryState = #{
page => Page,
limit => Limit,
table => Tab,
qs => QString,
msfun => MsFun,
mactch_spec => Ms,
match_spec => Ms,
fuzzy_fun => FuzzyFun,
total => [],
continuation => ?FRESH_SELECT
}.
complete => false
},
case counting_total_fun(QueryState) of
false ->
QueryState;
Fun when is_function(Fun) ->
QueryState#{total => #{}}
end.
reset_query_state(QueryState) ->
maps:remove(continuation, mark_complete(QueryState, false)).
mark_complete(QueryState) ->
mark_complete(QueryState, true).
mark_complete(QueryState, Complete) ->
QueryState#{complete => Complete}.
%% @private This function is exempt from BPAPI
do_query(Node, QueryState) when Node =:= node() ->
@ -318,47 +328,50 @@ do_select(
Node,
QueryState0 = #{
table := Tab,
mactch_spec := Ms,
fuzzy_fun := FuzzyFun,
continuation := Continuation,
limit := Limit
match_spec := Ms,
limit := Limit,
complete := false
}
) ->
QueryState = maybe_apply_total_query(Node, QueryState0),
Result =
case Continuation of
?FRESH_SELECT ->
case maps:get(continuation, QueryState, undefined) of
undefined ->
ets:select(Tab, Ms, Limit);
_ ->
Continuation ->
%% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query`
ets:select(ets:repair_continuation(Continuation, Ms))
end,
case Result of
'$end_of_table' ->
{[], QueryState#{continuation => ?FRESH_SELECT}};
{Rows, '$end_of_table'} ->
NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
{NRows, mark_complete(QueryState)};
{Rows, NContinuation} ->
NRows =
case FuzzyFun of
undefined ->
Rows;
{FilterFun, Args0} when is_function(FilterFun), is_list(Args0) ->
lists:filter(
fun(E) -> erlang:apply(FilterFun, [E | Args0]) end,
Rows
)
end,
{NRows, QueryState#{continuation => NContinuation}}
NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
{NRows, QueryState#{continuation => NContinuation}};
'$end_of_table' ->
{[], mark_complete(QueryState)}
end.
maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) ->
case proplists:get_value(Node, TotalAcc, undefined) of
undefined ->
Total = apply_total_query(QueryState),
QueryState#{total := [{Node, Total} | TotalAcc]};
_ ->
QueryState
end.
maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := undefined}) ->
Rows;
maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := {FilterFun, Args}}) ->
lists:filter(
fun(E) -> erlang:apply(FilterFun, [E | Args]) end,
Rows
).
maybe_apply_total_query(Node, QueryState = #{total := Acc}) ->
case Acc of
#{Node := _} ->
QueryState;
#{} ->
NodeTotal = apply_total_query(QueryState),
QueryState#{total := Acc#{Node => NodeTotal}}
end;
maybe_apply_total_query(_Node, QueryState = #{}) ->
QueryState.
apply_total_query(QueryState = #{table := Tab}) ->
case counting_total_fun(QueryState) of
@ -371,7 +384,7 @@ apply_total_query(QueryState = #{table := Tab}) ->
counting_total_fun(_QueryState = #{qs := {[], []}}) ->
fun(Tab) -> ets:info(Tab, size) end;
counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) ->
counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
%% XXX: Calculating the total number of data that match a certain
%% condition under a large table is very expensive because the
%% entire ETS table needs to be scanned.
@ -390,15 +403,16 @@ counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= und
%% ResultAcc :: #{count := integer(),
%% cursor := integer(),
%% rows := [{node(), Rows :: list()}],
%% total := [{node() => integer()}]
%% partial := boolean(),
%% hasnext => boolean()
%% }
init_query_result() ->
#{cursor => 0, count => 0, rows => [], total => []}.
#{cursor => 0, count => 0, rows => [], partial => false}.
accumulate_query_rows(
Node,
Rows,
_QueryState = #{page := Page, limit := Limit, total := TotalAcc},
_QueryState = #{page := Page, limit := Limit},
ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
) ->
PageStart = (Page - 1) * Limit + 1,
@ -406,12 +420,11 @@ accumulate_query_rows(
Len = length(Rows),
case Cursor + Len of
NCursor when NCursor < PageStart ->
{more, ResultAcc#{cursor => NCursor, total => TotalAcc}};
{more, ResultAcc#{cursor => NCursor}};
NCursor when NCursor < PageEnd ->
{more, ResultAcc#{
cursor => NCursor,
count => Count + length(Rows),
total => TotalAcc,
rows => [{Node, Rows} | RowsAcc]
}};
NCursor when NCursor >= PageEnd ->
@ -419,11 +432,21 @@ accumulate_query_rows(
{enough, ResultAcc#{
cursor => NCursor,
count => Count + length(SubRows),
total => TotalAcc,
rows => [{Node, SubRows} | RowsAcc]
rows => [{Node, SubRows} | RowsAcc],
partial => (Limit - Count) < Len
}}
end.
finalize_query(Result = #{partial := Partial}, QueryState = #{complete := Complete}) ->
HasNext = Partial orelse not Complete,
maybe_accumulate_totals(Result#{hasnext => HasNext}, QueryState).
maybe_accumulate_totals(Result, #{total := TotalAcc}) ->
QueryTotal = maps:fold(fun(_Node, T, N) -> N + T end, 0, TotalAcc),
Result#{total => QueryTotal};
maybe_accumulate_totals(Result, _QueryState) ->
Result.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
@ -520,16 +543,22 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
is_fuzzy_key(_) ->
false.
format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}) ->
Error;
format_query_result(
FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc}
FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc}
) ->
Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc),
Meta =
case ResultAcc of
#{total := QueryTotal} ->
%% The `count` is used in HTTP API to indicate the total number of
%% queries that can be read
MetaIn#{hasnext => HasNext, count => QueryTotal};
#{} ->
MetaIn#{hasnext => HasNext}
end,
#{
%% The `count` is used in HTTP API to indicate the total number of
%% queries that can be read
meta => Meta#{count => Total},
meta => Meta,
data => lists:flatten(
lists:foldl(
fun({Node, Rows}, Acc) ->
@ -552,7 +581,7 @@ parse_pager_params(Params) ->
Limit = b2i(limit(Params)),
case Page > 0 andalso Limit > 0 of
true ->
#{page => Page, limit => Limit, count => 0};
#{page => Page, limit => Limit};
false ->
false
end.

View File

@ -88,10 +88,9 @@ t_cluster_query(_Config) ->
%% fuzzy searching can't return total
{200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}),
?assertMatch(
#{count := 0},
maps:get(meta, ClientsNode2)
),
MetaNode2 = maps:get(meta, ClientsNode2),
?assertNotMatch(#{count := _}, MetaNode2),
?assertMatch(#{hasnext := false}, MetaNode2),
?assertMatch(10, length(maps:get(data, ClientsNode2))),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),

View File

@ -44,9 +44,8 @@ init_per_suite(Config) ->
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
t_subscription_api(_) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
{ok, _} = emqtt:connect(Client),
t_subscription_api(Config) ->
Client = proplists:get_value(client, Config),
{ok, _, _} = emqtt:subscribe(
Client, [
{?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
@ -84,40 +83,78 @@ t_subscription_api(_) ->
?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2),
?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID),
QS = uri_string:compose_query([
QS = [
{"clientid", ?CLIENTID},
{"topic", ?TOPIC2_TOPIC_ONLY},
{"node", atom_to_list(node())},
{"qos", "0"},
{"share_group", "test_group"},
{"match_topic", "t/#"}
]),
],
Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]),
Meta2 = maps:get(<<"meta">>, DataTopic2),
DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers),
?assertEqual(1, maps:get(<<"page">>, Meta2)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)),
?assertEqual(1, maps:get(<<"count">>, Meta2)),
SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
?assertEqual(length(SubscriptionsList2), 1),
?assertEqual(length(SubscriptionsList2), 1).
MatchQs = uri_string:compose_query([
t_subscription_fuzzy_search(Config) ->
Client = proplists:get_value(client, Config),
Topics = [
<<"t/foo">>,
<<"t/foo/bar">>,
<<"t/foo/baz">>,
<<"topic/foo/bar">>,
<<"topic/foo/baz">>
],
_ = [{ok, _, _} = emqtt:subscribe(Client, T) || T <- Topics],
Headers = emqx_mgmt_api_test_util:auth_header_(),
MatchQs = [
{"clientid", ?CLIENTID},
{"node", atom_to_list(node())},
{"qos", "0"},
{"match_topic", "t/#"}
]),
],
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(get, Path, MatchQs, Headers),
MatchData = emqx_json:decode(MatchRes, [return_maps]),
MatchMeta = maps:get(<<"meta">>, MatchData),
?assertEqual(1, maps:get(<<"page">>, MatchMeta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)),
%% count equals 0 in fuzzy searching
?assertEqual(0, maps:get(<<"count">>, MatchMeta)),
MatchSubs = maps:get(<<"data">>, MatchData),
?assertEqual(1, length(MatchSubs)),
MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
?assertEqual(1, maps:get(<<"page">>, MatchMeta1)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta1)),
%% count is undefined in fuzzy searching
?assertNot(maps:is_key(<<"count">>, MatchMeta1)),
?assertMatch(3, length(maps:get(<<"data">>, MatchData1))),
?assertEqual(false, maps:get(<<"hasnext">>, MatchMeta1)),
LimitMatchQuery = [
{"clientid", ?CLIENTID},
{"match_topic", "+/+/+"},
{"limit", "3"}
],
MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers),
?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2),
?assertEqual(3, length(maps:get(<<"data">>, MatchData2))),
MatchData2P2 =
#{<<"meta">> := MatchMeta2P2} =
request_json(get, [{"page", "2"} | LimitMatchQuery], Headers),
?assertEqual(#{<<"page">> => 2, <<"limit">> => 3, <<"hasnext">> => false}, MatchMeta2P2),
?assertEqual(1, length(maps:get(<<"data">>, MatchData2P2))).
request_json(Method, Query, Headers) when is_list(Query) ->
Qs = uri_string:compose_query(Query),
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
emqx_json:decode(MatchRes, [return_maps]).
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).
init_per_testcase(_TC, Config) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
{ok, _} = emqtt:connect(Client),
[{client, Client} | Config].
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client).