diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index a89063870..7368b442d 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -41,7 +41,8 @@ delete_all_deactivated_alarms/0, get_alarms/0, get_alarms/1, - format/1 + format/1, + format/2 ]). %% gen_server callbacks @@ -169,12 +170,15 @@ get_alarms(activated) -> get_alarms(deactivated) -> gen_server:call(?MODULE, {get_alarms, deactivated}). -format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> +format(Alarm) -> + format(node(), Alarm). + +format(Node, #activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> Now = erlang:system_time(microsecond), %% mnesia db stored microsecond for high frequency alarm %% format for dashboard using millisecond #{ - node => node(), + node => Node, name => Name, message => Message, %% to millisecond @@ -182,7 +186,7 @@ format(#activated_alarm{name = Name, message = Message, activate_at = At, detail activate_at => to_rfc3339(At), details => Details }; -format(#deactivated_alarm{ +format(Node, #deactivated_alarm{ name = Name, message = Message, activate_at = At, @@ -190,7 +194,7 @@ format(#deactivated_alarm{ deactivate_at = DAt }) -> #{ - node => node(), + node => Node, name => Name, message => Message, %% to millisecond diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index d39f43686..5adc7a811 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -650,8 +650,8 @@ init([]) -> TabOpts = [public, {write_concurrency, true}], ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), - ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), - ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]), + ok = emqx_tables:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]), + ok = emqx_tables:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), State = #{chan_pmon => emqx_pmon:new()}, {ok, State}. diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index d87258201..69cede2bc 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -62,9 +62,10 @@ %% List of business-layer functions that are exempt from the checks: %% erlfmt-ignore -define(EXEMPTIONS, - "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are - % passed in the args, it's futile to try to statically - % check it + % Reason: legacy code. A fun and a QC query are + % passed in the args, it's futile to try to statically + % check it + "emqx_mgmt_api:do_query/2, emqx_mgmt_api:collect_total_from_tail_nodes/3" ). -define(XREF, myxref). diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index f195e083c..2d51f6f14 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -519,21 +519,51 @@ ensure_quic_listener(Name, UdpPort) -> %% Clusterisation and multi-node testing %% +-type cluster_spec() :: [node_spec()]. +-type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}. +-type role() :: core | replicant. +-type shortname() :: atom(). +-type nodename() :: atom(). +-type node_opts() :: #{ + %% Need to loaded apps. These apps will be loaded once the node started + load_apps => list(), + %% Need to started apps. It is the first arg passed to emqx_common_test_helpers:start_apps/2 + apps => list(), + %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2 + env_handler => fun((AppName :: atom()) -> term()), + %% Application env preset before calling `emqx_common_test_helpers:start_apps/2` + env => {AppName :: atom(), Key :: atom(), Val :: term()}, + %% Whether to execute `emqx_config:init_load(SchemaMod)` + %% default: true + load_schema => boolean(), + %% Eval by emqx_config:put/2 + conf => [{KeyPath :: list(), Val :: term()}], + %% Fast option to config listener port + %% default rule: + %% - tcp: base_port + %% - ssl: base_port + 1 + %% - ws : base_port + 3 + %% - wss: base_port + 4 + listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}] +}. + +-spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}]. emqx_cluster(Specs) -> emqx_cluster(Specs, #{}). +-spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}]. emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) -> emqx_cluster(Specs, maps:from_list(CommonOpts)); emqx_cluster(Specs0, CommonOpts) -> Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), Specs = expand_node_specs(Specs1, CommonOpts), - CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], - %% Assign grpc ports: + %% Assign grpc ports GenRpcPorts = maps:from_list([ {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}} || {{_, Name, _}, Num} <- Specs ]), %% Set the default node of the cluster: + CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], JoinTo = case CoreNodes of [First | _] -> First; @@ -554,6 +584,8 @@ emqx_cluster(Specs0, CommonOpts) -> ]. %% Lower level starting API + +-spec start_slave(shortname(), node_opts()) -> nodename(). start_slave(Name, Opts) -> {ok, Node} = ct_slave:start( list_to_atom(atom_to_list(Name) ++ "@" ++ host()), @@ -590,6 +622,7 @@ epmd_path() -> %% Node initialization +-spec setup_node(nodename(), node_opts()) -> ok. setup_node(Node, Opts) when is_list(Opts) -> setup_node(Node, maps:from_list(Opts)); setup_node(Node, Opts) when is_map(Opts) -> diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index cb3fad7dc..ddc1bb464 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -47,7 +47,8 @@ ]). -export([ - query/4, + qs2ms/2, + run_fuzzy_filter/2, format_user_info/1, group_match_spec/1 ]). @@ -66,7 +67,6 @@ {<<"user_group">>, binary}, {<<"is_superuser">>, atom} ]). --define(QUERY_FUN, {?MODULE, query}). -type user_group() :: binary(). @@ -262,42 +262,30 @@ lookup_user(UserID, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, - emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). - -%%-------------------------------------------------------------------- -%% Query Functions - -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_user_info/1 - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_user_info/1 + emqx_mgmt_api:node_query( + node(), + ?TAB, + NQueryString, + ?AUTHN_QSCHEMA, + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_user_info/1 ). %%-------------------------------------------------------------------- -%% Match funcs +%% QueryString to MatchSpec + +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {QString, Fuzzy}) -> + #{ + match_spec => ms_from_qstring(QString), + fuzzy_fun => fuzzy_filter_fun(Fuzzy) + }. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. run_fuzzy_filter(_, []) -> true; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 7276ad428..415d23c25 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -49,7 +49,8 @@ ]). -export([ - query/4, + qs2ms/2, + run_fuzzy_filter/2, format_user_info/1, group_match_spec/1 ]). @@ -84,7 +85,6 @@ {<<"user_group">>, binary}, {<<"is_superuser">>, atom} ]). --define(QUERY_FUN, {?MODULE, query}). %%------------------------------------------------------------------------------ %% Mnesia bootstrap @@ -288,42 +288,30 @@ lookup_user(UserID, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, - emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). - -%%-------------------------------------------------------------------- -%% Query Functions - -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_user_info/1 - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = ms_from_qstring(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_user_info/1 + emqx_mgmt_api:node_query( + node(), + ?TAB, + NQueryString, + ?AUTHN_QSCHEMA, + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_user_info/1 ). %%-------------------------------------------------------------------- -%% Match funcs +%% QueryString to MatchSpec + +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {QString, FuzzyQString}) -> + #{ + match_spec => ms_from_qstring(QString), + fuzzy_fun => fuzzy_filter_fun(FuzzyQString) + }. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. run_fuzzy_filter(_, []) -> true; diff --git a/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl index 2ab9efb1d..d9fa225bb 100644 --- a/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl @@ -213,7 +213,7 @@ t_list_users(_) -> #{ data := [#{is_superuser := false, user_id := <<"u3">>}], - meta := #{page := 1, limit := 20, count := 1} + meta := #{page := 1, limit := 20, count := 0} } = emqx_authn_mnesia:list_users( #{ <<"page">> => 1, diff --git a/apps/emqx_authn/test/emqx_enhanced_authn_scram_mnesia_SUITE.erl b/apps/emqx_authn/test/emqx_enhanced_authn_scram_mnesia_SUITE.erl index 41fa5a38c..a0b5ce980 100644 --- a/apps/emqx_authn/test/emqx_enhanced_authn_scram_mnesia_SUITE.erl +++ b/apps/emqx_authn/test/emqx_enhanced_authn_scram_mnesia_SUITE.erl @@ -319,7 +319,7 @@ t_list_users(_) -> is_superuser := _ } ], - meta := #{page := 1, limit := 3, count := 1} + meta := #{page := 1, limit := 3, count := 0} } = emqx_enhanced_authn_scram_mnesia:list_users( #{ <<"page">> => 1, diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index 6ec14ac3b..e98e8c6b3 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 609974e98..090f9d1e2 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -24,8 +24,8 @@ -import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]). --define(QUERY_USERNAME_FUN, {?MODULE, query_username}). --define(QUERY_CLIENTID_FUN, {?MODULE, query_clientid}). +-define(QUERY_USERNAME_FUN, fun ?MODULE:query_username/2). +-define(QUERY_CLIENTID_FUN, fun ?MODULE:query_clientid/2). -define(ACL_USERNAME_QSCHEMA, [{<<"like_username">>, binary}]). -define(ACL_CLIENTID_QSCHEMA, [{<<"like_clientid">>, binary}]). @@ -49,12 +49,12 @@ %% query funs -export([ - query_username/4, - query_clientid/4 + query_username/2, + query_clientid/2, + run_fuzzy_filter/2, + format_result/1 ]). --export([format_result/1]). - -define(BAD_REQUEST, 'BAD_REQUEST'). -define(NOT_FOUND, 'NOT_FOUND'). -define(ALREADY_EXISTS, 'ALREADY_EXISTS'). @@ -405,10 +405,11 @@ users(get, #{query_string := QueryString}) -> case emqx_mgmt_api:node_query( node(), - QueryString, ?ACL_TABLE, + QueryString, ?ACL_USERNAME_QSCHEMA, - ?QUERY_USERNAME_FUN + ?QUERY_USERNAME_FUN, + fun ?MODULE:format_result/1 ) of {error, page_limit_invalid} -> @@ -440,10 +441,11 @@ clients(get, #{query_string := QueryString}) -> case emqx_mgmt_api:node_query( node(), - QueryString, ?ACL_TABLE, + QueryString, ?ACL_CLIENTID_QSCHEMA, - ?QUERY_CLIENTID_FUN + ?QUERY_CLIENTID_FUN, + fun ?MODULE:format_result/1 ) of {error, page_limit_invalid} -> @@ -574,59 +576,27 @@ purge(delete, _) -> end. %%-------------------------------------------------------------------- -%% Query Functions +%% QueryString to MatchSpec -query_username(Tab, {_QString, []}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_username_rules(), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_result/1 - ); -query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_username_rules(), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_result/1 - ). +-spec query_username(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +query_username(_Tab, {_QString, FuzzyQString}) -> + #{ + match_spec => emqx_authz_mnesia:list_username_rules(), + fuzzy_fun => fuzzy_filter_fun(FuzzyQString) + }. -query_clientid(Tab, {_QString, []}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_clientid_rules(), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_result/1 - ); -query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> - Ms = emqx_authz_mnesia:list_clientid_rules(), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_result/1 - ). - -%%-------------------------------------------------------------------- -%% Match funcs +-spec query_clientid(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +query_clientid(_Tab, {_QString, FuzzyQString}) -> + #{ + match_spec => emqx_authz_mnesia:list_clientid_rules(), + fuzzy_fun => fuzzy_filter_fun(FuzzyQString) + }. %% Fuzzy username funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. run_fuzzy_filter(_, []) -> true; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 5ea2e0773..5af1aee89 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -138,7 +138,12 @@ fields(limit) -> Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50}, [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; fields(count) -> - Meta = #{desc => <<"Results count.">>, required => true}, + Desc = << + "Total number of records counted.
" + "Note: this field is 0 when the queryed table is empty, " + "or if the query can not be optimized and requires a full table scan." + >>, + Meta = #{desc => Desc, required => true}, [{count, hoconsc:mk(non_neg_integer(), Meta)}]; fields(meta) -> fields(page) ++ fields(limit) ++ fields(count). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 5f6cc25b6..d219d07cd 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -55,8 +55,10 @@ %% internal exports (for client query) -export([ - query/4, - format_channel_info/1 + qs2ms/2, + run_fuzzy_filter/2, + format_channel_info/1, + format_channel_info/2 ]). -define(TAGS, [<<"Gateway Clients">>]). @@ -97,8 +99,6 @@ paths() -> {<<"lte_lifetime">>, integer} ]). --define(QUERY_FUN, {?MODULE, query}). - clients(get, #{ bindings := #{name := Name0}, query_string := QString @@ -109,10 +109,11 @@ clients(get, #{ case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, TabName, + QString, ?CLIENT_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_channel_info/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of @@ -120,10 +121,11 @@ clients(get, #{ QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - QStringWithoutNode, TabName, + QStringWithoutNode, ?CLIENT_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_channel_info/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -264,27 +266,11 @@ extra_sub_props(Props) -> ). %%-------------------------------------------------------------------- -%% query funcs +%% QueryString to MatchSpec -query(Tab, {Qs, []}, Continuation, Limit) -> - Ms = qs2ms(Qs), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_channel_info/1 - ); -query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> - Ms = qs2ms(Qs), - FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_channel_info/1 - ). +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {Qs, Fuzzy}) -> + #{match_spec => qs2ms(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}. qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), @@ -339,13 +325,10 @@ ms(lifetime, X) -> %%-------------------------------------------------------------------- %% Fuzzy filter funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. run_fuzzy_filter(_, []) -> true; @@ -363,8 +346,11 @@ run_fuzzy_filter( %%-------------------------------------------------------------------- %% format funcs -format_channel_info({_, Infos, Stats} = R) -> - Node = maps:get(node, Infos, node()), +format_channel_info(ChannInfo) -> + format_channel_info(node(), ChannInfo). + +format_channel_info(WhichNode, {_, Infos, Stats} = R) -> + Node = maps:get(node, Infos, WhichNode), ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), SessInfo = maps:get(session, Infos, #{}), diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 342228b19..9f69ed3f0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -21,6 +21,7 @@ -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). -define(FRESH_SELECT, fresh_select). +-define(LONG_QUERY_TIMEOUT, 50000). -export([ paginate/3, @@ -29,13 +30,34 @@ %% first_next query APIs -export([ - node_query/5, - cluster_query/4, - select_table_with_count/5, + node_query/6, + cluster_query/5, b2i/1 ]). --export([do_query/6]). +-export_type([ + match_spec_and_filter/0 +]). + +-type query_params() :: list() | map(). + +-type query_schema() :: [ + {Key :: binary(), Type :: atom | binary | integer | timestamp | ip | ip_port} +]. + +-type query_to_match_spec_fun() :: fun((list(), list()) -> match_spec_and_filter()). + +-type match_spec_and_filter() :: #{match_spec := ets:match_spec(), fuzzy_fun := fuzzy_filter_fun()}. + +-type fuzzy_filter_fun() :: undefined | {fun(), list()}. + +-type format_result_fun() :: + fun((node(), term()) -> term()) + | fun((term()) -> term()). + +-type query_return() :: #{meta := map(), data := [term()]}. + +-export([do_query/2, apply_total_query/1]). paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), @@ -117,171 +139,289 @@ limit(Params) when is_map(Params) -> limit(Params) -> proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). -init_meta(Params) -> - Limit = b2i(limit(Params)), - Page = b2i(page(Params)), - #{ - page => Page, - limit => Limit, - count => 0 - }. - %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- -node_query(Node, QString, Tab, QSchema, QueryFun) -> - {_CodCnt, NQString} = parse_qstring(QString, QSchema), - page_limit_check_query( - init_meta(QString), - {fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]} - ). +-spec node_query( + node(), + atom(), + query_params(), + query_schema(), + query_to_match_spec_fun(), + format_result_fun() +) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return(). +node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) -> + case parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), + ResultAcc = init_query_result(), + QueryState = init_query_state(Tab, NQString, MsFun, Meta), + NResultAcc = do_node_query( + Node, QueryState, ResultAcc + ), + format_query_result(FmtFun, Meta, NResultAcc) + end. %% @private -do_node_query(Node, Tab, QString, QueryFun, Meta) -> - do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). - do_node_query( Node, - Tab, - QString, - QueryFun, - Continuation, - Meta = #{limit := Limit}, - Results + QueryState, + ResultAcc ) -> - case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of + case do_query(Node, QueryState) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Len, Rows, ?FRESH_SELECT} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - #{meta => NMeta, data => NResults}; - {Len, Rows, NContinuation} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults) + {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} -> + {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc), + NResultAcc; + {Rows, NQueryState} -> + case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of + {enough, NResultAcc} -> + NResultAcc; + {more, NResultAcc} -> + do_node_query(Node, NQueryState, NResultAcc) + end end. %%-------------------------------------------------------------------- %% Cluster Query %%-------------------------------------------------------------------- - -cluster_query(QString, Tab, QSchema, QueryFun) -> - {_CodCnt, NQString} = parse_qstring(QString, QSchema), - Nodes = mria_mnesia:running_nodes(), - page_limit_check_query( - init_meta(QString), - {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]} - ). +-spec cluster_query( + atom(), + query_params(), + query_schema(), + query_to_match_spec_fun(), + format_result_fun() +) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return(). +cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> + case parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), + Nodes = mria_mnesia:running_nodes(), + ResultAcc = init_query_result(), + QueryState = init_query_state(Tab, NQString, MsFun, Meta), + NResultAcc = do_cluster_query( + Nodes, QueryState, ResultAcc + ), + format_query_result(FmtFun, Meta, NResultAcc) + end. %% @private -do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) -> - do_cluster_query( - Nodes, - Tab, - QString, - QueryFun, - _Continuation = ?FRESH_SELECT, - Meta, - _Results = [] - ). - -do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) -> - #{meta => Meta, data => Results}; +do_cluster_query([], _QueryState, ResultAcc) -> + ResultAcc; do_cluster_query( [Node | Tail] = Nodes, - Tab, - QString, - QueryFun, - Continuation, - Meta = #{limit := Limit}, - Results + QueryState, + ResultAcc ) -> - case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of + case do_query(Node, QueryState) of {error, {badrpc, R}} -> - {error, Node, {bar_rpc, R}}; - {Len, Rows, ?FRESH_SELECT} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults); - {Len, Rows, NContinuation} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults) + {error, Node, {badrpc, R}}; + {Rows, NQueryState} -> + case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of + {enough, NResultAcc} -> + maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc); + {more, NResultAcc} -> + NextNodes = + case NQueryState of + #{continuation := ?FRESH_SELECT} -> Tail; + _ -> Nodes + end, + do_cluster_query(NextNodes, NQueryState, NResultAcc) + end + end. + +maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) -> + ResultAcc; +maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) -> + case counting_total_fun(QueryState) of + false -> + ResultAcc; + _Fun -> + collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) + end. + +collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) -> + %% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node + case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of + {_, [Node | _]} -> + {error, Node, {badrpc, badnode}}; + {ResL0, []} -> + ResL = lists:zip(Nodes, ResL0), + case lists:filter(fun({_, I}) -> not is_integer(I) end, ResL) of + [{Node, {badrpc, Reason}} | _] -> + {error, Node, {badrpc, Reason}}; + [] -> + ResultAcc#{total => ResL ++ TotalAcc} + end end. %%-------------------------------------------------------------------- %% Do Query (or rpc query) %%-------------------------------------------------------------------- +%% QueryState :: +%% #{continuation := ets:continuation(), +%% page := pos_integer(), +%% limit := pos_integer(), +%% total := [{node(), non_neg_integer()}], +%% table := atom(), +%% qs := {Qs, Fuzzy} %% parsed query params +%% msfun := query_to_match_spec_fun() +%% } +init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -> + #{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), + %% assert FuzzyFun type + _ = + case FuzzyFun of + undefined -> + ok; + {NamedFun, Args} -> + true = is_list(Args), + {type, external} = erlang:fun_info(NamedFun, type) + end, + #{ + page => Page, + limit => Limit, + table => Tab, + qs => QString, + msfun => MsFun, + mactch_spec => Ms, + fuzzy_fun => FuzzyFun, + total => [], + continuation => ?FRESH_SELECT + }. + %% @private This function is exempt from BPAPI -do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() -> - erlang:apply(M, F, [Tab, QString, Continuation, Limit]); -do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> +do_query(Node, QueryState) when Node =:= node() -> + do_select(Node, QueryState); +do_query(Node, QueryState) -> case rpc:call( Node, ?MODULE, do_query, - [Node, Tab, QString, QueryFun, Continuation, Limit], - 50000 + [Node, QueryState], + ?LONG_QUERY_TIMEOUT ) of {badrpc, _} = R -> {error, R}; Ret -> Ret end. -sub_query_result(Len, Rows, Limit, Results, Meta) -> - {Flag, NMeta} = judge_page_with_counting(Len, Meta), - NResults = - case Flag of - more -> - []; - cutrows -> - {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), - ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), - lists:sublist(lists:append(Results, ThisRows), SubStart, Limit); - enough -> - lists:sublist(lists:append(Results, Rows), 1, Limit) +do_select( + Node, + QueryState0 = #{ + table := Tab, + mactch_spec := Ms, + fuzzy_fun := FuzzyFun, + continuation := Continuation, + limit := Limit + } +) -> + QueryState = maybe_apply_total_query(Node, QueryState0), + Result = + case Continuation of + ?FRESH_SELECT -> + ets:select(Tab, Ms, Limit); + _ -> + %% XXX: Repair is necessary because we pass Continuation back + %% and forth through the nodes in the `do_cluster_query` + ets:select(ets:repair_continuation(Continuation, Ms)) end, - {NMeta, NResults}. + case Result of + '$end_of_table' -> + {[], QueryState#{continuation => ?FRESH_SELECT}}; + {Rows, NContinuation} -> + NRows = + case FuzzyFun of + undefined -> + Rows; + {FilterFun, Args0} when is_function(FilterFun), is_list(Args0) -> + lists:filter( + fun(E) -> erlang:apply(FilterFun, [E | Args0]) end, + Rows + ) + end, + {NRows, QueryState#{continuation => NContinuation}} + end. -%%-------------------------------------------------------------------- -%% Table Select -%%-------------------------------------------------------------------- +maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) -> + case proplists:get_value(Node, TotalAcc, undefined) of + undefined -> + Total = apply_total_query(QueryState), + QueryState#{total := [{Node, Total} | TotalAcc]}; + _ -> + QueryState + end. -select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when - is_function(FuzzyFilterFun) andalso Limit > 0 --> - case ets:select(Tab, Ms, Limit) of - '$end_of_table' -> - {0, [], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - Rows = FuzzyFilterFun(RawResult), - {length(Rows), lists:map(FmtFun, Rows), NContinuation} +apply_total_query(QueryState = #{table := Tab}) -> + case counting_total_fun(QueryState) of + false -> + %% return a fake total number if the query have any conditions + 0; + Fun -> + Fun(Tab) + end. + +counting_total_fun(_QueryState = #{qs := {[], []}}) -> + fun(Tab) -> ets:info(Tab, size) end; +counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) -> + %% XXX: Calculating the total number of data that match a certain + %% condition under a large table is very expensive because the + %% entire ETS table needs to be scanned. + %% + %% XXX: How to optimize it? i.e, using: + [{MatchHead, Conditions, _Return}] = Ms, + CountingMs = [{MatchHead, Conditions, [true]}], + fun(Tab) -> + ets:select_count(Tab, CountingMs) end; -select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when - is_function(FuzzyFilterFun) --> - case ets:select(ets:repair_continuation(Continuation, Ms)) of - '$end_of_table' -> - {0, [], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - Rows = FuzzyFilterFun(RawResult), - {length(Rows), lists:map(FmtFun, Rows), NContinuation} - end; -select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when - Limit > 0 --> - case ets:select(Tab, Ms, Limit) of - '$end_of_table' -> - {0, [], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} - end; -select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) -> - case ets:select(ets:repair_continuation(Continuation, Ms)) of - '$end_of_table' -> - {0, [], ?FRESH_SELECT}; - {RawResult, NContinuation} -> - {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} +counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined -> + %% XXX: Calculating the total number for a fuzzy searching is very very expensive + %% so it is not supported now + false. + +%% ResultAcc :: #{count := integer(), +%% cursor := integer(), +%% rows := [{node(), Rows :: list()}], +%% total := [{node() => integer()}] +%% } +init_query_result() -> + #{cursor => 0, count => 0, rows => [], total => []}. + +accumulate_query_rows( + Node, + Rows, + _QueryState = #{page := Page, limit := Limit, total := TotalAcc}, + ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc} +) -> + PageStart = (Page - 1) * Limit + 1, + PageEnd = Page * Limit, + Len = length(Rows), + case Cursor + Len of + NCursor when NCursor < PageStart -> + {more, ResultAcc#{cursor => NCursor, total => TotalAcc}}; + NCursor when NCursor < PageEnd -> + {more, ResultAcc#{ + cursor => NCursor, + count => Count + length(Rows), + total => TotalAcc, + rows => [{Node, Rows} | RowsAcc] + }}; + NCursor when NCursor >= PageEnd -> + SubRows = lists:sublist(Rows, Limit - Count), + {enough, ResultAcc#{ + cursor => NCursor, + count => Count + length(SubRows), + total => TotalAcc, + rows => [{Node, SubRows} | RowsAcc] + }} end. %%-------------------------------------------------------------------- @@ -295,6 +435,7 @@ parse_qstring(QString, QSchema) -> {length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}. do_parse_qstring([], _, Acc1, Acc2) -> + %% remove fuzzy keys if present in accurate query NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], {lists:reverse(Acc1), lists:reverse(NAcc2)}; do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) -> @@ -379,40 +520,41 @@ is_fuzzy_key(<<"match_", _/binary>>) -> is_fuzzy_key(_) -> false. -page_start(1, _) -> 1; -page_start(Page, Limit) -> (Page - 1) * Limit + 1. +format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) -> + Error; +format_query_result( + FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc} +) -> + Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc), + #{ + %% The `count` is used in HTTP API to indicate the total number of + %% queries that can be read + meta => Meta#{count => Total}, + data => lists:flatten( + lists:foldl( + fun({Node, Rows}, Acc) -> + [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc] + end, + [], + RowsAcc + ) + ) + }. -judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> - PageStart = page_start(Page, Limit), - PageEnd = Page * Limit, - case Count + Len of - NCount when NCount < PageStart -> - {more, Meta#{count => NCount}}; - NCount when NCount < PageEnd -> - {cutrows, Meta#{count => NCount}}; - NCount when NCount >= PageEnd -> - {enough, Meta#{count => NCount}} +exec_format_fun(FmtFun, Node, Row) -> + case erlang:fun_info(FmtFun, arity) of + {arity, 1} -> FmtFun(Row); + {arity, 2} -> FmtFun(Node, Row) end. -rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> - PageStart = page_start(Page, Limit), - case (Count - Len) < PageStart of +parse_pager_params(Params) -> + Page = b2i(page(Params)), + Limit = b2i(limit(Params)), + case Page > 0 andalso Limit > 0 of true -> - NeedNowNum = Count - PageStart + 1, - SubStart = Len - NeedNowNum + 1, - {SubStart, NeedNowNum}; + #{page => Page, limit => Limit, count => 0}; false -> - {_SubStart = 1, _NeedNowNum = Len} - end. - -page_limit_check_query(Meta, {F, A}) -> - case Meta of - #{page := Page, limit := Limit} when - Page < 1; Limit < 1 - -> - {error, page_limit_invalid}; - _ -> - erlang:apply(F, A) + false end. %%-------------------------------------------------------------------- @@ -458,6 +600,11 @@ to_ip_port(IPAddress) -> Port = list_to_integer(Port0), {IP, Port}. +b2i(Bin) when is_binary(Bin) -> + binary_to_integer(Bin); +b2i(Any) -> + Any. + %%-------------------------------------------------------------------- %% EUnits %%-------------------------------------------------------------------- @@ -502,8 +649,3 @@ params2qs_test() -> {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). -endif. - -b2i(Bin) when is_binary(Bin) -> - binary_to_integer(Bin); -b2i(Any) -> - Any. diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index 36845e4e7..80f93ffe4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -24,12 +24,12 @@ -export([api_spec/0, paths/0, schema/1, fields/1]). --export([alarms/2]). +-export([alarms/2, format_alarm/2]). -define(TAGS, [<<"Alarms">>]). %% internal export (for query) --export([query/4]). +-export([qs2ms/2]). api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -112,7 +112,15 @@ alarms(get, #{query_string := QString}) -> true -> ?ACTIVATED_ALARM; false -> ?DEACTIVATED_ALARM end, - case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of + case + emqx_mgmt_api:cluster_query( + Table, + QString, + [], + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_alarm/2 + ) + of {error, page_limit_invalid} -> {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {error, Node, {badrpc, R}} -> @@ -128,11 +136,9 @@ alarms(delete, _Params) -> %%%============================================================================================== %% internal -query(Table, _QsSpec, Continuation, Limit) -> - Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_alarm/1). +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {_Qs, _Fuzzy}) -> + #{match_spec => [{'$1', [], ['$1']}], fuzzy_fun => undefined}. -format_alarm(Alarms) when is_list(Alarms) -> - [emqx_alarm:format(Alarm) || Alarm <- Alarms]; -format_alarm(Alarm) -> - emqx_alarm:format(Alarm). +format_alarm(WhichNode, Alarm) -> + emqx_alarm:format(WhichNode, Alarm). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index f4fe0387f..588031fc1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -46,8 +46,10 @@ ]). -export([ - query/4, - format_channel_info/1 + qs2ms/2, + run_fuzzy_filter/2, + format_channel_info/1, + format_channel_info/2 ]). %% for batch operation @@ -73,7 +75,6 @@ {<<"lte_connected_at">>, timestamp} ]). --define(QUERY_FUN, {?MODULE, query}). -define(FORMAT_FUN, {?MODULE, format_channel_info}). -define(CLIENT_ID_NOT_FOUND, @@ -642,10 +643,11 @@ list_clients(QString) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, ?CLIENT_QTAB, + QString, ?CLIENT_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_channel_info/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of @@ -653,10 +655,11 @@ list_clients(QString) -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - QStringWithoutNode, ?CLIENT_QTAB, + QStringWithoutNode, ?CLIENT_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_channel_info/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -780,32 +783,16 @@ do_unsubscribe(ClientID, Topic) -> Res end. -%%-------------------------------------------------------------------- -%% Query Functions - -query(Tab, {QString, []}, Continuation, Limit) -> - Ms = qs2ms(QString), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format_channel_info/1 - ); -query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> - Ms = qs2ms(QString), - FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format_channel_info/1 - ). - %%-------------------------------------------------------------------- %% QueryString to Match Spec +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {QString, FuzzyQString}) -> + #{ + match_spec => qs2ms(QString), + fuzzy_fun => fuzzy_filter_fun(FuzzyQString) + }. + -spec qs2ms(list()) -> ets:match_spec(). qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), @@ -855,13 +842,10 @@ ms(created_at, X) -> %%-------------------------------------------------------------------- %% Match funcs +fuzzy_filter_fun([]) -> + undefined; fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. run_fuzzy_filter(_, []) -> true; @@ -876,12 +860,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | %%-------------------------------------------------------------------- %% format funcs -format_channel_info({_, ClientInfo0, ClientStats}) -> - Node = - case ClientInfo0 of - #{node := N} -> N; - _ -> node() - end, +format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) -> + format_channel_info(node(), ChannInfo). + +format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> + Node = maps:get(node, ClientInfo0, WhichNode), ClientInfo1 = emqx_map_lib:deep_remove([conninfo, clientid], ClientInfo0), ClientInfo2 = emqx_map_lib:deep_remove([conninfo, username], ClientInfo1), StatsMap = maps:without( diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 03b833e84..c05878c6d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -32,8 +32,9 @@ -export([subscriptions/2]). -export([ - query/4, - format/1 + qs2ms/2, + run_fuzzy_filter/2, + format/2 ]). -define(SUBS_QTABLE, emqx_suboption). @@ -47,8 +48,6 @@ {<<"match_topic">>, binary} ]). --define(QUERY_FUN, {?MODULE, query}). - api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -139,20 +138,22 @@ subscriptions(get, #{query_string := QString}) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - QString, ?SUBS_QTABLE, + QString, ?SUBS_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of {ok, Node1} -> emqx_mgmt_api:node_query( Node1, - QString, ?SUBS_QTABLE, + QString, ?SUBS_QSCHEMA, - ?QUERY_FUN + fun ?MODULE:qs2ms/2, + fun ?MODULE:format/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -168,16 +169,12 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(Items) when is_list(Items) -> - [format(Item) || Item <- Items]; -format({{Subscriber, Topic}, Options}) -> - format({Subscriber, Topic, Options}); -format({_Subscriber, Topic, Options}) -> +format(WhichNode, {{_Subscriber, Topic}, Options}) -> maps:merge( #{ topic => get_topic(Topic, Options), clientid => maps:get(subid, Options), - node => node() + node => WhichNode }, maps:with([qos, nl, rap, rh], Options) ). @@ -190,53 +187,21 @@ get_topic(Topic, _) -> Topic. %%-------------------------------------------------------------------- -%% Query Function +%% QueryString to MatchSpec %%-------------------------------------------------------------------- -query(Tab, {Qs, []}, Continuation, Limit) -> - Ms = qs2ms(Qs), - emqx_mgmt_api:select_table_with_count( - Tab, - Ms, - Continuation, - Limit, - fun format/1 - ); -query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> - Ms = qs2ms(Qs), - FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, - {Ms, FuzzyFilterFun}, - Continuation, - Limit, - fun format/1 - ). +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {Qs, Fuzzy}) -> + #{match_spec => gen_match_spec(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}. -fuzzy_filter_fun(Fuzzy) -> - fun(MsRaws) when is_list(MsRaws) -> - lists:filter( - fun(E) -> run_fuzzy_filter(E, Fuzzy) end, - MsRaws - ) - end. - -run_fuzzy_filter(_, []) -> - true; -run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> - emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). - -%%-------------------------------------------------------------------- -%% Query String to Match Spec - -qs2ms(Qs) -> - MtchHead = qs2ms(Qs, {{'_', '_'}, #{}}), +gen_match_spec(Qs) -> + MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}), [{MtchHead, [], ['$_']}]. -qs2ms([], MtchHead) -> +gen_match_spec([], MtchHead) -> MtchHead; -qs2ms([{Key, '=:=', Value} | More], MtchHead) -> - qs2ms(More, update_ms(Key, Value, MtchHead)). +gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> + gen_match_spec(More, update_ms(Key, Value, MtchHead)). update_ms(clientid, X, {{Pid, Topic}, Opts}) -> {{Pid, Topic}, Opts#{subid => X}}; @@ -246,3 +211,13 @@ update_ms(share_group, X, {{Pid, Topic}, Opts}) -> {{Pid, Topic}, Opts#{share => X}}; update_ms(qos, X, {{Pid, Topic}, Opts}) -> {{Pid, Topic}, Opts#{qos => X}}. + +fuzzy_filter_fun([]) -> + undefined; +fuzzy_filter_fun(Fuzzy) -> + {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}. + +run_fuzzy_filter(_, []) -> + true; +run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> + emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index c357855b2..5e2bd1ea0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -34,7 +34,7 @@ topic/2 ]). --export([query/4]). +-export([qs2ms/2, format/1]). -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). @@ -109,7 +109,12 @@ topic(get, #{bindings := Bindings}) -> do_list(Params) -> case emqx_mgmt_api:node_query( - node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query} + node(), + emqx_route, + Params, + ?TOPICS_QUERY_SCHEMA, + fun ?MODULE:qs2ms/2, + fun ?MODULE:format/1 ) of {error, page_limit_invalid} -> @@ -138,16 +143,19 @@ generate_topic(Params = #{topic := Topic}) -> generate_topic(Params) -> Params. -query(Tab, {Qs, _}, Continuation, Limit) -> - Ms = qs2ms(Qs, [{{route, '_', '_'}, [], ['$_']}]), - emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1). +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {Qs, _}) -> + #{ + match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]), + fuzzy_fun => undefined + }. -qs2ms([], Res) -> +gen_match_spec([], Res) -> Res; -qs2ms([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> - qs2ms(Qs, [{{route, T, N}, [], ['$_']}]); -qs2ms([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> - qs2ms(Qs, [{{route, T, N}, [], ['$_']}]). +gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> + gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]); +gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> + gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). format(#route{topic = Topic, dest = {_, Node}}) -> #{topic => Topic, node => Node}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl new file mode 100644 index 000000000..a065b9c83 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +%%-------------------------------------------------------------------- +%% setup +%%-------------------------------------------------------------------- + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_) -> + ok. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_cluster_query(_Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), + ct:timetrap({seconds, 120}), + snabbkaffe:fix_ct_logging(), + [{Name, Opts}, {Name1, Opts1}] = cluster_specs(), + Node1 = emqx_common_test_helpers:start_slave(Name, Opts), + Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), + try + process_flag(trap_exit, true), + ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)], + ClientLs2 = [start_emqtt_client(Node2, I, 3883) || I <- lists:seq(1, 10)], + + %% returned list should be the same regardless of which node is requested + {200, ClientsAll} = query_clients(Node1, #{}), + ?assertEqual({200, ClientsAll}, query_clients(Node2, #{})), + ?assertMatch( + #{page := 1, limit := 100, count := 20}, + maps:get(meta, ClientsAll) + ), + ?assertMatch(20, length(maps:get(data, ClientsAll))), + %% query the first page, counting in entire cluster + {200, ClientsPage1} = query_clients(Node1, #{<<"limit">> => 5}), + ?assertMatch( + #{page := 1, limit := 5, count := 20}, + maps:get(meta, ClientsPage1) + ), + ?assertMatch(5, length(maps:get(data, ClientsPage1))), + + %% assert: AllPage = Page1 + Page2 + Page3 + Page4 + %% !!!Note: this equation requires that the queried tables must be ordered_set + {200, ClientsPage2} = query_clients(Node1, #{<<"page">> => 2, <<"limit">> => 5}), + {200, ClientsPage3} = query_clients(Node2, #{<<"page">> => 3, <<"limit">> => 5}), + {200, ClientsPage4} = query_clients(Node1, #{<<"page">> => 4, <<"limit">> => 5}), + GetClientIds = fun(L) -> lists:map(fun(#{clientid := Id}) -> Id end, L) end, + ?assertEqual( + GetClientIds(maps:get(data, ClientsAll)), + GetClientIds( + maps:get(data, ClientsPage1) ++ maps:get(data, ClientsPage2) ++ + maps:get(data, ClientsPage3) ++ maps:get(data, ClientsPage4) + ) + ), + + %% exact match can return non-zero total + {200, ClientsNode1} = query_clients(Node2, #{<<"username">> => <<"corenode1@127.0.0.1">>}), + ?assertMatch( + #{count := 10}, + maps:get(meta, ClientsNode1) + ), + + %% fuzzy searching can't return total + {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), + ?assertMatch( + #{count := 0}, + maps:get(meta, ClientsNode2) + ), + ?assertMatch(10, length(maps:get(data, ClientsNode2))), + + _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), + _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2) + after + emqx_common_test_helpers:stop_slave(Node1), + emqx_common_test_helpers:stop_slave(Node2) + end, + ok. + +%%-------------------------------------------------------------------- +%% helpers +%%-------------------------------------------------------------------- + +cluster_specs() -> + Specs = + %% default listeners port + [ + {core, corenode1, #{listener_ports => [{tcp, 2883}]}}, + {core, corenode2, #{listener_ports => [{tcp, 3883}]}} + ], + CommOpts = + [ + {env, [{emqx, boot_modules, all}]}, + {apps, []}, + {conf, [ + {[listeners, ssl, default, enabled], false}, + {[listeners, ws, default, enabled], false}, + {[listeners, wss, default, enabled], false} + ]} + ], + emqx_common_test_helpers:emqx_cluster( + Specs, + CommOpts + ). + +start_emqtt_client(Node0, N, Port) -> + Node = atom_to_binary(Node0), + ClientId = iolist_to_binary([Node, "-", integer_to_binary(N)]), + {ok, C} = emqtt:start_link([{clientid, ClientId}, {username, Node}, {port, Port}]), + {ok, _} = emqtt:connect(C), + C. + +query_clients(Node, Qs0) -> + Qs = maps:merge( + #{<<"page">> => 1, <<"limit">> => 100}, + Qs0 + ), + rpc:call(Node, emqx_mgmt_api_clients, clients, [get, #{query_string => Qs}]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index d1cf4e418..9a6de9938 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -93,6 +93,7 @@ t_subscription_api(_) -> {"match_topic", "t/#"} ]), Headers = emqx_mgmt_api_test_util:auth_header_(), + {ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]), Meta2 = maps:get(<<"meta">>, DataTopic2), @@ -114,7 +115,8 @@ t_subscription_api(_) -> MatchMeta = maps:get(<<"meta">>, MatchData), ?assertEqual(1, maps:get(<<"page">>, MatchMeta)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), - ?assertEqual(1, maps:get(<<"count">>, MatchMeta)), + %% count equals 0 in fuzzy searching + ?assertEqual(0, maps:get(<<"count">>, MatchMeta)), MatchSubs = maps:get(<<"data">>, MatchData), ?assertEqual(1, length(MatchSubs)), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index a06154400..419c17adf 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -31,6 +31,7 @@ end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite(). t_nodes_api(_) -> + Node = atom_to_binary(node(), utf8), Topic = <<"test_topic">>, {ok, Client} = emqtt:start_link(#{ username => <<"routes_username">>, clientid => <<"routes_cid">> @@ -49,11 +50,30 @@ t_nodes_api(_) -> Data = maps:get(<<"data">>, RoutesData), Route = erlang:hd(Data), ?assertEqual(Topic, maps:get(<<"topic">>, Route)), - ?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, Route)), + ?assertEqual(Node, maps:get(<<"node">>, Route)), + + %% exact match + Topic2 = <<"test_topic_2">>, + {ok, _, _} = emqtt:subscribe(Client, Topic2), + QS = uri_string:compose_query([ + {"topic", Topic2}, + {"node", atom_to_list(node())} + ]), + Headers = emqx_mgmt_api_test_util:auth_header_(), + {ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), + MatchData = emqx_json:decode(MatchResponse, [return_maps]), + ?assertMatch( + #{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100}, + maps:get(<<"meta">>, MatchData) + ), + ?assertMatch( + [#{<<"topic">> := Topic2, <<"node">> := Node}], + maps:get(<<"data">>, MatchData) + ), %% get topics/:topic RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]), {ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath), RouteData = emqx_json:decode(RouteResponse, [return_maps]), ?assertEqual(Topic, maps:get(<<"topic">>, RouteData)), - ?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, RouteData)). + ?assertEqual(Node, maps:get(<<"node">>, RouteData)). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index f511d74d9..241ac5a8a 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -56,16 +56,20 @@ get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, - cluster_list/1, - cluster_query/4 + cluster_list/1 +]). + +%% exports for query +-export([ + qs2ms/2, + format_delayed/1, + format_delayed/2 ]). -export([ post_config_update/5 ]). --export([format_delayed/1]). - %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). @@ -166,16 +170,29 @@ list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). cluster_list(Params) -> - emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}). + emqx_mgmt_api:cluster_query( + ?TAB, + Params, + [], + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_delayed/2 + ). -cluster_query(Table, _QsSpec, Continuation, Limit) -> - Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1). +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Table, {_Qs, _Fuzzy}) -> + #{ + match_spec => [{'$1', [], ['$1']}], + fuzzy_fun => undefined + }. format_delayed(Delayed) -> - format_delayed(Delayed, false). + format_delayed(node(), Delayed). + +format_delayed(WhichNode, Delayed) -> + format_delayed(WhichNode, Delayed, false). format_delayed( + WhichNode, #delayed_message{ key = {ExpectTimeStamp, Id}, delayed = Delayed, @@ -195,7 +212,7 @@ format_delayed( RemainingTime = ExpectTimeStamp - ?NOW, Result = #{ msgid => emqx_guid:to_hexstr(Id), - node => node(), + node => WhichNode, publish_at => PublishTime, delayed_interval => Delayed, delayed_remaining => RemainingTime div 1000, @@ -222,7 +239,7 @@ get_delayed_message(Id) -> {error, not_found}; Rows -> Message = hd(Rows), - {ok, format_delayed(Message, true)} + {ok, format_delayed(node(), Message, true)} end. get_delayed_message(Node, Id) when Node =:= node() -> diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index e2a142a99..29a7d0362 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_modules, [ {description, "EMQX Modules"}, - {vsn, "5.0.6"}, + {vsn, "5.0.7"}, {modules, []}, {applications, [kernel, stdlib, emqx]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 597ee838f..01d819a69 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -34,7 +34,7 @@ -export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]). %% query callback --export([query/4]). +-export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_BADARGS(REASON), begin @@ -274,10 +274,11 @@ param_path_id() -> case emqx_mgmt_api:node_query( node(), - QueryString, ?RULE_TAB, + QueryString, ?RULE_QS_SCHEMA, - {?MODULE, query} + fun ?MODULE:qs2ms/2, + fun ?MODULE:format_rule_resp/1 ) of {error, page_limit_invalid} -> @@ -552,38 +553,40 @@ filter_out_request_body(Conf) -> ], maps:without(ExtraConfs, Conf). -query(Tab, {Qs, Fuzzy}, Start, Limit) -> - Ms = qs2ms(), - FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy), - emqx_mgmt_api:select_table_with_count( - Tab, {Ms, FuzzyFun}, Start, Limit, fun format_rule_resp/1 - ). - -%% rule is not a record, so everything is fuzzy filter. -qs2ms() -> - [{'_', [], ['$_']}]. - -fuzzy_match_fun(Qs, Ms, Fuzzy) -> - MsC = ets:match_spec_compile(Ms), - fun(Rows) -> - Ls = ets:match_spec_run(Rows, MsC), - lists:filter( - fun(E) -> - run_qs_match(E, Qs) andalso - run_fuzzy_match(E, Fuzzy) - end, - Ls - ) +-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). +qs2ms(_Tab, {Qs, Fuzzy}) -> + case lists:keytake(from, 1, Qs) of + false -> + #{match_spec => generate_match_spec(Qs), fuzzy_fun => fuzzy_match_fun(Fuzzy)}; + {value, {from, '=:=', From}, Ls} -> + #{ + match_spec => generate_match_spec(Ls), + fuzzy_fun => fuzzy_match_fun([{from, '=:=', From} | Fuzzy]) + } end. -run_qs_match(_, []) -> - true; -run_qs_match(E = {_Id, #{enable := Enable}}, [{enable, '=:=', Pattern} | Qs]) -> - Enable =:= Pattern andalso run_qs_match(E, Qs); -run_qs_match(E = {_Id, #{from := From}}, [{from, '=:=', Pattern} | Qs]) -> - lists:member(Pattern, From) andalso run_qs_match(E, Qs); -run_qs_match(E, [_ | Qs]) -> - run_qs_match(E, Qs). +generate_match_spec(Qs) -> + {MtchHead, Conds} = generate_match_spec(Qs, 2, {#{}, []}), + [{{'_', MtchHead}, Conds, ['$_']}]. + +generate_match_spec([], _, {MtchHead, Conds}) -> + {MtchHead, lists:reverse(Conds)}; +generate_match_spec([Qs | Rest], N, {MtchHead, Conds}) -> + Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), + NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), + NConds = put_conds(Qs, Holder, Conds), + generate_match_spec(Rest, N + 1, {NMtchHead, NConds}). + +put_conds({_, Op, V}, Holder, Conds) -> + [{Op, Holder, V} | Conds]. + +ms(enable, X) -> + #{enable => X}. + +fuzzy_match_fun([]) -> + undefined; +fuzzy_match_fun(Fuzzy) -> + {fun ?MODULE:run_fuzzy_match/2, [Fuzzy]}. run_fuzzy_match(_, []) -> true; @@ -591,6 +594,8 @@ run_fuzzy_match(E = {Id, _}, [{id, like, Pattern} | Fuzzy]) -> binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) -> binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); +run_fuzzy_match(E = {_, #{from := Topics}}, [{from, '=:=', Pattern} | Fuzzy]) -> + lists:member(Pattern, Topics) /= false andalso run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) -> lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso run_fuzzy_match(E, Fuzzy); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index da4e299f9..93912dd6c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -133,23 +133,23 @@ t_list_rule_api(_Config) -> QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}}, {200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2), - ?assertEqual(Result1, Result2), + ?assertEqual(maps:get(data, Result1), maps:get(data, Result2)), QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}}, - {200, #{meta := #{count := Count3}}} = emqx_rule_engine_api:'/rules'(get, QueryStr3), - ?assertEqual(19, Count3), + {200, #{data := Data3}} = emqx_rule_engine_api:'/rules'(get, QueryStr3), + ?assertEqual(19, length(Data3)), QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}}, {200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4), - ?assertEqual(Result1, Result4), + ?assertEqual(maps:get(data, Result1), maps:get(data, Result4)), QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}}, {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5), - ?assertEqual(Result1, Result5), + ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)), QueryStr6 = #{query_string => #{<<"like_id">> => RuleID}}, {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6), - ?assertEqual(Result1, Result6), + ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)), %% clean up lists:foreach( diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md index 5329be024..4b35aa5f4 100644 --- a/changes/v5.0.11-en.md +++ b/changes/v5.0.11-en.md @@ -27,6 +27,8 @@ - Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401). +- Optimize client query performance for HTTP APIs [#9374](https://github.com/emqx/emqx/pull/9374). + ## Bug fixes - Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307). diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md index e0c82b022..ab9f5a0d4 100644 --- a/changes/v5.0.11-zh.md +++ b/changes/v5.0.11-zh.md @@ -25,6 +25,8 @@ - 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。 +- 优化查询客户端列表的 HTTP API 性能 [#9374](https://github.com/emqx/emqx/pull/9374)。 + ## 修复 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。