From e07aa2086bdd9e0c428a5da3436024ff55eb0ca7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 11 Jan 2023 16:53:47 +0300 Subject: [PATCH] fix(api): augment paged search responses with `hasnext` flag This flag indicates whether there are more results available on the next pages. It is needed in cases when the total number of search results is not known in advance. Also, in such cases there's no `count` field in responses anymore because responding with `0` was confusing for clients: it's not possible to differentiate between "there are no results" and "we don't know how much". Co-authored-by: Thales Macedo Garitezi --- .../src/emqx_dashboard_swagger.erl | 16 +- apps/emqx_management/src/emqx_mgmt_api.erl | 187 ++++++++++-------- .../test/emqx_mgmt_api_SUITE.erl | 7 +- .../test/emqx_mgmt_api_subscription_SUITE.erl | 79 ++++++-- 4 files changed, 180 insertions(+), 109 deletions(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 102b95f4e..42fdcd46d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -139,14 +139,20 @@ fields(limit) -> [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; fields(count) -> Desc = << - "Total number of records counted.
" - "Note: this field is 0 when the queryed table is empty, " - "or if the query can not be optimized and requires a full table scan." + "Total number of records matching the query.
" + "Note: this field is present only if the query can be optimized and does " + "not require a full table scan." + >>, + Meta = #{desc => Desc, required => false}, + [{count, hoconsc:mk(non_neg_integer(), Meta)}]; +fields(hasnext) -> + Desc = << + "Flag indicating whether there are more results available on next pages." >>, Meta = #{desc => Desc, required => true}, - [{count, hoconsc:mk(non_neg_integer(), Meta)}]; + [{hasnext, hoconsc:mk(boolean(), Meta)}]; fields(meta) -> - fields(page) ++ fields(limit) ++ fields(count). + fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext). -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema_map(). schema_with_example(Type, Example) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 893007ebf..614949f16 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -20,7 +20,6 @@ -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). --define(FRESH_SELECT, fresh_select). -define(LONG_QUERY_TIMEOUT, 50000). -export([ @@ -174,13 +173,12 @@ do_node_query( case do_query(Node, QueryState) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} -> - {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc), - NResultAcc; - {Rows, NQueryState} -> + {Rows, NQueryState = #{complete := Complete}} -> case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of {enough, NResultAcc} -> - NResultAcc; + finalize_query(NResultAcc, NQueryState); + {_, NResultAcc} when Complete -> + finalize_query(NResultAcc, NQueryState); {more, NResultAcc} -> do_node_query(Node, NQueryState, NResultAcc) end @@ -212,8 +210,6 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> end. %% @private -do_cluster_query([], _QueryState, ResultAcc) -> - ResultAcc; do_cluster_query( [Node | Tail] = Nodes, QueryState, @@ -222,31 +218,29 @@ do_cluster_query( case do_query(Node, QueryState) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Rows, NQueryState} -> + {Rows, NQueryState = #{complete := Complete}} -> case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of {enough, NResultAcc} -> - maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc); + FQueryState = maybe_collect_total_from_tail_nodes(Tail, NQueryState), + FComplete = Complete andalso Tail =:= [], + finalize_query(NResultAcc, mark_complete(FQueryState, FComplete)); + {more, NResultAcc} when not Complete -> + do_cluster_query(Nodes, NQueryState, NResultAcc); + {more, NResultAcc} when Tail =/= [] -> + do_cluster_query(Tail, reset_query_state(NQueryState), NResultAcc); {more, NResultAcc} -> - NextNodes = - case NQueryState of - #{continuation := ?FRESH_SELECT} -> Tail; - _ -> Nodes - end, - do_cluster_query(NextNodes, NQueryState, NResultAcc) + finalize_query(NResultAcc, NQueryState) end end. -maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) -> - ResultAcc; -maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) -> - case counting_total_fun(QueryState) of - false -> - ResultAcc; - _Fun -> - collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) - end. +maybe_collect_total_from_tail_nodes([], QueryState) -> + QueryState; +maybe_collect_total_from_tail_nodes(Nodes, QueryState = #{total := _}) -> + collect_total_from_tail_nodes(Nodes, QueryState); +maybe_collect_total_from_tail_nodes(_Nodes, QueryState) -> + QueryState. -collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) -> +collect_total_from_tail_nodes(Nodes, QueryState = #{total := TotalAcc}) -> %% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of {_, [Node | _]} -> @@ -257,7 +251,8 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc [{Node, {badrpc, Reason}} | _] -> {error, Node, {badrpc, Reason}}; [] -> - ResultAcc#{total => ResL ++ TotalAcc} + NTotalAcc = maps:merge(TotalAcc, maps:from_list(ResL)), + QueryState#{total := NTotalAcc} end end. @@ -266,13 +261,14 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc %%-------------------------------------------------------------------- %% QueryState :: -%% #{continuation := ets:continuation(), +%% #{continuation => ets:continuation(), %% page := pos_integer(), %% limit := pos_integer(), -%% total := [{node(), non_neg_integer()}], +%% total => #{node() => non_neg_integer()}, %% table := atom(), -%% qs := {Qs, Fuzzy} %% parsed query params -%% msfun := query_to_match_spec_fun() +%% qs := {Qs, Fuzzy}, %% parsed query params +%% msfun := query_to_match_spec_fun(), +%% complete := boolean() %% } init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -> #{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), @@ -285,17 +281,31 @@ init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) - true = is_list(Args), {type, external} = erlang:fun_info(NamedFun, type) end, - #{ + QueryState = #{ page => Page, limit => Limit, table => Tab, qs => QString, msfun => MsFun, - mactch_spec => Ms, + match_spec => Ms, fuzzy_fun => FuzzyFun, - total => [], - continuation => ?FRESH_SELECT - }. + complete => false + }, + case counting_total_fun(QueryState) of + false -> + QueryState; + Fun when is_function(Fun) -> + QueryState#{total => #{}} + end. + +reset_query_state(QueryState) -> + maps:remove(continuation, mark_complete(QueryState, false)). + +mark_complete(QueryState) -> + mark_complete(QueryState, true). + +mark_complete(QueryState, Complete) -> + QueryState#{complete => Complete}. %% @private This function is exempt from BPAPI do_query(Node, QueryState) when Node =:= node() -> @@ -318,47 +328,50 @@ do_select( Node, QueryState0 = #{ table := Tab, - mactch_spec := Ms, - fuzzy_fun := FuzzyFun, - continuation := Continuation, - limit := Limit + match_spec := Ms, + limit := Limit, + complete := false } ) -> QueryState = maybe_apply_total_query(Node, QueryState0), Result = - case Continuation of - ?FRESH_SELECT -> + case maps:get(continuation, QueryState, undefined) of + undefined -> ets:select(Tab, Ms, Limit); - _ -> + Continuation -> %% XXX: Repair is necessary because we pass Continuation back %% and forth through the nodes in the `do_cluster_query` ets:select(ets:repair_continuation(Continuation, Ms)) end, case Result of - '$end_of_table' -> - {[], QueryState#{continuation => ?FRESH_SELECT}}; + {Rows, '$end_of_table'} -> + NRows = maybe_apply_fuzzy_filter(Rows, QueryState), + {NRows, mark_complete(QueryState)}; {Rows, NContinuation} -> - NRows = - case FuzzyFun of - undefined -> - Rows; - {FilterFun, Args0} when is_function(FilterFun), is_list(Args0) -> - lists:filter( - fun(E) -> erlang:apply(FilterFun, [E | Args0]) end, - Rows - ) - end, - {NRows, QueryState#{continuation => NContinuation}} + NRows = maybe_apply_fuzzy_filter(Rows, QueryState), + {NRows, QueryState#{continuation => NContinuation}}; + '$end_of_table' -> + {[], mark_complete(QueryState)} end. -maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) -> - case proplists:get_value(Node, TotalAcc, undefined) of - undefined -> - Total = apply_total_query(QueryState), - QueryState#{total := [{Node, Total} | TotalAcc]}; - _ -> - QueryState - end. +maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := undefined}) -> + Rows; +maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := {FilterFun, Args}}) -> + lists:filter( + fun(E) -> erlang:apply(FilterFun, [E | Args]) end, + Rows + ). + +maybe_apply_total_query(Node, QueryState = #{total := Acc}) -> + case Acc of + #{Node := _} -> + QueryState; + #{} -> + NodeTotal = apply_total_query(QueryState), + QueryState#{total := Acc#{Node => NodeTotal}} + end; +maybe_apply_total_query(_Node, QueryState = #{}) -> + QueryState. apply_total_query(QueryState = #{table := Tab}) -> case counting_total_fun(QueryState) of @@ -371,7 +384,7 @@ apply_total_query(QueryState = #{table := Tab}) -> counting_total_fun(_QueryState = #{qs := {[], []}}) -> fun(Tab) -> ets:info(Tab, size) end; -counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) -> +counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) -> %% XXX: Calculating the total number of data that match a certain %% condition under a large table is very expensive because the %% entire ETS table needs to be scanned. @@ -390,15 +403,16 @@ counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= und %% ResultAcc :: #{count := integer(), %% cursor := integer(), %% rows := [{node(), Rows :: list()}], -%% total := [{node() => integer()}] +%% partial := boolean(), +%% hasnext => boolean() %% } init_query_result() -> - #{cursor => 0, count => 0, rows => [], total => []}. + #{cursor => 0, count => 0, rows => [], partial => false}. accumulate_query_rows( Node, Rows, - _QueryState = #{page := Page, limit := Limit, total := TotalAcc}, + _QueryState = #{page := Page, limit := Limit}, ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc} ) -> PageStart = (Page - 1) * Limit + 1, @@ -406,12 +420,11 @@ accumulate_query_rows( Len = length(Rows), case Cursor + Len of NCursor when NCursor < PageStart -> - {more, ResultAcc#{cursor => NCursor, total => TotalAcc}}; + {more, ResultAcc#{cursor => NCursor}}; NCursor when NCursor < PageEnd -> {more, ResultAcc#{ cursor => NCursor, count => Count + length(Rows), - total => TotalAcc, rows => [{Node, Rows} | RowsAcc] }}; NCursor when NCursor >= PageEnd -> @@ -419,11 +432,21 @@ accumulate_query_rows( {enough, ResultAcc#{ cursor => NCursor, count => Count + length(SubRows), - total => TotalAcc, - rows => [{Node, SubRows} | RowsAcc] + rows => [{Node, SubRows} | RowsAcc], + partial => (Limit - Count) < Len }} end. +finalize_query(Result = #{partial := Partial}, QueryState = #{complete := Complete}) -> + HasNext = Partial orelse not Complete, + maybe_accumulate_totals(Result#{hasnext => HasNext}, QueryState). + +maybe_accumulate_totals(Result, #{total := TotalAcc}) -> + QueryTotal = maps:fold(fun(_Node, T, N) -> N + T end, 0, TotalAcc), + Result#{total => QueryTotal}; +maybe_accumulate_totals(Result, _QueryState) -> + Result. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- @@ -520,16 +543,22 @@ is_fuzzy_key(<<"match_", _/binary>>) -> is_fuzzy_key(_) -> false. -format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) -> +format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}) -> Error; format_query_result( - FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc} + FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc} ) -> - Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc), + Meta = + case ResultAcc of + #{total := QueryTotal} -> + %% The `count` is used in HTTP API to indicate the total number of + %% queries that can be read + MetaIn#{hasnext => HasNext, count => QueryTotal}; + #{} -> + MetaIn#{hasnext => HasNext} + end, #{ - %% The `count` is used in HTTP API to indicate the total number of - %% queries that can be read - meta => Meta#{count => Total}, + meta => Meta, data => lists:flatten( lists:foldl( fun({Node, Rows}, Acc) -> @@ -552,7 +581,7 @@ parse_pager_params(Params) -> Limit = b2i(limit(Params)), case Page > 0 andalso Limit > 0 of true -> - #{page => Page, limit => Limit, count => 0}; + #{page => Page, limit => Limit}; false -> false end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index a14305d8b..a8bbfa6d9 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -88,10 +88,9 @@ t_cluster_query(_Config) -> %% fuzzy searching can't return total {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), - ?assertMatch( - #{count := 0}, - maps:get(meta, ClientsNode2) - ), + MetaNode2 = maps:get(meta, ClientsNode2), + ?assertNotMatch(#{count := _}, MetaNode2), + ?assertMatch(#{hasnext := false}, MetaNode2), ?assertMatch(10, length(maps:get(data, ClientsNode2))), _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 965ed0997..2ab213e30 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -44,9 +44,8 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite(). -t_subscription_api(_) -> - {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), - {ok, _} = emqtt:connect(Client), +t_subscription_api(Config) -> + Client = proplists:get_value(client, Config), {ok, _, _} = emqtt:subscribe( Client, [ {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} @@ -84,40 +83,78 @@ t_subscription_api(_) -> ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), - QS = uri_string:compose_query([ + QS = [ {"clientid", ?CLIENTID}, {"topic", ?TOPIC2_TOPIC_ONLY}, {"node", atom_to_list(node())}, {"qos", "0"}, {"share_group", "test_group"}, {"match_topic", "t/#"} - ]), + ], Headers = emqx_mgmt_api_test_util:auth_header_(), - {ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), - DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]), - Meta2 = maps:get(<<"meta">>, DataTopic2), + DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers), ?assertEqual(1, maps:get(<<"page">>, Meta2)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)), ?assertEqual(1, maps:get(<<"count">>, Meta2)), SubscriptionsList2 = maps:get(<<"data">>, DataTopic2), - ?assertEqual(length(SubscriptionsList2), 1), + ?assertEqual(length(SubscriptionsList2), 1). - MatchQs = uri_string:compose_query([ +t_subscription_fuzzy_search(Config) -> + Client = proplists:get_value(client, Config), + Topics = [ + <<"t/foo">>, + <<"t/foo/bar">>, + <<"t/foo/baz">>, + <<"topic/foo/bar">>, + <<"topic/foo/baz">> + ], + _ = [{ok, _, _} = emqtt:subscribe(Client, T) || T <- Topics], + + Headers = emqx_mgmt_api_test_util:auth_header_(), + MatchQs = [ {"clientid", ?CLIENTID}, {"node", atom_to_list(node())}, - {"qos", "0"}, {"match_topic", "t/#"} - ]), + ], - {ok, MatchRes} = emqx_mgmt_api_test_util:request_api(get, Path, MatchQs, Headers), - MatchData = emqx_json:decode(MatchRes, [return_maps]), - MatchMeta = maps:get(<<"meta">>, MatchData), - ?assertEqual(1, maps:get(<<"page">>, MatchMeta)), - ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), - %% count equals 0 in fuzzy searching - ?assertEqual(0, maps:get(<<"count">>, MatchMeta)), - MatchSubs = maps:get(<<"data">>, MatchData), - ?assertEqual(1, length(MatchSubs)), + MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), + ?assertEqual(1, maps:get(<<"page">>, MatchMeta1)), + ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta1)), + %% count is undefined in fuzzy searching + ?assertNot(maps:is_key(<<"count">>, MatchMeta1)), + ?assertMatch(3, length(maps:get(<<"data">>, MatchData1))), + ?assertEqual(false, maps:get(<<"hasnext">>, MatchMeta1)), + LimitMatchQuery = [ + {"clientid", ?CLIENTID}, + {"match_topic", "+/+/+"}, + {"limit", "3"} + ], + + MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers), + ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2), + ?assertEqual(3, length(maps:get(<<"data">>, MatchData2))), + + MatchData2P2 = + #{<<"meta">> := MatchMeta2P2} = + request_json(get, [{"page", "2"} | LimitMatchQuery], Headers), + ?assertEqual(#{<<"page">> => 2, <<"limit">> => 3, <<"hasnext">> => false}, MatchMeta2P2), + ?assertEqual(1, length(maps:get(<<"data">>, MatchData2P2))). + +request_json(Method, Query, Headers) when is_list(Query) -> + Qs = uri_string:compose_query(Query), + {ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers), + emqx_json:decode(MatchRes, [return_maps]). + +path() -> + emqx_mgmt_api_test_util:api_path(["subscriptions"]). + +init_per_testcase(_TC, Config) -> + {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), + {ok, _} = emqtt:connect(Client), + [{client, Client} | Config]. + +end_per_testcase(_TC, Config) -> + Client = proplists:get_value(client, Config), emqtt:disconnect(Client).