From 1fe9c105aa6a801346a0ec0e03b710a7c5a57d56 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 16 Nov 2022 15:49:06 +0800 Subject: [PATCH] refactor(mgmt): smplify the node_query/cluster_query implementation --- .../emqx_enhanced_authn_scram_mnesia.erl | 34 +--- .../src/simple_authn/emqx_authn_mnesia.erl | 34 +--- apps/emqx_authz/src/emqx_authz_api_mnesia.erl | 62 ++---- .../src/emqx_gateway_api_clients.erl | 36 ++-- apps/emqx_management/src/emqx_mgmt_api.erl | 177 ++++++++++-------- .../src/emqx_mgmt_api_alarms.erl | 12 +- .../src/emqx_mgmt_api_clients.erl | 38 ++-- .../src/emqx_mgmt_api_subscriptions.erl | 73 +++----- .../src/emqx_mgmt_api_topics.erl | 21 +-- apps/emqx_modules/src/emqx_delayed.erl | 18 +- .../src/emqx_rule_engine_api.erl | 13 +- 11 files changed, 217 insertions(+), 301 deletions(-) diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 1fcc00dc9..dfa2f32ef 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -47,7 +47,7 @@ ]). -export([ - query/4, + qs2ms/2, format_user_info/1, group_match_spec/1 ]). @@ -66,7 +66,6 @@ {<<"user_group">>, binary}, {<<"is_superuser">>, atom} ]). --define(QUERY_FUN, {?MODULE, query}). -type user_group() :: binary(). @@ -264,38 +263,23 @@ list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, emqx_mgmt_api:node_query( node(), - NQueryString, ?TAB, + NQueryString, ?AUTHN_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_user_info/1 ). %%-------------------------------------------------------------------- -%% Query Functions +%% QueryString to MatchSpec -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). - -%%-------------------------------------------------------------------- -%% Match funcs +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {QString, Fuzzy}) -> + {ms_from_qstring(QString), fuzzy_filter_fun(Fuzzy)}. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> lists:filter( diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index f84bfc195..50edb6612 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -49,7 +49,7 @@ ]). -export([ - query/4, + qs2ms/2, format_user_info/1, group_match_spec/1 ]). @@ -84,7 +84,6 @@ {<<"user_group">>, binary}, {<<"is_superuser">>, atom} ]). --define(QUERY_FUN, {?MODULE, query}). %%------------------------------------------------------------------------------ %% Mnesia bootstrap @@ -290,38 +289,23 @@ list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, emqx_mgmt_api:node_query( node(), - NQueryString, ?TAB, + NQueryString, ?AUTHN_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_user_info/1 ). %%-------------------------------------------------------------------- -%% Query Functions +%% QueryString to MatchSpec -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). - -%%-------------------------------------------------------------------- -%% Match funcs +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {QString, FuzzyQString}) -> + {ms_from_qstring(QString), fuzzy_filter_fun(FuzzyQString)}. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> lists:filter( diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 8edd62e21..1ab7feea8 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -24,8 +24,8 @@ -import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]). --define(QUERY_USERNAME_FUN, {?MODULE, query_username}). --define(QUERY_CLIENTID_FUN, {?MODULE, query_clientid}). +-define(QUERY_USERNAME_FUN, fun ?MODULE:query_username/2). +-define(QUERY_CLIENTID_FUN, fun ?MODULE:query_clientid/2). -define(ACL_USERNAME_QSCHEMA, [{<<"like_username">>, binary}]). -define(ACL_CLIENTID_QSCHEMA, [{<<"like_clientid">>, binary}]). @@ -49,12 +49,11 @@ %% query funs -export([ - query_username/4, - query_clientid/4 + query_username/2, + query_clientid/2, + format_result/1 ]). --export([format_result/1]). - -define(BAD_REQUEST, 'BAD_REQUEST'). -define(NOT_FOUND, 'NOT_FOUND'). -define(ALREADY_EXISTS, 'ALREADY_EXISTS'). @@ -405,8 +404,8 @@ users(get, #{query_string := QueryString}) -> case emqx_mgmt_api:node_query( node(), - QueryString, ?ACL_TABLE, + QueryString, ?ACL_USERNAME_QSCHEMA, ?QUERY_USERNAME_FUN, fun ?MODULE:format_result/1 @@ -441,8 +440,8 @@ clients(get, #{query_string := QueryString}) -> case emqx_mgmt_api:node_query( node(), - QueryString, ?ACL_TABLE, + QueryString, ?ACL_CLIENTID_QSCHEMA, ?QUERY_CLIENTID_FUN, fun ?MODULE:format_result/1 @@ -576,48 +575,19 @@ purge(delete, _) -> end. %%-------------------------------------------------------------------- -%% Query Functions +%% QueryString to MatchSpec -query_username(Tab, {_QString, []}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_username_rules(), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_username_rules(), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). +-spec query_username(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +query_username(_Tab, {_QString, FuzzyQString}) -> + {emqx_authz_mnesia:list_username_rules(), fuzzy_filter_fun(FuzzyQString)}. -query_clientid(Tab, {_QString, []}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_clientid_rules(), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_clientid_rules(), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). - -%%-------------------------------------------------------------------- -%% Match funcs +-spec query_clientid(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +query_clientid(_Tab, {_QString, FuzzyQString}) -> + {emqx_authz_mnesia:list_clientid_rules(), fuzzy_filter_fun(FuzzyQString)}. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> lists:filter( diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index f0fbcf968..ba0f34206 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -55,7 +55,7 @@ %% internal exports (for client query) -export([ - query/4, + qs2ms/2, format_channel_info/1, format_channel_info/2 ]). @@ -98,8 +98,6 @@ paths() -> {<<"lte_lifetime">>, integer} ]). --define(QUERY_FUN, {?MODULE, query}). - clients(get, #{ bindings := #{name := Name0}, query_string := QString @@ -110,10 +108,10 @@ clients(get, #{ case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, TabName, + QString, ?CLIENT_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_channel_info/2 ); Node0 -> @@ -122,10 +120,10 @@ clients(get, #{ QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - QStringWithoutNode, TabName, + QStringWithoutNode, ?CLIENT_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_channel_info/2 ); {error, _} -> @@ -267,25 +265,11 @@ extra_sub_props(Props) -> ). %%-------------------------------------------------------------------- -%% query funcs +%% QueryString to MatchSpec -query(Tab, {Qs, []}, Continuation, Limit) -> - Ms = qs2ms(Qs), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> - Ms = qs2ms(Qs), - FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {Qs, Fuzzy}) -> + {qs2ms(Qs), fuzzy_filter_fun(Fuzzy)}. qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), @@ -340,6 +324,8 @@ ms(lifetime, X) -> %%-------------------------------------------------------------------- %% Fuzzy filter funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> lists:filter( diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index cc0a81864..5fd16e2ff 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -31,11 +31,10 @@ -export([ node_query/6, cluster_query/5, - select_table_with_count/4, b2i/1 ]). --export([do_query/6]). +-export([do_query/5]). paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), @@ -121,15 +120,37 @@ limit(Params) -> %% Node Query %%-------------------------------------------------------------------- -node_query(Node, QString, Tab, QSchema, QueryFun, FmtFun) -> +-type query_params() :: list() | map(). + +-type query_schema() :: [{Key :: binary(), Type :: atom | integer | timestamp | ip | ip_port}]. + +-type query_to_match_spec_fun() :: + fun((list(), list()) -> {ets:match_spec(), fun()}). + +-type format_result_fun() :: + fun((node(), term()) -> term()) + | fun((term()) -> term()). + +-type query_return() :: #{meta := map(), data := [term()]}. + +-spec node_query( + node(), + atom(), + query_params(), + query_schema(), + query_to_match_spec_fun(), + format_result_fun() +) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return(). +node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) -> case parse_pager_params(QString) of false -> {error, page_limit_invalid}; Meta -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), ResultAcc = #{cursor => 0, count => 0, rows => []}, + QueryState = init_query_state(Meta), NResultAcc = do_node_query( - Node, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc + Node, Tab, NQString, MsFun, QueryState, ResultAcc ), format_query_result(FmtFun, Meta, NResultAcc) end. @@ -139,31 +160,36 @@ do_node_query( Node, Tab, QString, - QueryFun, - Continuation, - Meta = #{limit := Limit}, + MsFun, + QueryState, ResultAcc ) -> - case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of + case do_query(Node, Tab, QString, MsFun, QueryState) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Rows, ?FRESH_SELECT} -> - {_, NResultAcc} = accumulate_query_rows(Node, Rows, ResultAcc, Meta), + {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} -> + {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc), NResultAcc; - {Rows, NContinuation} -> - case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of + {Rows, NQueryState} -> + case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of {enough, NResultAcc} -> NResultAcc; {more, NResultAcc} -> - do_node_query(Node, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc) + do_node_query(Node, Tab, QString, MsFun, NQueryState, NResultAcc) end end. %%-------------------------------------------------------------------- %% Cluster Query %%-------------------------------------------------------------------- - -cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) -> +-spec cluster_query( + atom(), + query_params(), + query_schema(), + query_to_match_spec_fun(), + format_result_fun() +) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return(). +cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> case parse_pager_params(QString) of false -> {error, page_limit_invalid}; @@ -171,42 +197,38 @@ cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) -> {_CodCnt, NQString} = parse_qstring(QString, QSchema), Nodes = mria_mnesia:running_nodes(), ResultAcc = #{cursor => 0, count => 0, rows => []}, + QueryState = init_query_state(Meta), NResultAcc = do_cluster_query( - Nodes, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc + Nodes, Tab, NQString, MsFun, QueryState, ResultAcc ), format_query_result(FmtFun, Meta, NResultAcc) end. %% @private -do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, _Meta, ResultAcc) -> +do_cluster_query([], _Tab, _QString, _QueryFun, _QueryState, ResultAcc) -> ResultAcc; do_cluster_query( [Node | Tail] = Nodes, Tab, QString, - QueryFun, - Continuation, - Meta = #{limit := Limit}, + MsFun, + QueryState, ResultAcc ) -> - case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of + case do_query(Node, Tab, QString, MsFun, QueryState) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Rows, NContinuation} -> - case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of + {Rows, NQueryState} -> + case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of {enough, NResultAcc} -> NResultAcc; {more, NResultAcc} -> - case NContinuation of - ?FRESH_SELECT -> - do_cluster_query( - Tail, Tab, QString, QueryFun, ?FRESH_SELECT, Meta, NResultAcc - ); - _ -> - do_cluster_query( - Nodes, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc - ) - end + NextNodes = + case NQueryState of + #{continuation := ?FRESH_SELECT} -> Tail; + _ -> Nodes + end, + do_cluster_query(NextNodes, Tab, QString, MsFun, NQueryState, NResultAcc) end end. @@ -214,16 +236,31 @@ do_cluster_query( %% Do Query (or rpc query) %%-------------------------------------------------------------------- +%% QueryState :: +%% #{continuation := ets:continuation(), +%% page := pos_integer(), +%% limit := pos_integer(), +%% total := #{node() := non_neg_integer()} +%% } +init_query_state(_Meta = #{page := Page, limit := Limit}) -> + #{ + continuation => ?FRESH_SELECT, + page => Page, + limit => Limit, + total => [] + }. + %% @private This function is exempt from BPAPI -do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() -> - erlang:apply(M, F, [Tab, QString, Continuation, Limit]); -do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> +do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) -> + {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), + do_select(Tab, Ms, FuzzyFun, QueryState); +do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) -> case rpc:call( Node, ?MODULE, do_query, - [Node, Tab, QString, QueryFun, Continuation, Limit], + [Node, Tab, QString, MsFun, QueryState], 50000 ) of @@ -231,6 +268,31 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> Ret -> Ret end. +do_select( + Tab, + Ms, + FuzzyFun, + QueryState = #{continuation := Continuation, limit := Limit} +) -> + Result = + case Continuation of + ?FRESH_SELECT -> + ets:select(Tab, Ms, Limit); + _ -> + ets:select(ets:repair_continuation(Continuation, Ms)) + end, + case Result of + '$end_of_table' -> + {[], QueryState#{continuation => ?FRESH_SELECT}}; + {Rows, NContinuation} -> + NRows = + case is_function(FuzzyFun) of + true -> FuzzyFun(Rows); + false -> Rows + end, + {NRows, QueryState#{continuation => NContinuation}} + end. + %% ResultAcc :: #{count := integer(), %% cursor := integer(), %% rows := [{node(), Rows :: list()}] @@ -238,8 +300,8 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> accumulate_query_rows( Node, Rows, - ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}, - _Meta = #{page := Page, limit := Limit} + _QueryState = #{page := Page, limit := Limit}, + ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc} ) -> PageStart = (Page - 1) * Limit + 1, PageEnd = Page * Limit, @@ -266,43 +328,6 @@ accumulate_query_rows( %% Table Select %%-------------------------------------------------------------------- -select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit) when - is_function(FuzzyFilterFun) andalso Limit > 0 --> - case ets:select(Tab, Ms, Limit) of - '$end_of_table' -> - {[], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - Rows = FuzzyFilterFun(RawResult), - {Rows, NContinuation} - end; -select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit) when - is_function(FuzzyFilterFun) --> - case ets:select(ets:repair_continuation(Continuation, Ms)) of - '$end_of_table' -> - {[], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - Rows = FuzzyFilterFun(RawResult), - {Rows, NContinuation} - end; -select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit) when - Limit > 0 --> - case ets:select(Tab, Ms, Limit) of - '$end_of_table' -> - {[], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - {RawResult, NContinuation} - end; -select_table_with_count(_Tab, Ms, Continuation, _Limit) -> - case ets:select(ets:repair_continuation(Continuation, Ms)) of - '$end_of_table' -> - {[], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - {RawResult, NContinuation} - end. - %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index d574ad4ee..895a5bcf8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -29,7 +29,7 @@ -define(TAGS, [<<"Alarms">>]). %% internal export (for query) --export([query/4]). +-export([qs2ms/2]). api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -114,10 +114,10 @@ alarms(get, #{query_string := QString}) -> end, case emqx_mgmt_api:cluster_query( - QString, Table, + QString, [], - {?MODULE, query}, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_alarm/2 ) of @@ -136,9 +136,9 @@ alarms(delete, _Params) -> %%%============================================================================================== %% internal -query(Table, _QsSpec, Continuation, Limit) -> - Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit). +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {_Qs, _Fuzzy}) -> + {[{'$1', [], ['$1']}], undefined}. format_alarm(WhichNode, Alarm) -> emqx_alarm:format(WhichNode, Alarm). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 56730fe3e..cec9b9889 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -46,7 +46,7 @@ ]). -export([ - query/4, + qs2ms/2, format_channel_info/1, format_channel_info/2 ]). @@ -74,7 +74,6 @@ {<<"lte_connected_at">>, timestamp} ]). --define(QUERY_FUN, {?MODULE, query}). -define(FORMAT_FUN, {?MODULE, format_channel_info}). -define(CLIENT_ID_NOT_FOUND, @@ -643,10 +642,10 @@ list_clients(QString) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, ?CLIENT_QTAB, + QString, ?CLIENT_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_channel_info/2 ); Node0 -> @@ -655,10 +654,10 @@ list_clients(QString) -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - QStringWithoutNode, ?CLIENT_QTAB, + QStringWithoutNode, ?CLIENT_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_channel_info/2 ); {error, _} -> @@ -783,30 +782,13 @@ do_unsubscribe(ClientID, Topic) -> Res end. -%%-------------------------------------------------------------------- -%% Query Functions - -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = qs2ms(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = qs2ms(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). - %%-------------------------------------------------------------------- %% QueryString to Match Spec +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {QString, FuzzyQString}) -> + {qs2ms(QString), fuzzy_filter_fun(FuzzyQString)}. + -spec qs2ms(list()) -> ets:match_spec(). qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), @@ -856,6 +838,8 @@ ms(created_at, X) -> %%-------------------------------------------------------------------- %% Match funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> lists:filter( diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index eb0f83cc0..e4678d641 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -32,7 +32,7 @@ -export([subscriptions/2]). -export([ - query/4, + qs2ms/2, format/2 ]). @@ -47,8 +47,6 @@ {<<"match_topic">>, binary} ]). --define(QUERY_FUN, {?MODULE, query}). - api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -139,10 +137,10 @@ subscriptions(get, #{query_string := QString}) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, ?SUBS_QTABLE, + QString, ?SUBS_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format/2 ); Node0 -> @@ -150,10 +148,10 @@ subscriptions(get, #{query_string := QString}) -> {ok, Node1} -> emqx_mgmt_api:node_query( Node1, - QString, ?SUBS_QTABLE, + QString, ?SUBS_QSCHEMA, - ?QUERY_FUN, + fun ?MODULE:qs2ms/2, fun ?MODULE:format/2 ); {error, _} -> @@ -188,26 +186,30 @@ get_topic(Topic, _) -> Topic. %%-------------------------------------------------------------------- -%% Query Function +%% QueryString to MatchSpec %%-------------------------------------------------------------------- -query(Tab, {Qs, []}, Continuation, Limit) -> - Ms = qs2ms(Qs), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit - ); -query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> - Ms = qs2ms(Qs), - FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit - ). +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Tab, {Qs, Fuzzy}) -> + {gen_match_spec(Qs), fuzzy_filter_fun(Fuzzy)}. + +gen_match_spec(Qs) -> + MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}), + [{MtchHead, [], ['$_']}]. + +gen_match_spec([], MtchHead) -> + MtchHead; +gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> + gen_match_spec(More, update_ms(Key, Value, MtchHead)). + +update_ms(clientid, X, {{Pid, Topic}, Opts}) -> + {{Pid, Topic}, Opts#{subid => X}}; +update_ms(topic, X, {{Pid, _Topic}, Opts}) -> + {{Pid, X}, Opts}; +update_ms(share_group, X, {{Pid, Topic}, Opts}) -> + {{Pid, Topic}, Opts#{share => X}}; +update_ms(qos, X, {{Pid, Topic}, Opts}) -> + {{Pid, Topic}, Opts#{qos => X}}. fuzzy_filter_fun(Fuzzy) -> fun(MsRaws) when is_list(MsRaws) -> @@ -221,24 +223,3 @@ run_fuzzy_filter(_, []) -> true; run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). - -%%-------------------------------------------------------------------- -%% Query String to Match Spec - -qs2ms(Qs) -> - MtchHead = qs2ms(Qs, {{'_', '_'}, #{}}), - [{MtchHead, [], ['$_']}]. - -qs2ms([], MtchHead) -> - MtchHead; -qs2ms([{Key, '=:=', Value} | More], MtchHead) -> - qs2ms(More, update_ms(Key, Value, MtchHead)). - -update_ms(clientid, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{subid => X}}; -update_ms(topic, X, {{Pid, _Topic}, Opts}) -> - {{Pid, X}, Opts}; -update_ms(share_group, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{share => X}}; -update_ms(qos, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{qos => X}}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 3cc2168e7..ca707ba20 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -34,7 +34,7 @@ topic/2 ]). --export([query/4]). +-export([qs2ms/2, format/1]). -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). @@ -110,10 +110,10 @@ do_list(Params) -> case emqx_mgmt_api:node_query( node(), - Params, emqx_route, + Params, ?TOPICS_QUERY_SCHEMA, - {?MODULE, query}, + fun ?MODULE:qs2ms/2, fun ?MODULE:format/1 ) of @@ -143,16 +143,15 @@ generate_topic(Params = #{topic := Topic}) -> generate_topic(Params) -> Params. -query(Tab, {Qs, _}, Continuation, Limit) -> - Ms = qs2ms(Qs, [{{route, '_', '_'}, [], ['$_']}]), - emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1). +qs2ms(_Tab, {Qs, _}) -> + {gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]), undefined}. -qs2ms([], Res) -> +gen_match_spec([], Res) -> Res; -qs2ms([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> - qs2ms(Qs, [{{route, T, N}, [], ['$_']}]); -qs2ms([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> - qs2ms(Qs, [{{route, T, N}, [], ['$_']}]). +gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> + gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]); +gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> + gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). format(#route{topic = Topic, dest = {_, Node}}) -> #{topic => Topic, node => Node}; diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 44e7a85e3..0d83e65f1 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -56,10 +56,12 @@ get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, - cluster_list/1, - cluster_query/4 + cluster_list/1 ]). +%% internal exports +-export([qs2ms/2]). + -export([ post_config_update/5 ]). @@ -168,12 +170,16 @@ list(Params) -> cluster_list(Params) -> %% FIXME: why cluster_query??? emqx_mgmt_api:cluster_query( - Params, ?TAB, [], {?MODULE, cluster_query}, fun ?MODULE:format_delayed/1 + ?TAB, + Params, + [], + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_delayed/1 ). -cluster_query(Table, _QsSpec, Continuation, Limit) -> - Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit). +-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. +qs2ms(_Table, {_Qs, _Fuzzy}) -> + {[{'$1', [], ['$1']}], undefined}. format_delayed(Delayed) -> format_delayed(Delayed, false). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2157b108c..f2d5914d4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -34,7 +34,7 @@ -export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]). %% query callback --export([query/4]). +-export([qs2ms/2, format_rule_resp/1]). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_BADARGS(REASON), begin @@ -274,10 +274,10 @@ param_path_id() -> case emqx_mgmt_api:node_query( node(), - QueryString, ?RULE_TAB, + QueryString, ?RULE_QS_SCHEMA, - {?MODULE, query}, + fun ?MODULE:qs2ms/2, fun ?MODULE:format_rule_resp/1 ) of @@ -553,12 +553,9 @@ filter_out_request_body(Conf) -> ], maps:without(ExtraConfs, Conf). -query(Tab, {Qs, Fuzzy}, Start, Limit) -> +qs2ms(_Tab, {Qs, Fuzzy}) -> Ms = qs2ms(), - FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, {Ms, FuzzyFun}, Start, Limit - ). + {Ms, fuzzy_match_fun(Qs, Ms, Fuzzy)}. %% rule is not a record, so everything is fuzzy filter. qs2ms() ->