From f25da61f39df35d71b9193cab7c2a31c4020c69d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 4 Mar 2022 10:51:38 +0800 Subject: [PATCH] refactor(mgmt): `Params` => `QString`, `QsScheam` => `QSchema` --- .../src/emqx_gateway_api_clients.erl | 16 +-- apps/emqx_management/src/emqx_mgmt_api.erl | 99 ++++++++++--------- .../src/emqx_mgmt_api_alarms.erl | 6 +- .../src/emqx_mgmt_api_clients.erl | 61 ++++++------ .../src/emqx_mgmt_api_routes.erl | 11 ++- .../src/emqx_mgmt_api_subscriptions.erl | 28 +++--- 6 files changed, 114 insertions(+), 107 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index e9c0fa879..01f3458b4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -66,7 +66,7 @@ paths() -> , "/gateway/:name/clients/:clientid/subscriptions/:topic" ]. --define(CLIENT_QS_SCHEMA, +-define(CLIENT_QSCHEMA, [ {<<"node">>, atom} , {<<"clientid">>, binary} , {<<"username">>, binary} @@ -90,22 +90,22 @@ paths() -> -define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} - , query_string := Params + , query_string := QString }) -> with_gateway(Name0, fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), - case maps:get(<<"node">>, Params, undefined) of + case maps:get(<<"node">>, QString, undefined) of undefined -> Response = emqx_mgmt_api:cluster_query( - Params, TabName, - ?CLIENT_QS_SCHEMA, ?QUERY_FUN), + QString, TabName, + ?CLIENT_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response); Node1 -> Node = binary_to_atom(Node1, utf8), - ParamsWithoutNode = maps:without([<<"node">>], Params), + QStringWithoutNode = maps:without([<<"node">>], QString), Response = emqx_mgmt_api:node_query( - Node, ParamsWithoutNode, - TabName, ?CLIENT_QS_SCHEMA, ?QUERY_FUN), + Node, QStringWithoutNode, + TabName, ?CLIENT_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response) end end). diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index e3ce94dff..36d647f57 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -132,19 +132,20 @@ init_meta(Params) -> %% Node Query %%-------------------------------------------------------------------- -node_query(Node, Params, Tab, QsSchema, QueryFun) -> - {_CodCnt, Qs} = params2qs(Params, QsSchema), - page_limit_check_query(init_meta(Params), - {fun do_node_query/5, [Node, Tab, Qs, QueryFun, init_meta(Params)]}). +node_query(Node, QString, Tab, QSchema, QueryFun) -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), + page_limit_check_query( init_meta(QString) + , { fun do_node_query/5 + , [Node, Tab, NQString, QueryFun, init_meta(QString)]}). %% @private -do_node_query(Node, Tab, Qs, QueryFun, Meta) -> - do_node_query(Node, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). +do_node_query(Node, Tab, QString, QueryFun, Meta) -> + do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). -do_node_query( Node, Tab, Qs, QueryFun, Continuation +do_node_query( Node, Tab, QString, QueryFun, Continuation , Meta = #{limit := Limit} , Results) -> - case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of + case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; {Len, Rows, ?FRESH_SELECT} -> @@ -152,36 +153,38 @@ do_node_query( Node, Tab, Qs, QueryFun, Continuation #{meta => NMeta, data => NResults}; {Len, Rows, NContinuation} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) + do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- %% Cluster Query %%-------------------------------------------------------------------- -cluster_query(Params, Tab, QsSchema, QueryFun) -> - {_CodCnt, Qs} = params2qs(Params, QsSchema), +cluster_query(QString, Tab, QSchema, QueryFun) -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), Nodes = mria_mnesia:running_nodes(), - page_limit_check_query(init_meta(Params), - {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, init_meta(Params)]}). + page_limit_check_query( + init_meta(QString) + , {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}). %% @private -do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) -> - do_cluster_query(Nodes, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). +do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) -> + do_cluster_query( Nodes, Tab, QString, QueryFun + , _Continuation = ?FRESH_SELECT, Meta, _Results = []). -do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) -> +do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) -> #{meta => Meta, data => Results}; -do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation, +do_cluster_query([Node | Tail] = Nodes, Tab, QString, QueryFun, Continuation, Meta = #{limit := Limit}, Results) -> - case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of + case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> {error, Node, {bar_rpc, R}}; {Len, Rows, ?FRESH_SELECT} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Tail, Tab, Qs, QueryFun, ?FRESH_SELECT, NMeta, NResults); + do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults); {Len, Rows, NContinuation} -> {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) + do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- @@ -189,11 +192,11 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation, %%-------------------------------------------------------------------- %% @private This function is exempt from BPAPI -do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() -> - erlang:apply(M, F, [Tab, Qs, Continuation, Limit]); -do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) -> +do_query(Node, Tab, QString, {M,F}, Continuation, Limit) when Node =:= node() -> + erlang:apply(M, F, [Tab, QString, Continuation, Limit]); +do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> case rpc:call(Node, ?MODULE, do_query, - [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of + [Node, Tab, QString, QueryFun, Continuation, Limit], 50000) of {badrpc, _} = R -> {error, R}; Ret -> Ret end. @@ -255,19 +258,19 @@ select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) -> %% Internal Functions %%-------------------------------------------------------------------- -params2qs(Params, QsSchema) when is_map(Params) -> - params2qs(maps:to_list(Params), QsSchema); -params2qs(Params, QsSchema) -> - {Qs, Fuzzy} = pick_params_to_qs(Params, QsSchema, [], []), - {length(Qs) + length(Fuzzy), {Qs, Fuzzy}}. +parse_qstring(QString, QSchema) when is_map(QString) -> + parse_qstring(maps:to_list(QString), QSchema); +parse_qstring(QString, QSchema) -> + {NQString, FuzzyQString} = do_parse_qstring(QString, QSchema, [], []), + {length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}. -pick_params_to_qs([], _, Acc1, Acc2) -> +do_parse_qstring([], _, Acc1, Acc2) -> NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], {lists:reverse(Acc1), lists:reverse(NAcc2)}; -pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) -> - case proplists:get_value(Key, QsSchema) of - undefined -> pick_params_to_qs(Params, QsSchema, Acc1, Acc2); +do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) -> + case proplists:get_value(Key, QSchema) of + undefined -> do_parse_qstring(RestQString, QSchema, Acc1, Acc2); Type -> case Key of <> @@ -277,22 +280,22 @@ pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) -> <<"gte_">> -> <<"lte_", NKey/binary>>; <<"lte_">> -> <<"gte_", NKey/binary>> end, - case lists:keytake(OpposeKey, 1, Params) of + case lists:keytake(OpposeKey, 1, RestQString) of false -> - pick_params_to_qs(Params, QsSchema, - [qs(Key, Value, Type) | Acc1], Acc2); + do_parse_qstring( RestQString, QSchema + , [qs(Key, Value, Type) | Acc1], Acc2); {value, {K2, V2}, NParams} -> - pick_params_to_qs(NParams, QsSchema, - [qs(Key, Value, K2, V2, Type) | Acc1], Acc2) + do_parse_qstring( NParams, QSchema + , [qs(Key, Value, K2, V2, Type) | Acc1], Acc2) end; _ -> case is_fuzzy_key(Key) of true -> - pick_params_to_qs(Params, QsSchema, Acc1, - [qs(Key, Value, Type) | Acc2]); + do_parse_qstring( RestQString, QSchema + , Acc1, [qs(Key, Value, Type) | Acc2]); _ -> - pick_params_to_qs(Params, QsSchema, - [qs(Key, Value, Type) | Acc1], Acc2) + do_parse_qstring( RestQString, QSchema + , [qs(Key, Value, Type) | Acc1], Acc2) end end @@ -416,7 +419,7 @@ to_ip_port(IPAddress) -> -include_lib("eunit/include/eunit.hrl"). params2qs_test() -> - Schema = [{<<"str">>, binary}, + QSchema = [{<<"str">>, binary}, {<<"int">>, integer}, {<<"atom">>, atom}, {<<"ts">>, timestamp}, @@ -424,7 +427,7 @@ params2qs_test() -> {<<"lte_range">>, integer}, {<<"like_fuzzy">>, binary}, {<<"match_topic">>, binary}], - Params = [{<<"str">>, <<"abc">>}, + QString = [{<<"str">>, <<"abc">>}, {<<"int">>, <<"123">>}, {<<"atom">>, <<"connected">>}, {<<"ts">>, <<"156000">>}, @@ -438,11 +441,11 @@ params2qs_test() -> {ts, '=:=', 156000}, {range, '>=', 1, '=<', 5} ], - FuzzyQs = [{fuzzy, like, <<"user">>}, - {topic, match, <<"t/#">>}], - ?assertEqual({7, {ExpectedQs, FuzzyQs}}, params2qs(Params, Schema)), + FuzzyNQString = [{fuzzy, like, <<"user">>}, + {topic, match, <<"t/#">>}], + ?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)), - {0, {[], []}} = params2qs([{not_a_predefined_params, val}], Schema). + {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). -endif. diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index 86e9e6905..2a9704bfe 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -87,13 +87,13 @@ fields(meta) -> [{count, hoconsc:mk(integer(), #{example => 1})}]. %%%============================================================================================== %% parameters trans -alarms(get, #{query_string := Qs}) -> +alarms(get, #{query_string := QString}) -> Table = - case maps:get(<<"activated">>, Qs, true) of + case maps:get(<<"activated">>, QString, true) of true -> ?ACTIVATED_ALARM; false -> ?DEACTIVATED_ALARM end, - Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}), + Response = emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}), emqx_mgmt_util:generate_response(Response); alarms(delete, _Params) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 72b9a3476..d8c71e80c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -48,21 +48,23 @@ %% for batch operation -export([do_subscribe/3]). --define(CLIENT_QS_SCHEMA, {emqx_channel_info, - [ {<<"node">>, atom} - , {<<"username">>, binary} - , {<<"zone">>, atom} - , {<<"ip_address">>, ip} - , {<<"conn_state">>, atom} - , {<<"clean_start">>, atom} - , {<<"proto_name">>, binary} - , {<<"proto_ver">>, integer} - , {<<"like_clientid">>, binary} - , {<<"like_username">>, binary} - , {<<"gte_created_at">>, timestamp} - , {<<"lte_created_at">>, timestamp} - , {<<"gte_connected_at">>, timestamp} - , {<<"lte_connected_at">>, timestamp}]}). +-define(CLIENT_QTAB, emqx_channel_info). + +-define(CLIENT_QSCHEMA, + [ {<<"node">>, atom} + , {<<"username">>, binary} + , {<<"zone">>, atom} + , {<<"ip_address">>, ip} + , {<<"conn_state">>, atom} + , {<<"clean_start">>, atom} + , {<<"proto_name">>, binary} + , {<<"proto_ver">>, integer} + , {<<"like_clientid">>, binary} + , {<<"like_username">>, binary} + , {<<"gte_created_at">>, timestamp} + , {<<"lte_created_at">>, timestamp} + , {<<"gte_connected_at">>, timestamp} + , {<<"lte_connected_at">>, timestamp}]). -define(QUERY_FUN, {?MODULE, query}). -define(FORMAT_FUN, {?MODULE, format_channel_info}). @@ -386,8 +388,8 @@ fields(meta) -> %%%============================================================================================== %% parameters trans -clients(get, #{query_string := Qs}) -> - list_clients(Qs). +clients(get, #{query_string := QString}) -> + list_clients(QString). client(get, #{bindings := Bindings}) -> lookup(Bindings); @@ -451,18 +453,17 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> %%%============================================================================================== %% api apply -list_clients(Params) -> - {Tab, QuerySchema} = ?CLIENT_QS_SCHEMA, - case maps:get(<<"node">>, Params, undefined) of +list_clients(QString) -> + case maps:get(<<"node">>, QString, undefined) of undefined -> - Response = emqx_mgmt_api:cluster_query(Params, Tab, - QuerySchema, ?QUERY_FUN), + Response = emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB, + ?CLIENT_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response); Node1 -> Node = binary_to_atom(Node1, utf8), - ParamsWithoutNode = maps:without([<<"node">>], Params), - Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, - Tab, QuerySchema, ?QUERY_FUN), + QStringWithoutNode = maps:without([<<"node">>], QString), + Response = emqx_mgmt_api:node_query(Node, QStringWithoutNode, + ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response) end. @@ -566,14 +567,14 @@ do_unsubscribe(ClientID, Topic) -> %%-------------------------------------------------------------------- %% Query Functions -query(Tab, {Qs, []}, Continuation, Limit) -> - Ms = qs2ms(Qs), +query(Tab, {QString, []}, Continuation, Limit) -> + Ms = qs2ms(QString), emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format_channel_info/1); -query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> - Ms = qs2ms(Qs), - FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), +query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> + Ms = qs2ms(QString), + FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, fun format_channel_info/1). diff --git a/apps/emqx_management/src/emqx_mgmt_api_routes.erl b/apps/emqx_management/src/emqx_mgmt_api_routes.erl index 54d372fde..14c38bc6f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_routes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_routes.erl @@ -30,7 +30,7 @@ -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). --define(ROUTES_QS_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]). +-define(ROUTES_QSCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]). -import(emqx_mgmt_util, [ object_schema/2 , object_array_schema/2 @@ -79,16 +79,17 @@ route_api() -> %%%============================================================================================== %% parameters trans -routes(get, #{query_string := Qs}) -> - list(generate_topic(Qs)). +routes(get, #{query_string := QString}) -> + list(generate_topic(QString)). route(get, #{bindings := Bindings}) -> lookup(generate_topic(Bindings)). %%%============================================================================================== %% api apply -list(Params) -> - Response = emqx_mgmt_api:node_query(node(), Params, emqx_route, ?ROUTES_QS_SCHEMA, {?MODULE, query}), +list(QString) -> + Response = emqx_mgmt_api:node_query( + node(), QString, emqx_route, ?ROUTES_QSCHEMA, {?MODULE, query}), generate_response(Response). lookup(#{topic := Topic}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index b33db0cd1..92c702043 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -33,16 +33,17 @@ , format/1 ]). --define(SUBS_QS_SCHEMA, {emqx_suboption, +-define(SUBS_QTABLE, emqx_suboption). + +-define(SUBS_QSCHEMA, [ {<<"clientid">>, binary} , {<<"topic">>, binary} , {<<"share">>, binary} , {<<"share_group">>, binary} , {<<"qos">>, integer} - , {<<"match_topic">>, binary}]}). + , {<<"match_topic">>, binary}]). --define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format}). +-define(QUERY_FUN, {?MODULE, query}). api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -111,16 +112,15 @@ parameters() -> } ]. -subscriptions(get, #{query_string := Params}) -> - {Tab, QuerySchema} = ?SUBS_QS_SCHEMA, - case maps:get(<<"node">>, Params, undefined) of +subscriptions(get, #{query_string := QString}) -> + case maps:get(<<"node">>, QString, undefined) of undefined -> - Response = emqx_mgmt_api:cluster_query(Params, Tab, - QuerySchema, ?query_fun), + Response = emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE, + ?SUBS_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response); Node -> - Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, - Tab, QuerySchema, ?query_fun), + Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), QString, + ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN), emqx_mgmt_util:generate_response(Response) end. @@ -153,12 +153,14 @@ format({_Subscriber, Topic, Options}) -> query(Tab, {Qs, []}, Continuation, Limit) -> Ms = qs2ms(Qs), - emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1); + emqx_mgmt_api:select_table_with_count( Tab, Ms + , Continuation, Limit, fun format/1); query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Ms = qs2ms(Qs), FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), - emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, fun format/1). + emqx_mgmt_api:select_table_with_count( Tab, {Ms, FuzzyFilterFun} + , Continuation, Limit, fun format/1). fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) ->