%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_mgmt_api). -include_lib("stdlib/include/qlc.hrl"). -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). -define(FRESH_SELECT, fresh_select). -export([ paginate/3, paginate/4 ]). %% first_next query APIs -export([ node_query/5, cluster_query/4, select_table_with_count/5, b2i/1 ]). -export([do_query/6]). paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), Count = count(Tables), do_paginate(Qh, Count, Params, {Module, FormatFun}). paginate(Tables, MatchSpec, Params, {Module, FormatFun}) -> Qh = query_handle(Tables, MatchSpec), Count = count(Tables, MatchSpec), do_paginate(Qh, Count, Params, {Module, FormatFun}). do_paginate(Qh, Count, Params, {Module, FormatFun}) -> Page = b2i(page(Params)), Limit = b2i(limit(Params)), Cursor = qlc:cursor(Qh), case Page > 1 of true -> _ = qlc:next_answers(Cursor, (Page - 1) * Limit), ok; false -> ok end, Rows = qlc:next_answers(Cursor, Limit), qlc:delete_cursor(Cursor), #{ meta => #{page => Page, limit => Limit, count => Count}, data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows] }. query_handle(Table) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table)]); query_handle({Table, Opts}) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table, Opts)]); query_handle([Table]) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table)]); query_handle([{Table, Opts}]) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table, Opts)]); query_handle(Tables) -> % qlc:append([query_handle(T) || T <- Tables]). query_handle(Table, MatchSpec) when is_atom(Table) -> Options = {traverse, {select, MatchSpec}}, qlc:q([R || R <- ets:table(Table, Options)]); query_handle([Table], MatchSpec) when is_atom(Table) -> Options = {traverse, {select, MatchSpec}}, qlc:q([R || R <- ets:table(Table, Options)]); query_handle(Tables, MatchSpec) -> Options = {traverse, {select, MatchSpec}}, qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]). count(Table) when is_atom(Table) -> ets:info(Table, size); count({Table, _}) when is_atom(Table) -> ets:info(Table, size); count([Table]) when is_atom(Table) -> ets:info(Table, size); count([{Table, _}]) when is_atom(Table) -> ets:info(Table, size); count(Tables) -> lists:sum([count(T) || T <- Tables]). count(Table, MatchSpec) when is_atom(Table) -> [{MatchPattern, Where, _Re}] = MatchSpec, NMatchSpec = [{MatchPattern, Where, [true]}], ets:select_count(Table, NMatchSpec); count([Table], MatchSpec) when is_atom(Table) -> count(Table, MatchSpec); count(Tables, MatchSpec) -> lists:sum([count(T, MatchSpec) || T <- Tables]). page(Params) when is_map(Params) -> maps:get(<<"page">>, Params, 1); page(Params) -> proplists:get_value(<<"page">>, Params, <<"1">>). limit(Params) when is_map(Params) -> maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()); limit(Params) -> proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). init_meta(Params) -> Limit = b2i(limit(Params)), Page = b2i(page(Params)), #{ page => Page, limit => Limit, count => 0 }. %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- node_query(Node, QString, Tab, QSchema, QueryFun) -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), page_limit_check_query( init_meta(QString), {fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]} ). %% @private do_node_query(Node, Tab, QString, QueryFun, Meta) -> do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). do_node_query( Node, Tab, QString, QueryFun, Continuation, Meta = #{limit := Limit}, Results ) -> case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; {Len, Rows, ?FRESH_SELECT} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), #{meta => NMeta, data => NResults}; {Len, Rows, NContinuation} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- %% Cluster Query %%-------------------------------------------------------------------- cluster_query(QString, Tab, QSchema, QueryFun) -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), Nodes = mria_mnesia:running_nodes(), page_limit_check_query( init_meta(QString), {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]} ). %% @private do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) -> do_cluster_query( Nodes, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = [] ). do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) -> #{meta => Meta, data => Results}; do_cluster_query( [Node | Tail] = Nodes, Tab, QString, QueryFun, Continuation, Meta = #{limit := Limit}, Results ) -> case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> {error, Node, {bar_rpc, R}}; {Len, Rows, ?FRESH_SELECT} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults); {Len, Rows, NContinuation} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- %% Do Query (or rpc query) %%-------------------------------------------------------------------- %% @private This function is exempt from BPAPI do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() -> erlang:apply(M, F, [Tab, QString, Continuation, Limit]); do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> case rpc:call( Node, ?MODULE, do_query, [Node, Tab, QString, QueryFun, Continuation, Limit], 50000 ) of {badrpc, _} = R -> {error, R}; Ret -> Ret end. sub_query_result(Len, Rows, Limit, Results, Meta) -> {Flag, NMeta} = judge_page_with_counting(Len, Meta), NResults = case Flag of more -> []; cutrows -> {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), lists:sublist(lists:append(Results, ThisRows), SubStart, Limit); enough -> lists:sublist(lists:append(Results, Rows), 1, Limit) end, {NMeta, NResults}. %%-------------------------------------------------------------------- %% Table Select %%-------------------------------------------------------------------- select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when is_function(FuzzyFilterFun) andalso Limit > 0 -> case ets:select(Tab, Ms, Limit) of '$end_of_table' -> {0, [], ?FRESH_SELECT}; {RawResult, NContinuation} -> Rows = FuzzyFilterFun(RawResult), {length(Rows), lists:map(FmtFun, Rows), NContinuation} end; select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when is_function(FuzzyFilterFun) -> case ets:select(ets:repair_continuation(Continuation, Ms)) of '$end_of_table' -> {0, [], ?FRESH_SELECT}; {RawResult, NContinuation} -> Rows = FuzzyFilterFun(RawResult), {length(Rows), lists:map(FmtFun, Rows), NContinuation} end; select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when Limit > 0 -> case ets:select(Tab, Ms, Limit) of '$end_of_table' -> {0, [], ?FRESH_SELECT}; {RawResult, NContinuation} -> {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} end; select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) -> case ets:select(ets:repair_continuation(Continuation, Ms)) of '$end_of_table' -> {0, [], ?FRESH_SELECT}; {RawResult, NContinuation} -> {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} end. %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- parse_qstring(QString, QSchema) when is_map(QString) -> parse_qstring(maps:to_list(QString), QSchema); parse_qstring(QString, QSchema) -> {NQString, FuzzyQString} = do_parse_qstring(QString, QSchema, [], []), {length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}. do_parse_qstring([], _, Acc1, Acc2) -> NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], {lists:reverse(Acc1), lists:reverse(NAcc2)}; do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) -> case proplists:get_value(Key, QSchema) of undefined -> do_parse_qstring(RestQString, QSchema, Acc1, Acc2); Type -> case Key of <> when Prefix =:= <<"gte_">>; Prefix =:= <<"lte_">> -> OpposeKey = case Prefix of <<"gte_">> -> <<"lte_", NKey/binary>>; <<"lte_">> -> <<"gte_", NKey/binary>> end, case lists:keytake(OpposeKey, 1, RestQString) of false -> do_parse_qstring( RestQString, QSchema, [qs(Key, Value, Type) | Acc1], Acc2 ); {value, {K2, V2}, NParams} -> do_parse_qstring( NParams, QSchema, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2 ) end; _ -> case is_fuzzy_key(Key) of true -> do_parse_qstring( RestQString, QSchema, Acc1, [qs(Key, Value, Type) | Acc2] ); _ -> do_parse_qstring( RestQString, QSchema, [qs(Key, Value, Type) | Acc1], Acc2 ) end end end. qs(K1, V1, K2, V2, Type) -> {Key, Op1, NV1} = qs(K1, V1, Type), {Key, Op2, NV2} = qs(K2, V2, Type), {Key, Op1, NV1, Op2, NV2}. qs(K, Value0, Type) -> try qs(K, to_type(Value0, Type)) catch throw:bad_value_type -> throw({bad_value_type, {K, Type, Value0}}) end. qs(<<"gte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '>=', Value}; qs(<<"lte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '=<', Value}; qs(<<"like_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), like, Value}; qs(<<"match_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), match, Value}; qs(Key, Value) -> {binary_to_existing_atom(Key, utf8), '=:=', Value}. is_fuzzy_key(<<"like_", _/binary>>) -> true; is_fuzzy_key(<<"match_", _/binary>>) -> true; is_fuzzy_key(_) -> false. page_start(1, _) -> 1; page_start(Page, Limit) -> (Page - 1) * Limit + 1. judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> PageStart = page_start(Page, Limit), PageEnd = Page * Limit, case Count + Len of NCount when NCount < PageStart -> {more, Meta#{count => NCount}}; NCount when NCount < PageEnd -> {cutrows, Meta#{count => NCount}}; NCount when NCount >= PageEnd -> {enough, Meta#{count => NCount}} end. rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> PageStart = page_start(Page, Limit), case (Count - Len) < PageStart of true -> NeedNowNum = Count - PageStart + 1, SubStart = Len - NeedNowNum + 1, {SubStart, NeedNowNum}; false -> {_SubStart = 1, _NeedNowNum = Len} end. page_limit_check_query(Meta, {F, A}) -> case Meta of #{page := Page, limit := Limit} when Page < 1; Limit < 1 -> {error, page_limit_invalid}; _ -> erlang:apply(F, A) end. %%-------------------------------------------------------------------- %% Types %%-------------------------------------------------------------------- to_type(V, TargetType) -> try to_type_(V, TargetType) catch _:_ -> throw(bad_value_type) end. to_type_(V, atom) -> to_atom(V); to_type_(V, integer) -> to_integer(V); to_type_(V, timestamp) -> to_timestamp(V); to_type_(V, ip) -> aton(V); to_type_(V, ip_port) -> to_ip_port(V); to_type_(V, _) -> V. to_atom(A) when is_atom(A) -> A; to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8). to_integer(I) when is_integer(I) -> I; to_integer(B) when is_binary(B) -> binary_to_integer(B). to_timestamp(I) when is_integer(I) -> I; to_timestamp(B) when is_binary(B) -> binary_to_integer(B). aton(B) when is_binary(B) -> list_to_tuple([binary_to_integer(T) || T <- re:split(B, "[.]")]). to_ip_port(IPAddress) -> [IP0, Port0] = string:tokens(binary_to_list(IPAddress), ":"), {ok, IP} = inet:parse_address(IP0), Port = list_to_integer(Port0), {IP, Port}. %%-------------------------------------------------------------------- %% EUnits %%-------------------------------------------------------------------- -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). params2qs_test() -> QSchema = [ {<<"str">>, binary}, {<<"int">>, integer}, {<<"atom">>, atom}, {<<"ts">>, timestamp}, {<<"gte_range">>, integer}, {<<"lte_range">>, integer}, {<<"like_fuzzy">>, binary}, {<<"match_topic">>, binary} ], QString = [ {<<"str">>, <<"abc">>}, {<<"int">>, <<"123">>}, {<<"atom">>, <<"connected">>}, {<<"ts">>, <<"156000">>}, {<<"gte_range">>, <<"1">>}, {<<"lte_range">>, <<"5">>}, {<<"like_fuzzy">>, <<"user">>}, {<<"match_topic">>, <<"t/#">>} ], ExpectedQs = [ {str, '=:=', <<"abc">>}, {int, '=:=', 123}, {atom, '=:=', connected}, {ts, '=:=', 156000}, {range, '>=', 1, '=<', 5} ], FuzzyNQString = [ {fuzzy, like, <<"user">>}, {topic, match, <<"t/#">>} ], ?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)), {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). -endif. b2i(Bin) when is_binary(Bin) -> binary_to_integer(Bin); b2i(Any) -> Any.