refactor(mgmt): `Params` => `QString`, `QsScheam` => `QSchema`

This commit is contained in:
JimMoen 2022-03-04 10:51:38 +08:00
parent f8073002b6
commit f25da61f39
6 changed files with 114 additions and 107 deletions

View File

@ -66,7 +66,7 @@ paths() ->
, "/gateway/:name/clients/:clientid/subscriptions/:topic"
].
-define(CLIENT_QS_SCHEMA,
-define(CLIENT_QSCHEMA,
[ {<<"node">>, atom}
, {<<"clientid">>, binary}
, {<<"username">>, binary}
@ -90,22 +90,22 @@ paths() ->
-define(QUERY_FUN, {?MODULE, query}).
clients(get, #{ bindings := #{name := Name0}
, query_string := Params
, query_string := QString
}) ->
with_gateway(Name0, fun(GwName, _) ->
TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Params, undefined) of
case maps:get(<<"node">>, QString, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(
Params, TabName,
?CLIENT_QS_SCHEMA, ?QUERY_FUN),
QString, TabName,
?CLIENT_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response);
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params),
QStringWithoutNode = maps:without([<<"node">>], QString),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN),
Node, QStringWithoutNode,
TabName, ?CLIENT_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response)
end
end).

View File

@ -132,19 +132,20 @@ init_meta(Params) ->
%% Node Query
%%--------------------------------------------------------------------
node_query(Node, Params, Tab, QsSchema, QueryFun) ->
{_CodCnt, Qs} = params2qs(Params, QsSchema),
page_limit_check_query(init_meta(Params),
{fun do_node_query/5, [Node, Tab, Qs, QueryFun, init_meta(Params)]}).
node_query(Node, QString, Tab, QSchema, QueryFun) ->
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
page_limit_check_query( init_meta(QString)
, { fun do_node_query/5
, [Node, Tab, NQString, QueryFun, init_meta(QString)]}).
%% @private
do_node_query(Node, Tab, Qs, QueryFun, Meta) ->
do_node_query(Node, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
do_node_query(Node, Tab, QString, QueryFun, Meta) ->
do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
do_node_query( Node, Tab, Qs, QueryFun, Continuation
do_node_query( Node, Tab, QString, QueryFun, Continuation
, Meta = #{limit := Limit}
, Results) ->
case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
{error, {badrpc, R}} ->
{error, Node, {badrpc, R}};
{Len, Rows, ?FRESH_SELECT} ->
@ -152,36 +153,38 @@ do_node_query( Node, Tab, Qs, QueryFun, Continuation
#{meta => NMeta, data => NResults};
{Len, Rows, NContinuation} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
end.
%%--------------------------------------------------------------------
%% Cluster Query
%%--------------------------------------------------------------------
cluster_query(Params, Tab, QsSchema, QueryFun) ->
{_CodCnt, Qs} = params2qs(Params, QsSchema),
cluster_query(QString, Tab, QSchema, QueryFun) ->
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
Nodes = mria_mnesia:running_nodes(),
page_limit_check_query(init_meta(Params),
{fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, init_meta(Params)]}).
page_limit_check_query(
init_meta(QString)
, {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}).
%% @private
do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) ->
do_cluster_query(Nodes, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) ->
do_cluster_query( Nodes, Tab, QString, QueryFun
, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) ->
do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
#{meta => Meta, data => Results};
do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
do_cluster_query([Node | Tail] = Nodes, Tab, QString, QueryFun, Continuation,
Meta = #{limit := Limit}, Results) ->
case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
{error, {badrpc, R}} ->
{error, Node, {bar_rpc, R}};
{Len, Rows, ?FRESH_SELECT} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
do_cluster_query(Tail, Tab, Qs, QueryFun, ?FRESH_SELECT, NMeta, NResults);
do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults);
{Len, Rows, NContinuation} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
end.
%%--------------------------------------------------------------------
@ -189,11 +192,11 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
%%--------------------------------------------------------------------
%% @private This function is exempt from BPAPI
do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
erlang:apply(M, F, [Tab, Qs, Continuation, Limit]);
do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
do_query(Node, Tab, QString, {M,F}, Continuation, Limit) when Node =:= node() ->
erlang:apply(M, F, [Tab, QString, Continuation, Limit]);
do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
case rpc:call(Node, ?MODULE, do_query,
[Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of
[Node, Tab, QString, QueryFun, Continuation, Limit], 50000) of
{badrpc, _} = R -> {error, R};
Ret -> Ret
end.
@ -255,19 +258,19 @@ select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
%% Internal Functions
%%--------------------------------------------------------------------
params2qs(Params, QsSchema) when is_map(Params) ->
params2qs(maps:to_list(Params), QsSchema);
params2qs(Params, QsSchema) ->
{Qs, Fuzzy} = pick_params_to_qs(Params, QsSchema, [], []),
{length(Qs) + length(Fuzzy), {Qs, Fuzzy}}.
parse_qstring(QString, QSchema) when is_map(QString) ->
parse_qstring(maps:to_list(QString), QSchema);
parse_qstring(QString, QSchema) ->
{NQString, FuzzyQString} = do_parse_qstring(QString, QSchema, [], []),
{length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}.
pick_params_to_qs([], _, Acc1, Acc2) ->
do_parse_qstring([], _, Acc1, Acc2) ->
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
{lists:reverse(Acc1), lists:reverse(NAcc2)};
pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
case proplists:get_value(Key, QsSchema) of
undefined -> pick_params_to_qs(Params, QsSchema, Acc1, Acc2);
do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) ->
case proplists:get_value(Key, QSchema) of
undefined -> do_parse_qstring(RestQString, QSchema, Acc1, Acc2);
Type ->
case Key of
<<Prefix:4/binary, NKey/binary>>
@ -277,22 +280,22 @@ pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
<<"gte_">> -> <<"lte_", NKey/binary>>;
<<"lte_">> -> <<"gte_", NKey/binary>>
end,
case lists:keytake(OpposeKey, 1, Params) of
case lists:keytake(OpposeKey, 1, RestQString) of
false ->
pick_params_to_qs(Params, QsSchema,
[qs(Key, Value, Type) | Acc1], Acc2);
do_parse_qstring( RestQString, QSchema
, [qs(Key, Value, Type) | Acc1], Acc2);
{value, {K2, V2}, NParams} ->
pick_params_to_qs(NParams, QsSchema,
[qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
do_parse_qstring( NParams, QSchema
, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
end;
_ ->
case is_fuzzy_key(Key) of
true ->
pick_params_to_qs(Params, QsSchema, Acc1,
[qs(Key, Value, Type) | Acc2]);
do_parse_qstring( RestQString, QSchema
, Acc1, [qs(Key, Value, Type) | Acc2]);
_ ->
pick_params_to_qs(Params, QsSchema,
[qs(Key, Value, Type) | Acc1], Acc2)
do_parse_qstring( RestQString, QSchema
, [qs(Key, Value, Type) | Acc1], Acc2)
end
end
@ -416,7 +419,7 @@ to_ip_port(IPAddress) ->
-include_lib("eunit/include/eunit.hrl").
params2qs_test() ->
Schema = [{<<"str">>, binary},
QSchema = [{<<"str">>, binary},
{<<"int">>, integer},
{<<"atom">>, atom},
{<<"ts">>, timestamp},
@ -424,7 +427,7 @@ params2qs_test() ->
{<<"lte_range">>, integer},
{<<"like_fuzzy">>, binary},
{<<"match_topic">>, binary}],
Params = [{<<"str">>, <<"abc">>},
QString = [{<<"str">>, <<"abc">>},
{<<"int">>, <<"123">>},
{<<"atom">>, <<"connected">>},
{<<"ts">>, <<"156000">>},
@ -438,11 +441,11 @@ params2qs_test() ->
{ts, '=:=', 156000},
{range, '>=', 1, '=<', 5}
],
FuzzyQs = [{fuzzy, like, <<"user">>},
{topic, match, <<"t/#">>}],
?assertEqual({7, {ExpectedQs, FuzzyQs}}, params2qs(Params, Schema)),
FuzzyNQString = [{fuzzy, like, <<"user">>},
{topic, match, <<"t/#">>}],
?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)),
{0, {[], []}} = params2qs([{not_a_predefined_params, val}], Schema).
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
-endif.

View File

@ -87,13 +87,13 @@ fields(meta) ->
[{count, hoconsc:mk(integer(), #{example => 1})}].
%%%==============================================================================================
%% parameters trans
alarms(get, #{query_string := Qs}) ->
alarms(get, #{query_string := QString}) ->
Table =
case maps:get(<<"activated">>, Qs, true) of
case maps:get(<<"activated">>, QString, true) of
true -> ?ACTIVATED_ALARM;
false -> ?DEACTIVATED_ALARM
end,
Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}),
Response = emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}),
emqx_mgmt_util:generate_response(Response);
alarms(delete, _Params) ->

View File

@ -48,21 +48,23 @@
%% for batch operation
-export([do_subscribe/3]).
-define(CLIENT_QS_SCHEMA, {emqx_channel_info,
[ {<<"node">>, atom}
, {<<"username">>, binary}
, {<<"zone">>, atom}
, {<<"ip_address">>, ip}
, {<<"conn_state">>, atom}
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer}
, {<<"like_clientid">>, binary}
, {<<"like_username">>, binary}
, {<<"gte_created_at">>, timestamp}
, {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp}]}).
-define(CLIENT_QTAB, emqx_channel_info).
-define(CLIENT_QSCHEMA,
[ {<<"node">>, atom}
, {<<"username">>, binary}
, {<<"zone">>, atom}
, {<<"ip_address">>, ip}
, {<<"conn_state">>, atom}
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer}
, {<<"like_clientid">>, binary}
, {<<"like_username">>, binary}
, {<<"gte_created_at">>, timestamp}
, {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp}]).
-define(QUERY_FUN, {?MODULE, query}).
-define(FORMAT_FUN, {?MODULE, format_channel_info}).
@ -386,8 +388,8 @@ fields(meta) ->
%%%==============================================================================================
%% parameters trans
clients(get, #{query_string := Qs}) ->
list_clients(Qs).
clients(get, #{query_string := QString}) ->
list_clients(QString).
client(get, #{bindings := Bindings}) ->
lookup(Bindings);
@ -451,18 +453,17 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
%%%==============================================================================================
%% api apply
list_clients(Params) ->
{Tab, QuerySchema} = ?CLIENT_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
list_clients(QString) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(Params, Tab,
QuerySchema, ?QUERY_FUN),
Response = emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB,
?CLIENT_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response);
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params),
Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode,
Tab, QuerySchema, ?QUERY_FUN),
QStringWithoutNode = maps:without([<<"node">>], QString),
Response = emqx_mgmt_api:node_query(Node, QStringWithoutNode,
?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response)
end.
@ -566,14 +567,14 @@ do_unsubscribe(ClientID, Topic) ->
%%--------------------------------------------------------------------
%% Query Functions
query(Tab, {Qs, []}, Continuation, Limit) ->
Ms = qs2ms(Qs),
query(Tab, {QString, []}, Continuation, Limit) ->
Ms = qs2ms(QString),
emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit,
fun format_channel_info/1);
query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
Ms = qs2ms(QString),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit,
fun format_channel_info/1).

View File

@ -30,7 +30,7 @@
-define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
-define(ROUTES_QS_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]).
-define(ROUTES_QSCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]).
-import(emqx_mgmt_util, [ object_schema/2
, object_array_schema/2
@ -79,16 +79,17 @@ route_api() ->
%%%==============================================================================================
%% parameters trans
routes(get, #{query_string := Qs}) ->
list(generate_topic(Qs)).
routes(get, #{query_string := QString}) ->
list(generate_topic(QString)).
route(get, #{bindings := Bindings}) ->
lookup(generate_topic(Bindings)).
%%%==============================================================================================
%% api apply
list(Params) ->
Response = emqx_mgmt_api:node_query(node(), Params, emqx_route, ?ROUTES_QS_SCHEMA, {?MODULE, query}),
list(QString) ->
Response = emqx_mgmt_api:node_query(
node(), QString, emqx_route, ?ROUTES_QSCHEMA, {?MODULE, query}),
generate_response(Response).
lookup(#{topic := Topic}) ->

View File

@ -33,16 +33,17 @@
, format/1
]).
-define(SUBS_QS_SCHEMA, {emqx_suboption,
-define(SUBS_QTABLE, emqx_suboption).
-define(SUBS_QSCHEMA,
[ {<<"clientid">>, binary}
, {<<"topic">>, binary}
, {<<"share">>, binary}
, {<<"share_group">>, binary}
, {<<"qos">>, integer}
, {<<"match_topic">>, binary}]}).
, {<<"match_topic">>, binary}]).
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format}).
-define(QUERY_FUN, {?MODULE, query}).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -111,16 +112,15 @@ parameters() ->
}
].
subscriptions(get, #{query_string := Params}) ->
{Tab, QuerySchema} = ?SUBS_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
subscriptions(get, #{query_string := QString}) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(Params, Tab,
QuerySchema, ?query_fun),
Response = emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE,
?SUBS_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response);
Node ->
Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params,
Tab, QuerySchema, ?query_fun),
Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), QString,
?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN),
emqx_mgmt_util:generate_response(Response)
end.
@ -153,12 +153,14 @@ format({_Subscriber, Topic, Options}) ->
query(Tab, {Qs, []}, Continuation, Limit) ->
Ms = qs2ms(Qs),
emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1);
emqx_mgmt_api:select_table_with_count( Tab, Ms
, Continuation, Limit, fun format/1);
query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, fun format/1).
emqx_mgmt_api:select_table_with_count( Tab, {Ms, FuzzyFilterFun}
, Continuation, Limit, fun format/1).
fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) ->