From 321d941eafb4aaa5f0be7f6cd42f055a2e0adcae Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sat, 9 Oct 2021 16:35:23 +0800 Subject: [PATCH] fix(mgmt_api): rows too big fixing. --- apps/emqx_management/src/emqx_mgmt_api.erl | 138 +++++++++++++++------ 1 file changed, 98 insertions(+), 40 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 86acabdc1..d932f89fd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -116,29 +116,47 @@ limit(Params) -> %%-------------------------------------------------------------------- node_query(Node, Params, Tab, QsSchema, QueryFun) -> - {CodCnt, Qs} = params2qs(Params, QsSchema), + {_CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), - PageStart = page_start(Page, Limit), - %% Rows so big, fixme. - Rows = do_node_query(Node, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Limit, []), - Data = lists:sublist(Rows, PageStart, Limit), - Meta = #{page => Page, limit => Limit}, - NMeta = case CodCnt =:= 0 of - true -> Meta#{count => count(Tab)}; - _ -> Meta#{count => length(Rows)} - end, - #{meta => NMeta, data => Data}. + Meta = #{page => Page, limit => Limit, count => 0}, + do_node_query(Node, Tab, Qs, QueryFun, Meta). %% @private -do_node_query(Node, Tab, Qs, QueryFun, Continuation, Limit, Acc) -> - {Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), - NAcc = [Rows | Acc], - case NContinuation of - ?FRESH_SELECT -> - lists:append(lists:reverse(Acc)); - _ -> - do_node_query(Node, Tab, Qs, QueryFun, NContinuation, Limit, NAcc) +do_node_query(Node, Tab, Qs, QueryFun, Meta) -> + do_node_query(Node, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). + +do_node_query( Node, Tab, Qs, QueryFun, Continuation + , Meta = #{limit := Limit} + , Results) -> + {Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), + case judge_page_with_counting(Len, Meta) of + {more, NMeta} -> + case NContinuation of + ?FRESH_SELECT -> + #{meta => NMeta, data => []}; %% page and limit too big + _ -> + do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, []) + end; + {cutrows, NMeta} -> + {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), + ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), + NResults = lists:sublist( lists:append(Results, ThisRows) + , SubStart, Limit), + case NContinuation of + ?FRESH_SELECT -> + #{meta => NMeta, data => NResults}; + _ -> + do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) + end; + {enough, NMeta} -> + NResults = lists:sublist(lists:append(Results, Rows), 1, Limit), + case NContinuation of + ?FRESH_SELECT -> + #{meta => NMeta, data => NResults}; + _ -> + do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) + end end. %%-------------------------------------------------------------------- @@ -146,32 +164,50 @@ do_node_query(Node, Tab, Qs, QueryFun, Continuation, Limit, Acc) -> %%-------------------------------------------------------------------- cluster_query(Params, Tab, QsSchema, QueryFun) -> - {CodCnt, Qs} = params2qs(Params, QsSchema), + {_CodCnt, Qs} = params2qs(Params, QsSchema), Limit = b2i(limit(Params)), Page = b2i(page(Params)), - PageStart = page_start(Page, Limit), Nodes = ekka_mnesia:running_nodes(), - %% Rows so big, fixme. - Rows = do_cluster_query(Nodes, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Limit, []), - Data = lists:sublist(Rows, PageStart, Limit), - Meta = #{page => Page, limit => Limit}, - NMeta = case CodCnt =:= 0 of - true -> Meta#{count => lists:sum([rpc_call(Node, ets, info, [Tab, size], 5000) || Node <- Nodes])}; - _ -> Meta#{count => length(Rows)} - end, - #{meta => NMeta, data => Data}. + Meta = #{page => Page, limit => Limit, count => 0}, + do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta). %% @private -do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, _Limit, Acc) -> - lists:append(lists:reverse(Acc)); -do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, Continuation, Limit, Acc) -> - {Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), - NAcc = [Rows | Acc], - case NContinuation of - ?FRESH_SELECT -> - do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, Limit, NAcc); - _ -> - do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, Limit, NAcc) +do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) -> + do_cluster_query(Nodes, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). + +do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) -> + #{meta => Meta, data => Results}; +do_cluster_query( [Node | Nodes], Tab, Qs, QueryFun, Continuation + , Meta = #{limit := Limit} + , Results) -> + {Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), + case judge_page_with_counting(Len, Meta) of + {more, NMeta} -> + case NContinuation of + ?FRESH_SELECT -> + do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, []); %% next node with parts of results + _ -> + do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, []) %% continue this node + end; + {cutrows, NMeta} -> + {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), + ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), + NResults = lists:sublist( lists:append(Results, ThisRows) + , SubStart, Limit), + case NContinuation of + ?FRESH_SELECT -> + do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results + _ -> + do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node + end; + {enough, NMeta} -> + NResults = lists:sublist(lists:append(Results, Rows), 1, Limit), + case NContinuation of + ?FRESH_SELECT -> + do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results + _ -> + do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node + end end. %%-------------------------------------------------------------------- @@ -309,6 +345,28 @@ page_start(Page, Limit) -> true -> 1 end. +judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> + PageStart = page_start(Page, Limit), + PageEnd = Page * Limit, + case Count + Len of + NCount when NCount < PageStart -> + {more, Meta#{count => NCount}}; + NCount when NCount < PageEnd -> + {cutrows, Meta#{count => NCount}}; + NCount when NCount >= PageEnd -> + {enough, Meta#{count => NCount}} + end. + +rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> + PageStart = page_start(Page, Limit), + if Count - Len < PageStart -> + NeedNowNum = Count - PageStart + 1, + SubStart = Len - NeedNowNum + 1, + {SubStart, NeedNowNum}; + true -> + {_SubStart = 1, _NeedNowNum = Len} + end. + %%-------------------------------------------------------------------- %% Types %%--------------------------------------------------------------------