510 lines
16 KiB
Erlang
510 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_api).
|
|
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
|
|
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
|
|
|
|
-define(FRESH_SELECT, fresh_select).
|
|
|
|
-export([
|
|
paginate/3,
|
|
paginate/4
|
|
]).
|
|
|
|
%% first_next query APIs
|
|
-export([
|
|
node_query/5,
|
|
cluster_query/4,
|
|
select_table_with_count/5,
|
|
b2i/1
|
|
]).
|
|
|
|
-export([do_query/6]).
|
|
|
|
paginate(Tables, Params, {Module, FormatFun}) ->
|
|
Qh = query_handle(Tables),
|
|
Count = count(Tables),
|
|
do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
|
|
|
paginate(Tables, MatchSpec, Params, {Module, FormatFun}) ->
|
|
Qh = query_handle(Tables, MatchSpec),
|
|
Count = count(Tables, MatchSpec),
|
|
do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
|
|
|
do_paginate(Qh, Count, Params, {Module, FormatFun}) ->
|
|
Page = b2i(page(Params)),
|
|
Limit = b2i(limit(Params)),
|
|
Cursor = qlc:cursor(Qh),
|
|
case Page > 1 of
|
|
true ->
|
|
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
|
|
ok;
|
|
false ->
|
|
ok
|
|
end,
|
|
Rows = qlc:next_answers(Cursor, Limit),
|
|
qlc:delete_cursor(Cursor),
|
|
#{
|
|
meta => #{page => Page, limit => Limit, count => Count},
|
|
data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows]
|
|
}.
|
|
|
|
query_handle(Table) when is_atom(Table) ->
|
|
qlc:q([R || R <- ets:table(Table)]);
|
|
query_handle({Table, Opts}) when is_atom(Table) ->
|
|
qlc:q([R || R <- ets:table(Table, Opts)]);
|
|
query_handle([Table]) when is_atom(Table) ->
|
|
qlc:q([R || R <- ets:table(Table)]);
|
|
query_handle([{Table, Opts}]) when is_atom(Table) ->
|
|
qlc:q([R || R <- ets:table(Table, Opts)]);
|
|
query_handle(Tables) ->
|
|
%
|
|
qlc:append([query_handle(T) || T <- Tables]).
|
|
|
|
query_handle(Table, MatchSpec) when is_atom(Table) ->
|
|
Options = {traverse, {select, MatchSpec}},
|
|
qlc:q([R || R <- ets:table(Table, Options)]);
|
|
query_handle([Table], MatchSpec) when is_atom(Table) ->
|
|
Options = {traverse, {select, MatchSpec}},
|
|
qlc:q([R || R <- ets:table(Table, Options)]);
|
|
query_handle(Tables, MatchSpec) ->
|
|
Options = {traverse, {select, MatchSpec}},
|
|
qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]).
|
|
|
|
count(Table) when is_atom(Table) ->
|
|
ets:info(Table, size);
|
|
count({Table, _}) when is_atom(Table) ->
|
|
ets:info(Table, size);
|
|
count([Table]) when is_atom(Table) ->
|
|
ets:info(Table, size);
|
|
count([{Table, _}]) when is_atom(Table) ->
|
|
ets:info(Table, size);
|
|
count(Tables) ->
|
|
lists:sum([count(T) || T <- Tables]).
|
|
|
|
count(Table, MatchSpec) when is_atom(Table) ->
|
|
[{MatchPattern, Where, _Re}] = MatchSpec,
|
|
NMatchSpec = [{MatchPattern, Where, [true]}],
|
|
ets:select_count(Table, NMatchSpec);
|
|
count([Table], MatchSpec) when is_atom(Table) ->
|
|
count(Table, MatchSpec);
|
|
count(Tables, MatchSpec) ->
|
|
lists:sum([count(T, MatchSpec) || T <- Tables]).
|
|
|
|
page(Params) when is_map(Params) ->
|
|
maps:get(<<"page">>, Params, 1);
|
|
page(Params) ->
|
|
proplists:get_value(<<"page">>, Params, <<"1">>).
|
|
|
|
limit(Params) when is_map(Params) ->
|
|
maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit());
|
|
limit(Params) ->
|
|
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
|
|
|
init_meta(Params) ->
|
|
Limit = b2i(limit(Params)),
|
|
Page = b2i(page(Params)),
|
|
#{
|
|
page => Page,
|
|
limit => Limit,
|
|
count => 0
|
|
}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Node Query
|
|
%%--------------------------------------------------------------------
|
|
|
|
node_query(Node, QString, Tab, QSchema, QueryFun) ->
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
page_limit_check_query(
|
|
init_meta(QString),
|
|
{fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]}
|
|
).
|
|
|
|
%% @private
|
|
do_node_query(Node, Tab, QString, QueryFun, Meta) ->
|
|
do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
|
do_node_query(
|
|
Node,
|
|
Tab,
|
|
QString,
|
|
QueryFun,
|
|
Continuation,
|
|
Meta = #{limit := Limit},
|
|
Results
|
|
) ->
|
|
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
{error, {badrpc, R}} ->
|
|
{error, Node, {badrpc, R}};
|
|
{Len, Rows, ?FRESH_SELECT} ->
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
#{meta => NMeta, data => NResults};
|
|
{Len, Rows, NContinuation} ->
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Cluster Query
|
|
%%--------------------------------------------------------------------
|
|
|
|
cluster_query(QString, Tab, QSchema, QueryFun) ->
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
page_limit_check_query(
|
|
init_meta(QString),
|
|
{fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}
|
|
).
|
|
|
|
%% @private
|
|
do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) ->
|
|
do_cluster_query(
|
|
Nodes,
|
|
Tab,
|
|
QString,
|
|
QueryFun,
|
|
_Continuation = ?FRESH_SELECT,
|
|
Meta,
|
|
_Results = []
|
|
).
|
|
|
|
do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
|
|
#{meta => Meta, data => Results};
|
|
do_cluster_query(
|
|
[Node | Tail] = Nodes,
|
|
Tab,
|
|
QString,
|
|
QueryFun,
|
|
Continuation,
|
|
Meta = #{limit := Limit},
|
|
Results
|
|
) ->
|
|
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
{error, {badrpc, R}} ->
|
|
{error, Node, {bar_rpc, R}};
|
|
{Len, Rows, ?FRESH_SELECT} ->
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults);
|
|
{Len, Rows, NContinuation} ->
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Do Query (or rpc query)
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @private This function is exempt from BPAPI
|
|
do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() ->
|
|
erlang:apply(M, F, [Tab, QString, Continuation, Limit]);
|
|
do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
|
|
case
|
|
rpc:call(
|
|
Node,
|
|
?MODULE,
|
|
do_query,
|
|
[Node, Tab, QString, QueryFun, Continuation, Limit],
|
|
50000
|
|
)
|
|
of
|
|
{badrpc, _} = R -> {error, R};
|
|
Ret -> Ret
|
|
end.
|
|
|
|
sub_query_result(Len, Rows, Limit, Results, Meta) ->
|
|
{Flag, NMeta} = judge_page_with_counting(Len, Meta),
|
|
NResults =
|
|
case Flag of
|
|
more ->
|
|
[];
|
|
cutrows ->
|
|
{SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
|
|
ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
|
|
lists:sublist(lists:append(Results, ThisRows), SubStart, Limit);
|
|
enough ->
|
|
lists:sublist(lists:append(Results, Rows), 1, Limit)
|
|
end,
|
|
{NMeta, NResults}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Table Select
|
|
%%--------------------------------------------------------------------
|
|
|
|
select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when
|
|
is_function(FuzzyFilterFun) andalso Limit > 0
|
|
->
|
|
case ets:select(Tab, Ms, Limit) of
|
|
'$end_of_table' ->
|
|
{0, [], ?FRESH_SELECT};
|
|
{RawResult, NContinuation} ->
|
|
Rows = FuzzyFilterFun(RawResult),
|
|
{length(Rows), lists:map(FmtFun, Rows), NContinuation}
|
|
end;
|
|
select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when
|
|
is_function(FuzzyFilterFun)
|
|
->
|
|
case ets:select(ets:repair_continuation(Continuation, Ms)) of
|
|
'$end_of_table' ->
|
|
{0, [], ?FRESH_SELECT};
|
|
{RawResult, NContinuation} ->
|
|
Rows = FuzzyFilterFun(RawResult),
|
|
{length(Rows), lists:map(FmtFun, Rows), NContinuation}
|
|
end;
|
|
select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when
|
|
Limit > 0
|
|
->
|
|
case ets:select(Tab, Ms, Limit) of
|
|
'$end_of_table' ->
|
|
{0, [], ?FRESH_SELECT};
|
|
{RawResult, NContinuation} ->
|
|
{length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
|
|
end;
|
|
select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
|
|
case ets:select(ets:repair_continuation(Continuation, Ms)) of
|
|
'$end_of_table' ->
|
|
{0, [], ?FRESH_SELECT};
|
|
{RawResult, NContinuation} ->
|
|
{length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal Functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
parse_qstring(QString, QSchema) when is_map(QString) ->
|
|
parse_qstring(maps:to_list(QString), QSchema);
|
|
parse_qstring(QString, QSchema) ->
|
|
{NQString, FuzzyQString} = do_parse_qstring(QString, QSchema, [], []),
|
|
{length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}.
|
|
|
|
do_parse_qstring([], _, Acc1, Acc2) ->
|
|
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
|
|
{lists:reverse(Acc1), lists:reverse(NAcc2)};
|
|
do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) ->
|
|
case proplists:get_value(Key, QSchema) of
|
|
undefined ->
|
|
do_parse_qstring(RestQString, QSchema, Acc1, Acc2);
|
|
Type ->
|
|
case Key of
|
|
<<Prefix:4/binary, NKey/binary>> when
|
|
Prefix =:= <<"gte_">>;
|
|
Prefix =:= <<"lte_">>
|
|
->
|
|
OpposeKey =
|
|
case Prefix of
|
|
<<"gte_">> -> <<"lte_", NKey/binary>>;
|
|
<<"lte_">> -> <<"gte_", NKey/binary>>
|
|
end,
|
|
case lists:keytake(OpposeKey, 1, RestQString) of
|
|
false ->
|
|
do_parse_qstring(
|
|
RestQString,
|
|
QSchema,
|
|
[qs(Key, Value, Type) | Acc1],
|
|
Acc2
|
|
);
|
|
{value, {K2, V2}, NParams} ->
|
|
do_parse_qstring(
|
|
NParams,
|
|
QSchema,
|
|
[qs(Key, Value, K2, V2, Type) | Acc1],
|
|
Acc2
|
|
)
|
|
end;
|
|
_ ->
|
|
case is_fuzzy_key(Key) of
|
|
true ->
|
|
do_parse_qstring(
|
|
RestQString,
|
|
QSchema,
|
|
Acc1,
|
|
[qs(Key, Value, Type) | Acc2]
|
|
);
|
|
_ ->
|
|
do_parse_qstring(
|
|
RestQString,
|
|
QSchema,
|
|
[qs(Key, Value, Type) | Acc1],
|
|
Acc2
|
|
)
|
|
end
|
|
end
|
|
end.
|
|
|
|
qs(K1, V1, K2, V2, Type) ->
|
|
{Key, Op1, NV1} = qs(K1, V1, Type),
|
|
{Key, Op2, NV2} = qs(K2, V2, Type),
|
|
{Key, Op1, NV1, Op2, NV2}.
|
|
|
|
qs(K, Value0, Type) ->
|
|
try
|
|
qs(K, to_type(Value0, Type))
|
|
catch
|
|
throw:bad_value_type ->
|
|
throw({bad_value_type, {K, Type, Value0}})
|
|
end.
|
|
|
|
qs(<<"gte_", Key/binary>>, Value) ->
|
|
{binary_to_existing_atom(Key, utf8), '>=', Value};
|
|
qs(<<"lte_", Key/binary>>, Value) ->
|
|
{binary_to_existing_atom(Key, utf8), '=<', Value};
|
|
qs(<<"like_", Key/binary>>, Value) ->
|
|
{binary_to_existing_atom(Key, utf8), like, Value};
|
|
qs(<<"match_", Key/binary>>, Value) ->
|
|
{binary_to_existing_atom(Key, utf8), match, Value};
|
|
qs(Key, Value) ->
|
|
{binary_to_existing_atom(Key, utf8), '=:=', Value}.
|
|
|
|
is_fuzzy_key(<<"like_", _/binary>>) ->
|
|
true;
|
|
is_fuzzy_key(<<"match_", _/binary>>) ->
|
|
true;
|
|
is_fuzzy_key(_) ->
|
|
false.
|
|
|
|
page_start(1, _) -> 1;
|
|
page_start(Page, Limit) -> (Page - 1) * Limit + 1.
|
|
|
|
judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) ->
|
|
PageStart = page_start(Page, Limit),
|
|
PageEnd = Page * Limit,
|
|
case Count + Len of
|
|
NCount when NCount < PageStart ->
|
|
{more, Meta#{count => NCount}};
|
|
NCount when NCount < PageEnd ->
|
|
{cutrows, Meta#{count => NCount}};
|
|
NCount when NCount >= PageEnd ->
|
|
{enough, Meta#{count => NCount}}
|
|
end.
|
|
|
|
rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) ->
|
|
PageStart = page_start(Page, Limit),
|
|
case (Count - Len) < PageStart of
|
|
true ->
|
|
NeedNowNum = Count - PageStart + 1,
|
|
SubStart = Len - NeedNowNum + 1,
|
|
{SubStart, NeedNowNum};
|
|
false ->
|
|
{_SubStart = 1, _NeedNowNum = Len}
|
|
end.
|
|
|
|
page_limit_check_query(Meta, {F, A}) ->
|
|
case Meta of
|
|
#{page := Page, limit := Limit} when
|
|
Page < 1; Limit < 1
|
|
->
|
|
{error, page_limit_invalid};
|
|
_ ->
|
|
erlang:apply(F, A)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Types
|
|
%%--------------------------------------------------------------------
|
|
|
|
to_type(V, TargetType) ->
|
|
try
|
|
to_type_(V, TargetType)
|
|
catch
|
|
_:_ ->
|
|
throw(bad_value_type)
|
|
end.
|
|
|
|
to_type_(V, atom) -> to_atom(V);
|
|
to_type_(V, integer) -> to_integer(V);
|
|
to_type_(V, timestamp) -> to_timestamp(V);
|
|
to_type_(V, ip) -> aton(V);
|
|
to_type_(V, ip_port) -> to_ip_port(V);
|
|
to_type_(V, _) -> V.
|
|
|
|
to_atom(A) when is_atom(A) ->
|
|
A;
|
|
to_atom(B) when is_binary(B) ->
|
|
binary_to_atom(B, utf8).
|
|
|
|
to_integer(I) when is_integer(I) ->
|
|
I;
|
|
to_integer(B) when is_binary(B) ->
|
|
binary_to_integer(B).
|
|
|
|
to_timestamp(I) when is_integer(I) ->
|
|
I;
|
|
to_timestamp(B) when is_binary(B) ->
|
|
binary_to_integer(B).
|
|
|
|
aton(B) when is_binary(B) ->
|
|
list_to_tuple([binary_to_integer(T) || T <- re:split(B, "[.]")]).
|
|
|
|
to_ip_port(IPAddress) ->
|
|
[IP0, Port0] = string:tokens(binary_to_list(IPAddress), ":"),
|
|
{ok, IP} = inet:parse_address(IP0),
|
|
Port = list_to_integer(Port0),
|
|
{IP, Port}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% EUnits
|
|
%%--------------------------------------------------------------------
|
|
|
|
-ifdef(TEST).
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
params2qs_test() ->
|
|
QSchema = [
|
|
{<<"str">>, binary},
|
|
{<<"int">>, integer},
|
|
{<<"atom">>, atom},
|
|
{<<"ts">>, timestamp},
|
|
{<<"gte_range">>, integer},
|
|
{<<"lte_range">>, integer},
|
|
{<<"like_fuzzy">>, binary},
|
|
{<<"match_topic">>, binary}
|
|
],
|
|
QString = [
|
|
{<<"str">>, <<"abc">>},
|
|
{<<"int">>, <<"123">>},
|
|
{<<"atom">>, <<"connected">>},
|
|
{<<"ts">>, <<"156000">>},
|
|
{<<"gte_range">>, <<"1">>},
|
|
{<<"lte_range">>, <<"5">>},
|
|
{<<"like_fuzzy">>, <<"user">>},
|
|
{<<"match_topic">>, <<"t/#">>}
|
|
],
|
|
ExpectedQs = [
|
|
{str, '=:=', <<"abc">>},
|
|
{int, '=:=', 123},
|
|
{atom, '=:=', connected},
|
|
{ts, '=:=', 156000},
|
|
{range, '>=', 1, '=<', 5}
|
|
],
|
|
FuzzyNQString = [
|
|
{fuzzy, like, <<"user">>},
|
|
{topic, match, <<"t/#">>}
|
|
],
|
|
?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)),
|
|
|
|
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
|
|
|
-endif.
|
|
|
|
b2i(Bin) when is_binary(Bin) ->
|
|
binary_to_integer(Bin);
|
|
b2i(Any) ->
|
|
Any.
|