Merge pull request #9374 from HJianBo/fix-client-query

Improve Clients query performance
This commit is contained in:
Zaiming (Stone) Shi 2022-11-25 07:58:38 +01:00 committed by GitHub
commit be1322c16a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 784 additions and 504 deletions

View File

@ -41,7 +41,8 @@
delete_all_deactivated_alarms/0, delete_all_deactivated_alarms/0,
get_alarms/0, get_alarms/0,
get_alarms/1, get_alarms/1,
format/1 format/1,
format/2
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -169,12 +170,15 @@ get_alarms(activated) ->
get_alarms(deactivated) -> get_alarms(deactivated) ->
gen_server:call(?MODULE, {get_alarms, deactivated}). gen_server:call(?MODULE, {get_alarms, deactivated}).
format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> format(Alarm) ->
format(node(), Alarm).
format(Node, #activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
Now = erlang:system_time(microsecond), Now = erlang:system_time(microsecond),
%% mnesia db stored microsecond for high frequency alarm %% mnesia db stored microsecond for high frequency alarm
%% format for dashboard using millisecond %% format for dashboard using millisecond
#{ #{
node => node(), node => Node,
name => Name, name => Name,
message => Message, message => Message,
%% to millisecond %% to millisecond
@ -182,7 +186,7 @@ format(#activated_alarm{name = Name, message = Message, activate_at = At, detail
activate_at => to_rfc3339(At), activate_at => to_rfc3339(At),
details => Details details => Details
}; };
format(#deactivated_alarm{ format(Node, #deactivated_alarm{
name = Name, name = Name,
message = Message, message = Message,
activate_at = At, activate_at = At,
@ -190,7 +194,7 @@ format(#deactivated_alarm{
deactivate_at = DAt deactivate_at = DAt
}) -> }) ->
#{ #{
node => node(), node => Node,
name => Name, name => Name,
message => Message, message => Message,
%% to millisecond %% to millisecond

View File

@ -650,8 +650,8 @@ init([]) ->
TabOpts = [public, {write_concurrency, true}], TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]), ok = emqx_tables:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
State = #{chan_pmon => emqx_pmon:new()}, State = #{chan_pmon => emqx_pmon:new()},
{ok, State}. {ok, State}.

View File

@ -62,9 +62,10 @@
%% List of business-layer functions that are exempt from the checks: %% List of business-layer functions that are exempt from the checks:
%% erlfmt-ignore %% erlfmt-ignore
-define(EXEMPTIONS, -define(EXEMPTIONS,
"emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are % Reason: legacy code. A fun and a QC query are
% passed in the args, it's futile to try to statically % passed in the args, it's futile to try to statically
% check it % check it
"emqx_mgmt_api:do_query/2, emqx_mgmt_api:collect_total_from_tail_nodes/3"
). ).
-define(XREF, myxref). -define(XREF, myxref).

View File

@ -519,21 +519,51 @@ ensure_quic_listener(Name, UdpPort) ->
%% Clusterisation and multi-node testing %% Clusterisation and multi-node testing
%% %%
-type cluster_spec() :: [node_spec()].
-type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
-type role() :: core | replicant.
-type shortname() :: atom().
-type nodename() :: atom().
-type node_opts() :: #{
%% Need to loaded apps. These apps will be loaded once the node started
load_apps => list(),
%% Need to started apps. It is the first arg passed to emqx_common_test_helpers:start_apps/2
apps => list(),
%% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2
env_handler => fun((AppName :: atom()) -> term()),
%% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
env => {AppName :: atom(), Key :: atom(), Val :: term()},
%% Whether to execute `emqx_config:init_load(SchemaMod)`
%% default: true
load_schema => boolean(),
%% Eval by emqx_config:put/2
conf => [{KeyPath :: list(), Val :: term()}],
%% Fast option to config listener port
%% default rule:
%% - tcp: base_port
%% - ssl: base_port + 1
%% - ws : base_port + 3
%% - wss: base_port + 4
listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
}.
-spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
emqx_cluster(Specs) -> emqx_cluster(Specs) ->
emqx_cluster(Specs, #{}). emqx_cluster(Specs, #{}).
-spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) -> emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
emqx_cluster(Specs, maps:from_list(CommonOpts)); emqx_cluster(Specs, maps:from_list(CommonOpts));
emqx_cluster(Specs0, CommonOpts) -> emqx_cluster(Specs0, CommonOpts) ->
Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
Specs = expand_node_specs(Specs1, CommonOpts), Specs = expand_node_specs(Specs1, CommonOpts),
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], %% Assign grpc ports
%% Assign grpc ports:
GenRpcPorts = maps:from_list([ GenRpcPorts = maps:from_list([
{node_name(Name), {tcp, gen_rpc_port(base_port(Num))}} {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
|| {{_, Name, _}, Num} <- Specs || {{_, Name, _}, Num} <- Specs
]), ]),
%% Set the default node of the cluster: %% Set the default node of the cluster:
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
JoinTo = JoinTo =
case CoreNodes of case CoreNodes of
[First | _] -> First; [First | _] -> First;
@ -554,6 +584,8 @@ emqx_cluster(Specs0, CommonOpts) ->
]. ].
%% Lower level starting API %% Lower level starting API
-spec start_slave(shortname(), node_opts()) -> nodename().
start_slave(Name, Opts) -> start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start( {ok, Node} = ct_slave:start(
list_to_atom(atom_to_list(Name) ++ "@" ++ host()), list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
@ -590,6 +622,7 @@ epmd_path() ->
%% Node initialization %% Node initialization
-spec setup_node(nodename(), node_opts()) -> ok.
setup_node(Node, Opts) when is_list(Opts) -> setup_node(Node, Opts) when is_list(Opts) ->
setup_node(Node, maps:from_list(Opts)); setup_node(Node, maps:from_list(Opts));
setup_node(Node, Opts) when is_map(Opts) -> setup_node(Node, Opts) when is_map(Opts) ->

View File

@ -47,7 +47,8 @@
]). ]).
-export([ -export([
query/4, qs2ms/2,
run_fuzzy_filter/2,
format_user_info/1, format_user_info/1,
group_match_spec/1 group_match_spec/1
]). ]).
@ -66,7 +67,6 @@
{<<"user_group">>, binary}, {<<"user_group">>, binary},
{<<"is_superuser">>, atom} {<<"is_superuser">>, atom}
]). ]).
-define(QUERY_FUN, {?MODULE, query}).
-type user_group() :: binary(). -type user_group() :: binary().
@ -262,42 +262,30 @@ lookup_user(UserID, #{user_group := UserGroup}) ->
list_users(QueryString, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) ->
NQueryString = QueryString#{<<"user_group">> => UserGroup}, NQueryString = QueryString#{<<"user_group">> => UserGroup},
emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). emqx_mgmt_api:node_query(
node(),
%%-------------------------------------------------------------------- ?TAB,
%% Query Functions NQueryString,
?AUTHN_QSCHEMA,
query(Tab, {QString, []}, Continuation, Limit) -> fun ?MODULE:qs2ms/2,
Ms = ms_from_qstring(QString), fun ?MODULE:format_user_info/1
emqx_mgmt_api:select_table_with_count(
Tab,
Ms,
Continuation,
Limit,
fun format_user_info/1
);
query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
Ms = ms_from_qstring(QString),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_user_info/1
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Match funcs %% QueryString to MatchSpec
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {QString, Fuzzy}) ->
#{
match_spec => ms_from_qstring(QString),
fuzzy_fun => fuzzy_filter_fun(Fuzzy)
}.
%% Fuzzy username funcs %% Fuzzy username funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) -> fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) -> {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;

View File

@ -49,7 +49,8 @@
]). ]).
-export([ -export([
query/4, qs2ms/2,
run_fuzzy_filter/2,
format_user_info/1, format_user_info/1,
group_match_spec/1 group_match_spec/1
]). ]).
@ -84,7 +85,6 @@
{<<"user_group">>, binary}, {<<"user_group">>, binary},
{<<"is_superuser">>, atom} {<<"is_superuser">>, atom}
]). ]).
-define(QUERY_FUN, {?MODULE, query}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
@ -288,42 +288,30 @@ lookup_user(UserID, #{user_group := UserGroup}) ->
list_users(QueryString, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) ->
NQueryString = QueryString#{<<"user_group">> => UserGroup}, NQueryString = QueryString#{<<"user_group">> => UserGroup},
emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). emqx_mgmt_api:node_query(
node(),
%%-------------------------------------------------------------------- ?TAB,
%% Query Functions NQueryString,
?AUTHN_QSCHEMA,
query(Tab, {QString, []}, Continuation, Limit) -> fun ?MODULE:qs2ms/2,
Ms = ms_from_qstring(QString), fun ?MODULE:format_user_info/1
emqx_mgmt_api:select_table_with_count(
Tab,
Ms,
Continuation,
Limit,
fun format_user_info/1
);
query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
Ms = ms_from_qstring(QString),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_user_info/1
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Match funcs %% QueryString to MatchSpec
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {QString, FuzzyQString}) ->
#{
match_spec => ms_from_qstring(QString),
fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
}.
%% Fuzzy username funcs %% Fuzzy username funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) -> fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) -> {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;

View File

@ -213,7 +213,7 @@ t_list_users(_) ->
#{ #{
data := [#{is_superuser := false, user_id := <<"u3">>}], data := [#{is_superuser := false, user_id := <<"u3">>}],
meta := #{page := 1, limit := 20, count := 1} meta := #{page := 1, limit := 20, count := 0}
} = emqx_authn_mnesia:list_users( } = emqx_authn_mnesia:list_users(
#{ #{
<<"page">> => 1, <<"page">> => 1,

View File

@ -319,7 +319,7 @@ t_list_users(_) ->
is_superuser := _ is_superuser := _
} }
], ],
meta := #{page := 1, limit := 3, count := 1} meta := #{page := 1, limit := 3, count := 0}
} = emqx_enhanced_authn_scram_mnesia:list_users( } = emqx_enhanced_authn_scram_mnesia:list_users(
#{ #{
<<"page">> => 1, <<"page">> => 1,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authz, [ {application, emqx_authz, [
{description, "An OTP application"}, {description, "An OTP application"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{registered, []}, {registered, []},
{mod, {emqx_authz_app, []}}, {mod, {emqx_authz_app, []}},
{applications, [ {applications, [

View File

@ -24,8 +24,8 @@
-import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]). -import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]).
-define(QUERY_USERNAME_FUN, {?MODULE, query_username}). -define(QUERY_USERNAME_FUN, fun ?MODULE:query_username/2).
-define(QUERY_CLIENTID_FUN, {?MODULE, query_clientid}). -define(QUERY_CLIENTID_FUN, fun ?MODULE:query_clientid/2).
-define(ACL_USERNAME_QSCHEMA, [{<<"like_username">>, binary}]). -define(ACL_USERNAME_QSCHEMA, [{<<"like_username">>, binary}]).
-define(ACL_CLIENTID_QSCHEMA, [{<<"like_clientid">>, binary}]). -define(ACL_CLIENTID_QSCHEMA, [{<<"like_clientid">>, binary}]).
@ -49,12 +49,12 @@
%% query funs %% query funs
-export([ -export([
query_username/4, query_username/2,
query_clientid/4 query_clientid/2,
run_fuzzy_filter/2,
format_result/1
]). ]).
-export([format_result/1]).
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND'). -define(NOT_FOUND, 'NOT_FOUND').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS'). -define(ALREADY_EXISTS, 'ALREADY_EXISTS').
@ -405,10 +405,11 @@ users(get, #{query_string := QueryString}) ->
case case
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
node(), node(),
QueryString,
?ACL_TABLE, ?ACL_TABLE,
QueryString,
?ACL_USERNAME_QSCHEMA, ?ACL_USERNAME_QSCHEMA,
?QUERY_USERNAME_FUN ?QUERY_USERNAME_FUN,
fun ?MODULE:format_result/1
) )
of of
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
@ -440,10 +441,11 @@ clients(get, #{query_string := QueryString}) ->
case case
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
node(), node(),
QueryString,
?ACL_TABLE, ?ACL_TABLE,
QueryString,
?ACL_CLIENTID_QSCHEMA, ?ACL_CLIENTID_QSCHEMA,
?QUERY_CLIENTID_FUN ?QUERY_CLIENTID_FUN,
fun ?MODULE:format_result/1
) )
of of
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
@ -574,59 +576,27 @@ purge(delete, _) ->
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Functions %% QueryString to MatchSpec
query_username(Tab, {_QString, []}, Continuation, Limit) -> -spec query_username(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = emqx_authz_mnesia:list_username_rules(), query_username(_Tab, {_QString, FuzzyQString}) ->
emqx_mgmt_api:select_table_with_count( #{
Tab, match_spec => emqx_authz_mnesia:list_username_rules(),
Ms, fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
Continuation, }.
Limit,
fun format_result/1
);
query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
Ms = emqx_authz_mnesia:list_username_rules(),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_result/1
).
query_clientid(Tab, {_QString, []}, Continuation, Limit) -> -spec query_clientid(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = emqx_authz_mnesia:list_clientid_rules(), query_clientid(_Tab, {_QString, FuzzyQString}) ->
emqx_mgmt_api:select_table_with_count( #{
Tab, match_spec => emqx_authz_mnesia:list_clientid_rules(),
Ms, fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
Continuation, }.
Limit,
fun format_result/1
);
query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
Ms = emqx_authz_mnesia:list_clientid_rules(),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_result/1
).
%%--------------------------------------------------------------------
%% Match funcs
%% Fuzzy username funcs %% Fuzzy username funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) -> fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) -> {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;

View File

@ -138,7 +138,12 @@ fields(limit) ->
Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50}, Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50},
[{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
fields(count) -> fields(count) ->
Meta = #{desc => <<"Results count.">>, required => true}, Desc = <<
"Total number of records counted.<br/>"
"Note: this field is <code>0</code> when the queryed table is empty, "
"or if the query can not be optimized and requires a full table scan."
>>,
Meta = #{desc => Desc, required => true},
[{count, hoconsc:mk(non_neg_integer(), Meta)}]; [{count, hoconsc:mk(non_neg_integer(), Meta)}];
fields(meta) -> fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count). fields(page) ++ fields(limit) ++ fields(count).

View File

@ -55,8 +55,10 @@
%% internal exports (for client query) %% internal exports (for client query)
-export([ -export([
query/4, qs2ms/2,
format_channel_info/1 run_fuzzy_filter/2,
format_channel_info/1,
format_channel_info/2
]). ]).
-define(TAGS, [<<"Gateway Clients">>]). -define(TAGS, [<<"Gateway Clients">>]).
@ -97,8 +99,6 @@ paths() ->
{<<"lte_lifetime">>, integer} {<<"lte_lifetime">>, integer}
]). ]).
-define(QUERY_FUN, {?MODULE, query}).
clients(get, #{ clients(get, #{
bindings := #{name := Name0}, bindings := #{name := Name0},
query_string := QString query_string := QString
@ -109,10 +109,11 @@ clients(get, #{
case maps:get(<<"node">>, QString, undefined) of case maps:get(<<"node">>, QString, undefined) of
undefined -> undefined ->
emqx_mgmt_api:cluster_query( emqx_mgmt_api:cluster_query(
QString,
TabName, TabName,
QString,
?CLIENT_QSCHEMA, ?CLIENT_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
); );
Node0 -> Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of case emqx_misc:safe_to_existing_atom(Node0) of
@ -120,10 +121,11 @@ clients(get, #{
QStringWithoutNode = maps:without([<<"node">>], QString), QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
Node1, Node1,
QStringWithoutNode,
TabName, TabName,
QStringWithoutNode,
?CLIENT_QSCHEMA, ?CLIENT_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
); );
{error, _} -> {error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}} {error, Node0, {badrpc, <<"invalid node">>}}
@ -264,27 +266,11 @@ extra_sub_props(Props) ->
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% query funcs %% QueryString to MatchSpec
query(Tab, {Qs, []}, Continuation, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = qs2ms(Qs), qs2ms(_Tab, {Qs, Fuzzy}) ->
emqx_mgmt_api:select_table_with_count( #{match_spec => qs2ms(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}.
Tab,
Ms,
Continuation,
Limit,
fun format_channel_info/1
);
query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_channel_info/1
).
qs2ms(Qs) -> qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@ -339,13 +325,10 @@ ms(lifetime, X) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Fuzzy filter funcs %% Fuzzy filter funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) -> fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) -> {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
@ -363,8 +346,11 @@ run_fuzzy_filter(
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% format funcs %% format funcs
format_channel_info({_, Infos, Stats} = R) -> format_channel_info(ChannInfo) ->
Node = maps:get(node, Infos, node()), format_channel_info(node(), ChannInfo).
format_channel_info(WhichNode, {_, Infos, Stats} = R) ->
Node = maps:get(node, Infos, WhichNode),
ClientInfo = maps:get(clientinfo, Infos, #{}), ClientInfo = maps:get(clientinfo, Infos, #{}),
ConnInfo = maps:get(conninfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}),
SessInfo = maps:get(session, Infos, #{}), SessInfo = maps:get(session, Infos, #{}),

View File

@ -21,6 +21,7 @@
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
-define(FRESH_SELECT, fresh_select). -define(FRESH_SELECT, fresh_select).
-define(LONG_QUERY_TIMEOUT, 50000).
-export([ -export([
paginate/3, paginate/3,
@ -29,13 +30,34 @@
%% first_next query APIs %% first_next query APIs
-export([ -export([
node_query/5, node_query/6,
cluster_query/4, cluster_query/5,
select_table_with_count/5,
b2i/1 b2i/1
]). ]).
-export([do_query/6]). -export_type([
match_spec_and_filter/0
]).
-type query_params() :: list() | map().
-type query_schema() :: [
{Key :: binary(), Type :: atom | binary | integer | timestamp | ip | ip_port}
].
-type query_to_match_spec_fun() :: fun((list(), list()) -> match_spec_and_filter()).
-type match_spec_and_filter() :: #{match_spec := ets:match_spec(), fuzzy_fun := fuzzy_filter_fun()}.
-type fuzzy_filter_fun() :: undefined | {fun(), list()}.
-type format_result_fun() ::
fun((node(), term()) -> term())
| fun((term()) -> term()).
-type query_return() :: #{meta := map(), data := [term()]}.
-export([do_query/2, apply_total_query/1]).
paginate(Tables, Params, {Module, FormatFun}) -> paginate(Tables, Params, {Module, FormatFun}) ->
Qh = query_handle(Tables), Qh = query_handle(Tables),
@ -117,171 +139,289 @@ limit(Params) when is_map(Params) ->
limit(Params) -> limit(Params) ->
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
init_meta(Params) ->
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
#{
page => Page,
limit => Limit,
count => 0
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Node Query %% Node Query
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
node_query(Node, QString, Tab, QSchema, QueryFun) -> -spec node_query(
{_CodCnt, NQString} = parse_qstring(QString, QSchema), node(),
page_limit_check_query( atom(),
init_meta(QString), query_params(),
{fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]} query_schema(),
). query_to_match_spec_fun(),
format_result_fun()
) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
case parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta ->
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
ResultAcc = init_query_result(),
QueryState = init_query_state(Tab, NQString, MsFun, Meta),
NResultAcc = do_node_query(
Node, QueryState, ResultAcc
),
format_query_result(FmtFun, Meta, NResultAcc)
end.
%% @private %% @private
do_node_query(Node, Tab, QString, QueryFun, Meta) ->
do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
do_node_query( do_node_query(
Node, Node,
Tab, QueryState,
QString, ResultAcc
QueryFun,
Continuation,
Meta = #{limit := Limit},
Results
) -> ) ->
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of case do_query(Node, QueryState) of
{error, {badrpc, R}} -> {error, {badrpc, R}} ->
{error, Node, {badrpc, R}}; {error, Node, {badrpc, R}};
{Len, Rows, ?FRESH_SELECT} -> {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
#{meta => NMeta, data => NResults}; NResultAcc;
{Len, Rows, NContinuation} -> {Rows, NQueryState} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults) {enough, NResultAcc} ->
NResultAcc;
{more, NResultAcc} ->
do_node_query(Node, NQueryState, NResultAcc)
end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cluster Query %% Cluster Query
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec cluster_query(
cluster_query(QString, Tab, QSchema, QueryFun) -> atom(),
{_CodCnt, NQString} = parse_qstring(QString, QSchema), query_params(),
Nodes = mria_mnesia:running_nodes(), query_schema(),
page_limit_check_query( query_to_match_spec_fun(),
init_meta(QString), format_result_fun()
{fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]} ) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
). cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
case parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta ->
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
Nodes = mria_mnesia:running_nodes(),
ResultAcc = init_query_result(),
QueryState = init_query_state(Tab, NQString, MsFun, Meta),
NResultAcc = do_cluster_query(
Nodes, QueryState, ResultAcc
),
format_query_result(FmtFun, Meta, NResultAcc)
end.
%% @private %% @private
do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) -> do_cluster_query([], _QueryState, ResultAcc) ->
do_cluster_query( ResultAcc;
Nodes,
Tab,
QString,
QueryFun,
_Continuation = ?FRESH_SELECT,
Meta,
_Results = []
).
do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
#{meta => Meta, data => Results};
do_cluster_query( do_cluster_query(
[Node | Tail] = Nodes, [Node | Tail] = Nodes,
Tab, QueryState,
QString, ResultAcc
QueryFun,
Continuation,
Meta = #{limit := Limit},
Results
) -> ) ->
case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of case do_query(Node, QueryState) of
{error, {badrpc, R}} -> {error, {badrpc, R}} ->
{error, Node, {bar_rpc, R}}; {error, Node, {badrpc, R}};
{Len, Rows, ?FRESH_SELECT} -> {Rows, NQueryState} ->
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults); {enough, NResultAcc} ->
{Len, Rows, NContinuation} -> maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc);
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), {more, NResultAcc} ->
do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults) NextNodes =
case NQueryState of
#{continuation := ?FRESH_SELECT} -> Tail;
_ -> Nodes
end,
do_cluster_query(NextNodes, NQueryState, NResultAcc)
end
end.
maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) ->
ResultAcc;
maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) ->
case counting_total_fun(QueryState) of
false ->
ResultAcc;
_Fun ->
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
end.
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) ->
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
{_, [Node | _]} ->
{error, Node, {badrpc, badnode}};
{ResL0, []} ->
ResL = lists:zip(Nodes, ResL0),
case lists:filter(fun({_, I}) -> not is_integer(I) end, ResL) of
[{Node, {badrpc, Reason}} | _] ->
{error, Node, {badrpc, Reason}};
[] ->
ResultAcc#{total => ResL ++ TotalAcc}
end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Do Query (or rpc query) %% Do Query (or rpc query)
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% QueryState ::
%% #{continuation := ets:continuation(),
%% page := pos_integer(),
%% limit := pos_integer(),
%% total := [{node(), non_neg_integer()}],
%% table := atom(),
%% qs := {Qs, Fuzzy} %% parsed query params
%% msfun := query_to_match_spec_fun()
%% }
init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
#{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
%% assert FuzzyFun type
_ =
case FuzzyFun of
undefined ->
ok;
{NamedFun, Args} ->
true = is_list(Args),
{type, external} = erlang:fun_info(NamedFun, type)
end,
#{
page => Page,
limit => Limit,
table => Tab,
qs => QString,
msfun => MsFun,
mactch_spec => Ms,
fuzzy_fun => FuzzyFun,
total => [],
continuation => ?FRESH_SELECT
}.
%% @private This function is exempt from BPAPI %% @private This function is exempt from BPAPI
do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() -> do_query(Node, QueryState) when Node =:= node() ->
erlang:apply(M, F, [Tab, QString, Continuation, Limit]); do_select(Node, QueryState);
do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> do_query(Node, QueryState) ->
case case
rpc:call( rpc:call(
Node, Node,
?MODULE, ?MODULE,
do_query, do_query,
[Node, Tab, QString, QueryFun, Continuation, Limit], [Node, QueryState],
50000 ?LONG_QUERY_TIMEOUT
) )
of of
{badrpc, _} = R -> {error, R}; {badrpc, _} = R -> {error, R};
Ret -> Ret Ret -> Ret
end. end.
sub_query_result(Len, Rows, Limit, Results, Meta) -> do_select(
{Flag, NMeta} = judge_page_with_counting(Len, Meta), Node,
NResults = QueryState0 = #{
case Flag of table := Tab,
more -> mactch_spec := Ms,
[]; fuzzy_fun := FuzzyFun,
cutrows -> continuation := Continuation,
{SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), limit := Limit
ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), }
lists:sublist(lists:append(Results, ThisRows), SubStart, Limit); ) ->
enough -> QueryState = maybe_apply_total_query(Node, QueryState0),
lists:sublist(lists:append(Results, Rows), 1, Limit) Result =
case Continuation of
?FRESH_SELECT ->
ets:select(Tab, Ms, Limit);
_ ->
%% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query`
ets:select(ets:repair_continuation(Continuation, Ms))
end, end,
{NMeta, NResults}. case Result of
'$end_of_table' ->
{[], QueryState#{continuation => ?FRESH_SELECT}};
{Rows, NContinuation} ->
NRows =
case FuzzyFun of
undefined ->
Rows;
{FilterFun, Args0} when is_function(FilterFun), is_list(Args0) ->
lists:filter(
fun(E) -> erlang:apply(FilterFun, [E | Args0]) end,
Rows
)
end,
{NRows, QueryState#{continuation => NContinuation}}
end.
%%-------------------------------------------------------------------- maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) ->
%% Table Select case proplists:get_value(Node, TotalAcc, undefined) of
%%-------------------------------------------------------------------- undefined ->
Total = apply_total_query(QueryState),
QueryState#{total := [{Node, Total} | TotalAcc]};
_ ->
QueryState
end.
select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when apply_total_query(QueryState = #{table := Tab}) ->
is_function(FuzzyFilterFun) andalso Limit > 0 case counting_total_fun(QueryState) of
-> false ->
case ets:select(Tab, Ms, Limit) of %% return a fake total number if the query have any conditions
'$end_of_table' -> 0;
{0, [], ?FRESH_SELECT}; Fun ->
{RawResult, NContinuation} -> Fun(Tab)
Rows = FuzzyFilterFun(RawResult), end.
{length(Rows), lists:map(FmtFun, Rows), NContinuation}
counting_total_fun(_QueryState = #{qs := {[], []}}) ->
fun(Tab) -> ets:info(Tab, size) end;
counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) ->
%% XXX: Calculating the total number of data that match a certain
%% condition under a large table is very expensive because the
%% entire ETS table needs to be scanned.
%%
%% XXX: How to optimize it? i.e, using:
[{MatchHead, Conditions, _Return}] = Ms,
CountingMs = [{MatchHead, Conditions, [true]}],
fun(Tab) ->
ets:select_count(Tab, CountingMs)
end; end;
select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
is_function(FuzzyFilterFun) %% XXX: Calculating the total number for a fuzzy searching is very very expensive
-> %% so it is not supported now
case ets:select(ets:repair_continuation(Continuation, Ms)) of false.
'$end_of_table' ->
{0, [], ?FRESH_SELECT}; %% ResultAcc :: #{count := integer(),
{RawResult, NContinuation} -> %% cursor := integer(),
Rows = FuzzyFilterFun(RawResult), %% rows := [{node(), Rows :: list()}],
{length(Rows), lists:map(FmtFun, Rows), NContinuation} %% total := [{node() => integer()}]
end; %% }
select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when init_query_result() ->
Limit > 0 #{cursor => 0, count => 0, rows => [], total => []}.
->
case ets:select(Tab, Ms, Limit) of accumulate_query_rows(
'$end_of_table' -> Node,
{0, [], ?FRESH_SELECT}; Rows,
{RawResult, NContinuation} -> _QueryState = #{page := Page, limit := Limit, total := TotalAcc},
{length(RawResult), lists:map(FmtFun, RawResult), NContinuation} ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
end; ) ->
select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) -> PageStart = (Page - 1) * Limit + 1,
case ets:select(ets:repair_continuation(Continuation, Ms)) of PageEnd = Page * Limit,
'$end_of_table' -> Len = length(Rows),
{0, [], ?FRESH_SELECT}; case Cursor + Len of
{RawResult, NContinuation} -> NCursor when NCursor < PageStart ->
{length(RawResult), lists:map(FmtFun, RawResult), NContinuation} {more, ResultAcc#{cursor => NCursor, total => TotalAcc}};
NCursor when NCursor < PageEnd ->
{more, ResultAcc#{
cursor => NCursor,
count => Count + length(Rows),
total => TotalAcc,
rows => [{Node, Rows} | RowsAcc]
}};
NCursor when NCursor >= PageEnd ->
SubRows = lists:sublist(Rows, Limit - Count),
{enough, ResultAcc#{
cursor => NCursor,
count => Count + length(SubRows),
total => TotalAcc,
rows => [{Node, SubRows} | RowsAcc]
}}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -295,6 +435,7 @@ parse_qstring(QString, QSchema) ->
{length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}. {length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}.
do_parse_qstring([], _, Acc1, Acc2) -> do_parse_qstring([], _, Acc1, Acc2) ->
%% remove fuzzy keys if present in accurate query
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
{lists:reverse(Acc1), lists:reverse(NAcc2)}; {lists:reverse(Acc1), lists:reverse(NAcc2)};
do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) -> do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) ->
@ -379,40 +520,41 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
is_fuzzy_key(_) -> is_fuzzy_key(_) ->
false. false.
page_start(1, _) -> 1; format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
page_start(Page, Limit) -> (Page - 1) * Limit + 1. Error;
format_query_result(
FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc}
) ->
Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc),
#{
%% The `count` is used in HTTP API to indicate the total number of
%% queries that can be read
meta => Meta#{count => Total},
data => lists:flatten(
lists:foldl(
fun({Node, Rows}, Acc) ->
[lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc]
end,
[],
RowsAcc
)
)
}.
judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> exec_format_fun(FmtFun, Node, Row) ->
PageStart = page_start(Page, Limit), case erlang:fun_info(FmtFun, arity) of
PageEnd = Page * Limit, {arity, 1} -> FmtFun(Row);
case Count + Len of {arity, 2} -> FmtFun(Node, Row)
NCount when NCount < PageStart ->
{more, Meta#{count => NCount}};
NCount when NCount < PageEnd ->
{cutrows, Meta#{count => NCount}};
NCount when NCount >= PageEnd ->
{enough, Meta#{count => NCount}}
end. end.
rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> parse_pager_params(Params) ->
PageStart = page_start(Page, Limit), Page = b2i(page(Params)),
case (Count - Len) < PageStart of Limit = b2i(limit(Params)),
case Page > 0 andalso Limit > 0 of
true -> true ->
NeedNowNum = Count - PageStart + 1, #{page => Page, limit => Limit, count => 0};
SubStart = Len - NeedNowNum + 1,
{SubStart, NeedNowNum};
false -> false ->
{_SubStart = 1, _NeedNowNum = Len} false
end.
page_limit_check_query(Meta, {F, A}) ->
case Meta of
#{page := Page, limit := Limit} when
Page < 1; Limit < 1
->
{error, page_limit_invalid};
_ ->
erlang:apply(F, A)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -458,6 +600,11 @@ to_ip_port(IPAddress) ->
Port = list_to_integer(Port0), Port = list_to_integer(Port0),
{IP, Port}. {IP, Port}.
b2i(Bin) when is_binary(Bin) ->
binary_to_integer(Bin);
b2i(Any) ->
Any.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% EUnits %% EUnits
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -502,8 +649,3 @@ params2qs_test() ->
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
-endif. -endif.
b2i(Bin) when is_binary(Bin) ->
binary_to_integer(Bin);
b2i(Any) ->
Any.

View File

@ -24,12 +24,12 @@
-export([api_spec/0, paths/0, schema/1, fields/1]). -export([api_spec/0, paths/0, schema/1, fields/1]).
-export([alarms/2]). -export([alarms/2, format_alarm/2]).
-define(TAGS, [<<"Alarms">>]). -define(TAGS, [<<"Alarms">>]).
%% internal export (for query) %% internal export (for query)
-export([query/4]). -export([qs2ms/2]).
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -112,7 +112,15 @@ alarms(get, #{query_string := QString}) ->
true -> ?ACTIVATED_ALARM; true -> ?ACTIVATED_ALARM;
false -> ?DEACTIVATED_ALARM false -> ?DEACTIVATED_ALARM
end, end,
case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of case
emqx_mgmt_api:cluster_query(
Table,
QString,
[],
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_alarm/2
)
of
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} -> {error, Node, {badrpc, R}} ->
@ -128,11 +136,9 @@ alarms(delete, _Params) ->
%%%============================================================================================== %%%==============================================================================================
%% internal %% internal
query(Table, _QsSpec, Continuation, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = [{'$1', [], ['$1']}], qs2ms(_Tab, {_Qs, _Fuzzy}) ->
emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_alarm/1). #{match_spec => [{'$1', [], ['$1']}], fuzzy_fun => undefined}.
format_alarm(Alarms) when is_list(Alarms) -> format_alarm(WhichNode, Alarm) ->
[emqx_alarm:format(Alarm) || Alarm <- Alarms]; emqx_alarm:format(WhichNode, Alarm).
format_alarm(Alarm) ->
emqx_alarm:format(Alarm).

View File

@ -46,8 +46,10 @@
]). ]).
-export([ -export([
query/4, qs2ms/2,
format_channel_info/1 run_fuzzy_filter/2,
format_channel_info/1,
format_channel_info/2
]). ]).
%% for batch operation %% for batch operation
@ -73,7 +75,6 @@
{<<"lte_connected_at">>, timestamp} {<<"lte_connected_at">>, timestamp}
]). ]).
-define(QUERY_FUN, {?MODULE, query}).
-define(FORMAT_FUN, {?MODULE, format_channel_info}). -define(FORMAT_FUN, {?MODULE, format_channel_info}).
-define(CLIENT_ID_NOT_FOUND, -define(CLIENT_ID_NOT_FOUND,
@ -642,10 +643,11 @@ list_clients(QString) ->
case maps:get(<<"node">>, QString, undefined) of case maps:get(<<"node">>, QString, undefined) of
undefined -> undefined ->
emqx_mgmt_api:cluster_query( emqx_mgmt_api:cluster_query(
QString,
?CLIENT_QTAB, ?CLIENT_QTAB,
QString,
?CLIENT_QSCHEMA, ?CLIENT_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
); );
Node0 -> Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of case emqx_misc:safe_to_existing_atom(Node0) of
@ -653,10 +655,11 @@ list_clients(QString) ->
QStringWithoutNode = maps:without([<<"node">>], QString), QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
Node1, Node1,
QStringWithoutNode,
?CLIENT_QTAB, ?CLIENT_QTAB,
QStringWithoutNode,
?CLIENT_QSCHEMA, ?CLIENT_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
); );
{error, _} -> {error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}} {error, Node0, {badrpc, <<"invalid node">>}}
@ -780,32 +783,16 @@ do_unsubscribe(ClientID, Topic) ->
Res Res
end. end.
%%--------------------------------------------------------------------
%% Query Functions
query(Tab, {QString, []}, Continuation, Limit) ->
Ms = qs2ms(QString),
emqx_mgmt_api:select_table_with_count(
Tab,
Ms,
Continuation,
Limit,
fun format_channel_info/1
);
query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
Ms = qs2ms(QString),
FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format_channel_info/1
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% QueryString to Match Spec %% QueryString to Match Spec
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {QString, FuzzyQString}) ->
#{
match_spec => qs2ms(QString),
fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
}.
-spec qs2ms(list()) -> ets:match_spec(). -spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) -> qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@ -855,13 +842,10 @@ ms(created_at, X) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Match funcs %% Match funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) -> fuzzy_filter_fun(Fuzzy) ->
fun(MsRaws) when is_list(MsRaws) -> {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
@ -876,12 +860,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% format funcs %% format funcs
format_channel_info({_, ClientInfo0, ClientStats}) -> format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
Node = format_channel_info(node(), ChannInfo).
case ClientInfo0 of
#{node := N} -> N; format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
_ -> node() Node = maps:get(node, ClientInfo0, WhichNode),
end,
ClientInfo1 = emqx_map_lib:deep_remove([conninfo, clientid], ClientInfo0), ClientInfo1 = emqx_map_lib:deep_remove([conninfo, clientid], ClientInfo0),
ClientInfo2 = emqx_map_lib:deep_remove([conninfo, username], ClientInfo1), ClientInfo2 = emqx_map_lib:deep_remove([conninfo, username], ClientInfo1),
StatsMap = maps:without( StatsMap = maps:without(

View File

@ -32,8 +32,9 @@
-export([subscriptions/2]). -export([subscriptions/2]).
-export([ -export([
query/4, qs2ms/2,
format/1 run_fuzzy_filter/2,
format/2
]). ]).
-define(SUBS_QTABLE, emqx_suboption). -define(SUBS_QTABLE, emqx_suboption).
@ -47,8 +48,6 @@
{<<"match_topic">>, binary} {<<"match_topic">>, binary}
]). ]).
-define(QUERY_FUN, {?MODULE, query}).
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -139,20 +138,22 @@ subscriptions(get, #{query_string := QString}) ->
case maps:get(<<"node">>, QString, undefined) of case maps:get(<<"node">>, QString, undefined) of
undefined -> undefined ->
emqx_mgmt_api:cluster_query( emqx_mgmt_api:cluster_query(
QString,
?SUBS_QTABLE, ?SUBS_QTABLE,
QString,
?SUBS_QSCHEMA, ?SUBS_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format/2
); );
Node0 -> Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of case emqx_misc:safe_to_existing_atom(Node0) of
{ok, Node1} -> {ok, Node1} ->
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
Node1, Node1,
QString,
?SUBS_QTABLE, ?SUBS_QTABLE,
QString,
?SUBS_QSCHEMA, ?SUBS_QSCHEMA,
?QUERY_FUN fun ?MODULE:qs2ms/2,
fun ?MODULE:format/2
); );
{error, _} -> {error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}} {error, Node0, {badrpc, <<"invalid node">>}}
@ -168,16 +169,12 @@ subscriptions(get, #{query_string := QString}) ->
{200, Result} {200, Result}
end. end.
format(Items) when is_list(Items) -> format(WhichNode, {{_Subscriber, Topic}, Options}) ->
[format(Item) || Item <- Items];
format({{Subscriber, Topic}, Options}) ->
format({Subscriber, Topic, Options});
format({_Subscriber, Topic, Options}) ->
maps:merge( maps:merge(
#{ #{
topic => get_topic(Topic, Options), topic => get_topic(Topic, Options),
clientid => maps:get(subid, Options), clientid => maps:get(subid, Options),
node => node() node => WhichNode
}, },
maps:with([qos, nl, rap, rh], Options) maps:with([qos, nl, rap, rh], Options)
). ).
@ -190,53 +187,21 @@ get_topic(Topic, _) ->
Topic. Topic.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Function %% QueryString to MatchSpec
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
query(Tab, {Qs, []}, Continuation, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = qs2ms(Qs), qs2ms(_Tab, {Qs, Fuzzy}) ->
emqx_mgmt_api:select_table_with_count( #{match_spec => gen_match_spec(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}.
Tab,
Ms,
Continuation,
Limit,
fun format/1
);
query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
emqx_mgmt_api:select_table_with_count(
Tab,
{Ms, FuzzyFilterFun},
Continuation,
Limit,
fun format/1
).
fuzzy_filter_fun(Fuzzy) -> gen_match_spec(Qs) ->
fun(MsRaws) when is_list(MsRaws) -> MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}),
lists:filter(
fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
MsRaws
)
end.
run_fuzzy_filter(_, []) ->
true;
run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
%%--------------------------------------------------------------------
%% Query String to Match Spec
qs2ms(Qs) ->
MtchHead = qs2ms(Qs, {{'_', '_'}, #{}}),
[{MtchHead, [], ['$_']}]. [{MtchHead, [], ['$_']}].
qs2ms([], MtchHead) -> gen_match_spec([], MtchHead) ->
MtchHead; MtchHead;
qs2ms([{Key, '=:=', Value} | More], MtchHead) -> gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
qs2ms(More, update_ms(Key, Value, MtchHead)). gen_match_spec(More, update_ms(Key, Value, MtchHead)).
update_ms(clientid, X, {{Pid, Topic}, Opts}) -> update_ms(clientid, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{subid => X}}; {{Pid, Topic}, Opts#{subid => X}};
@ -246,3 +211,13 @@ update_ms(share_group, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{share => X}}; {{Pid, Topic}, Opts#{share => X}};
update_ms(qos, X, {{Pid, Topic}, Opts}) -> update_ms(qos, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{qos => X}}. {{Pid, Topic}, Opts#{qos => X}}.
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) ->
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
run_fuzzy_filter(_, []) ->
true;
run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).

View File

@ -34,7 +34,7 @@
topic/2 topic/2
]). ]).
-export([query/4]). -export([qs2ms/2, format/1]).
-define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
@ -109,7 +109,12 @@ topic(get, #{bindings := Bindings}) ->
do_list(Params) -> do_list(Params) ->
case case
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query} node(),
emqx_route,
Params,
?TOPICS_QUERY_SCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format/1
) )
of of
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
@ -138,16 +143,19 @@ generate_topic(Params = #{topic := Topic}) ->
generate_topic(Params) -> generate_topic(Params) ->
Params. Params.
query(Tab, {Qs, _}, Continuation, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = qs2ms(Qs, [{{route, '_', '_'}, [], ['$_']}]), qs2ms(_Tab, {Qs, _}) ->
emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1). #{
match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]),
fuzzy_fun => undefined
}.
qs2ms([], Res) -> gen_match_spec([], Res) ->
Res; Res;
qs2ms([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
qs2ms(Qs, [{{route, T, N}, [], ['$_']}]); gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]);
qs2ms([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
qs2ms(Qs, [{{route, T, N}, [], ['$_']}]). gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
format(#route{topic = Topic, dest = {_, Node}}) -> format(#route{topic = Topic, dest = {_, Node}}) ->
#{topic => Topic, node => Node}; #{topic => Topic, node => Node};

View File

@ -0,0 +1,143 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% setup
%%--------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_) ->
ok.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_cluster_query(_Config) ->
net_kernel:start(['master@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
[{Name, Opts}, {Name1, Opts1}] = cluster_specs(),
Node1 = emqx_common_test_helpers:start_slave(Name, Opts),
Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1),
try
process_flag(trap_exit, true),
ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)],
ClientLs2 = [start_emqtt_client(Node2, I, 3883) || I <- lists:seq(1, 10)],
%% returned list should be the same regardless of which node is requested
{200, ClientsAll} = query_clients(Node1, #{}),
?assertEqual({200, ClientsAll}, query_clients(Node2, #{})),
?assertMatch(
#{page := 1, limit := 100, count := 20},
maps:get(meta, ClientsAll)
),
?assertMatch(20, length(maps:get(data, ClientsAll))),
%% query the first page, counting in entire cluster
{200, ClientsPage1} = query_clients(Node1, #{<<"limit">> => 5}),
?assertMatch(
#{page := 1, limit := 5, count := 20},
maps:get(meta, ClientsPage1)
),
?assertMatch(5, length(maps:get(data, ClientsPage1))),
%% assert: AllPage = Page1 + Page2 + Page3 + Page4
%% !!!Note: this equation requires that the queried tables must be ordered_set
{200, ClientsPage2} = query_clients(Node1, #{<<"page">> => 2, <<"limit">> => 5}),
{200, ClientsPage3} = query_clients(Node2, #{<<"page">> => 3, <<"limit">> => 5}),
{200, ClientsPage4} = query_clients(Node1, #{<<"page">> => 4, <<"limit">> => 5}),
GetClientIds = fun(L) -> lists:map(fun(#{clientid := Id}) -> Id end, L) end,
?assertEqual(
GetClientIds(maps:get(data, ClientsAll)),
GetClientIds(
maps:get(data, ClientsPage1) ++ maps:get(data, ClientsPage2) ++
maps:get(data, ClientsPage3) ++ maps:get(data, ClientsPage4)
)
),
%% exact match can return non-zero total
{200, ClientsNode1} = query_clients(Node2, #{<<"username">> => <<"corenode1@127.0.0.1">>}),
?assertMatch(
#{count := 10},
maps:get(meta, ClientsNode1)
),
%% fuzzy searching can't return total
{200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}),
?assertMatch(
#{count := 0},
maps:get(meta, ClientsNode2)
),
?assertMatch(10, length(maps:get(data, ClientsNode2))),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2)
after
emqx_common_test_helpers:stop_slave(Node1),
emqx_common_test_helpers:stop_slave(Node2)
end,
ok.
%%--------------------------------------------------------------------
%% helpers
%%--------------------------------------------------------------------
cluster_specs() ->
Specs =
%% default listeners port
[
{core, corenode1, #{listener_ports => [{tcp, 2883}]}},
{core, corenode2, #{listener_ports => [{tcp, 3883}]}}
],
CommOpts =
[
{env, [{emqx, boot_modules, all}]},
{apps, []},
{conf, [
{[listeners, ssl, default, enabled], false},
{[listeners, ws, default, enabled], false},
{[listeners, wss, default, enabled], false}
]}
],
emqx_common_test_helpers:emqx_cluster(
Specs,
CommOpts
).
start_emqtt_client(Node0, N, Port) ->
Node = atom_to_binary(Node0),
ClientId = iolist_to_binary([Node, "-", integer_to_binary(N)]),
{ok, C} = emqtt:start_link([{clientid, ClientId}, {username, Node}, {port, Port}]),
{ok, _} = emqtt:connect(C),
C.
query_clients(Node, Qs0) ->
Qs = maps:merge(
#{<<"page">> => 1, <<"limit">> => 100},
Qs0
),
rpc:call(Node, emqx_mgmt_api_clients, clients, [get, #{query_string => Qs}]).

View File

@ -93,6 +93,7 @@ t_subscription_api(_) ->
{"match_topic", "t/#"} {"match_topic", "t/#"}
]), ]),
Headers = emqx_mgmt_api_test_util:auth_header_(), Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), {ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]), DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]),
Meta2 = maps:get(<<"meta">>, DataTopic2), Meta2 = maps:get(<<"meta">>, DataTopic2),
@ -114,7 +115,8 @@ t_subscription_api(_) ->
MatchMeta = maps:get(<<"meta">>, MatchData), MatchMeta = maps:get(<<"meta">>, MatchData),
?assertEqual(1, maps:get(<<"page">>, MatchMeta)), ?assertEqual(1, maps:get(<<"page">>, MatchMeta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)),
?assertEqual(1, maps:get(<<"count">>, MatchMeta)), %% count equals 0 in fuzzy searching
?assertEqual(0, maps:get(<<"count">>, MatchMeta)),
MatchSubs = maps:get(<<"data">>, MatchData), MatchSubs = maps:get(<<"data">>, MatchData),
?assertEqual(1, length(MatchSubs)), ?assertEqual(1, length(MatchSubs)),

View File

@ -31,6 +31,7 @@ end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
t_nodes_api(_) -> t_nodes_api(_) ->
Node = atom_to_binary(node(), utf8),
Topic = <<"test_topic">>, Topic = <<"test_topic">>,
{ok, Client} = emqtt:start_link(#{ {ok, Client} = emqtt:start_link(#{
username => <<"routes_username">>, clientid => <<"routes_cid">> username => <<"routes_username">>, clientid => <<"routes_cid">>
@ -49,11 +50,30 @@ t_nodes_api(_) ->
Data = maps:get(<<"data">>, RoutesData), Data = maps:get(<<"data">>, RoutesData),
Route = erlang:hd(Data), Route = erlang:hd(Data),
?assertEqual(Topic, maps:get(<<"topic">>, Route)), ?assertEqual(Topic, maps:get(<<"topic">>, Route)),
?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, Route)), ?assertEqual(Node, maps:get(<<"node">>, Route)),
%% exact match
Topic2 = <<"test_topic_2">>,
{ok, _, _} = emqtt:subscribe(Client, Topic2),
QS = uri_string:compose_query([
{"topic", Topic2},
{"node", atom_to_list(node())}
]),
Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
MatchData = emqx_json:decode(MatchResponse, [return_maps]),
?assertMatch(
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
maps:get(<<"meta">>, MatchData)
),
?assertMatch(
[#{<<"topic">> := Topic2, <<"node">> := Node}],
maps:get(<<"data">>, MatchData)
),
%% get topics/:topic %% get topics/:topic
RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]), RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]),
{ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath), {ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath),
RouteData = emqx_json:decode(RouteResponse, [return_maps]), RouteData = emqx_json:decode(RouteResponse, [return_maps]),
?assertEqual(Topic, maps:get(<<"topic">>, RouteData)), ?assertEqual(Topic, maps:get(<<"topic">>, RouteData)),
?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, RouteData)). ?assertEqual(Node, maps:get(<<"node">>, RouteData)).

View File

@ -56,16 +56,20 @@
get_delayed_message/2, get_delayed_message/2,
delete_delayed_message/1, delete_delayed_message/1,
delete_delayed_message/2, delete_delayed_message/2,
cluster_list/1, cluster_list/1
cluster_query/4 ]).
%% exports for query
-export([
qs2ms/2,
format_delayed/1,
format_delayed/2
]). ]).
-export([ -export([
post_config_update/5 post_config_update/5
]). ]).
-export([format_delayed/1]).
%% exported for `emqx_telemetry' %% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]). -export([get_basic_usage_info/0]).
@ -166,16 +170,29 @@ list(Params) ->
emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
cluster_list(Params) -> cluster_list(Params) ->
emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}). emqx_mgmt_api:cluster_query(
?TAB,
Params,
[],
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_delayed/2
).
cluster_query(Table, _QsSpec, Continuation, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = [{'$1', [], ['$1']}], qs2ms(_Table, {_Qs, _Fuzzy}) ->
emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1). #{
match_spec => [{'$1', [], ['$1']}],
fuzzy_fun => undefined
}.
format_delayed(Delayed) -> format_delayed(Delayed) ->
format_delayed(Delayed, false). format_delayed(node(), Delayed).
format_delayed(WhichNode, Delayed) ->
format_delayed(WhichNode, Delayed, false).
format_delayed( format_delayed(
WhichNode,
#delayed_message{ #delayed_message{
key = {ExpectTimeStamp, Id}, key = {ExpectTimeStamp, Id},
delayed = Delayed, delayed = Delayed,
@ -195,7 +212,7 @@ format_delayed(
RemainingTime = ExpectTimeStamp - ?NOW, RemainingTime = ExpectTimeStamp - ?NOW,
Result = #{ Result = #{
msgid => emqx_guid:to_hexstr(Id), msgid => emqx_guid:to_hexstr(Id),
node => node(), node => WhichNode,
publish_at => PublishTime, publish_at => PublishTime,
delayed_interval => Delayed, delayed_interval => Delayed,
delayed_remaining => RemainingTime div 1000, delayed_remaining => RemainingTime div 1000,
@ -222,7 +239,7 @@ get_delayed_message(Id) ->
{error, not_found}; {error, not_found};
Rows -> Rows ->
Message = hd(Rows), Message = hd(Rows),
{ok, format_delayed(Message, true)} {ok, format_delayed(node(), Message, true)}
end. end.
get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Node, Id) when Node =:= node() ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_modules, [ {application, emqx_modules, [
{description, "EMQX Modules"}, {description, "EMQX Modules"},
{vsn, "5.0.6"}, {vsn, "5.0.7"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib, emqx]}, {applications, [kernel, stdlib, emqx]},
{mod, {emqx_modules_app, []}}, {mod, {emqx_modules_app, []}},

View File

@ -34,7 +34,7 @@
-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]). -export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]).
%% query callback %% query callback
-export([query/4]). -export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
-define(ERR_BADARGS(REASON), begin -define(ERR_BADARGS(REASON), begin
@ -274,10 +274,11 @@ param_path_id() ->
case case
emqx_mgmt_api:node_query( emqx_mgmt_api:node_query(
node(), node(),
QueryString,
?RULE_TAB, ?RULE_TAB,
QueryString,
?RULE_QS_SCHEMA, ?RULE_QS_SCHEMA,
{?MODULE, query} fun ?MODULE:qs2ms/2,
fun ?MODULE:format_rule_resp/1
) )
of of
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
@ -552,38 +553,40 @@ filter_out_request_body(Conf) ->
], ],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).
query(Tab, {Qs, Fuzzy}, Start, Limit) -> -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
Ms = qs2ms(), qs2ms(_Tab, {Qs, Fuzzy}) ->
FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy), case lists:keytake(from, 1, Qs) of
emqx_mgmt_api:select_table_with_count( false ->
Tab, {Ms, FuzzyFun}, Start, Limit, fun format_rule_resp/1 #{match_spec => generate_match_spec(Qs), fuzzy_fun => fuzzy_match_fun(Fuzzy)};
). {value, {from, '=:=', From}, Ls} ->
#{
%% rule is not a record, so everything is fuzzy filter. match_spec => generate_match_spec(Ls),
qs2ms() -> fuzzy_fun => fuzzy_match_fun([{from, '=:=', From} | Fuzzy])
[{'_', [], ['$_']}]. }
fuzzy_match_fun(Qs, Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
fun(Rows) ->
Ls = ets:match_spec_run(Rows, MsC),
lists:filter(
fun(E) ->
run_qs_match(E, Qs) andalso
run_fuzzy_match(E, Fuzzy)
end,
Ls
)
end. end.
run_qs_match(_, []) -> generate_match_spec(Qs) ->
true; {MtchHead, Conds} = generate_match_spec(Qs, 2, {#{}, []}),
run_qs_match(E = {_Id, #{enable := Enable}}, [{enable, '=:=', Pattern} | Qs]) -> [{{'_', MtchHead}, Conds, ['$_']}].
Enable =:= Pattern andalso run_qs_match(E, Qs);
run_qs_match(E = {_Id, #{from := From}}, [{from, '=:=', Pattern} | Qs]) -> generate_match_spec([], _, {MtchHead, Conds}) ->
lists:member(Pattern, From) andalso run_qs_match(E, Qs); {MtchHead, lists:reverse(Conds)};
run_qs_match(E, [_ | Qs]) -> generate_match_spec([Qs | Rest], N, {MtchHead, Conds}) ->
run_qs_match(E, Qs). Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds),
generate_match_spec(Rest, N + 1, {NMtchHead, NConds}).
put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds].
ms(enable, X) ->
#{enable => X}.
fuzzy_match_fun([]) ->
undefined;
fuzzy_match_fun(Fuzzy) ->
{fun ?MODULE:run_fuzzy_match/2, [Fuzzy]}.
run_fuzzy_match(_, []) -> run_fuzzy_match(_, []) ->
true; true;
@ -591,6 +594,8 @@ run_fuzzy_match(E = {Id, _}, [{id, like, Pattern} | Fuzzy]) ->
binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) -> run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) ->
binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(E = {_, #{from := Topics}}, [{from, '=:=', Pattern} | Fuzzy]) ->
lists:member(Pattern, Topics) /= false andalso run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) -> run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) ->
lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso
run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E, Fuzzy);

View File

@ -133,23 +133,23 @@ t_list_rule_api(_Config) ->
QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}}, QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}},
{200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2), {200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2),
?assertEqual(Result1, Result2), ?assertEqual(maps:get(data, Result1), maps:get(data, Result2)),
QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}}, QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}},
{200, #{meta := #{count := Count3}}} = emqx_rule_engine_api:'/rules'(get, QueryStr3), {200, #{data := Data3}} = emqx_rule_engine_api:'/rules'(get, QueryStr3),
?assertEqual(19, Count3), ?assertEqual(19, length(Data3)),
QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}}, QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}},
{200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4), {200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4),
?assertEqual(Result1, Result4), ?assertEqual(maps:get(data, Result1), maps:get(data, Result4)),
QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}}, QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}},
{200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5), {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5),
?assertEqual(Result1, Result5), ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)),
QueryStr6 = #{query_string => #{<<"like_id">> => RuleID}}, QueryStr6 = #{query_string => #{<<"like_id">> => RuleID}},
{200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6), {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6),
?assertEqual(Result1, Result6), ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)),
%% clean up %% clean up
lists:foreach( lists:foreach(

View File

@ -27,6 +27,8 @@
- Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401). - Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401).
- Optimize client query performance for HTTP APIs [#9374](https://github.com/emqx/emqx/pull/9374).
## Bug fixes ## Bug fixes
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307). - Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).

View File

@ -25,6 +25,8 @@
- 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。 - 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。
- 优化查询客户端列表的 HTTP API 性能 [#9374](https://github.com/emqx/emqx/pull/9374)。
## 修复 ## 修复
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。