Merge pull request #12490 from keynslug/fix/mgmt-api-topics-fullscan
fix(api-topics): avoid doing full scans over router tables
This commit is contained in:
commit
bad79965c0
|
@ -166,9 +166,7 @@ node_query(Node, Tab, QString, QSchema, MsFun, FmtFun, Options) ->
|
||||||
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
||||||
ResultAcc = init_query_result(),
|
ResultAcc = init_query_result(),
|
||||||
QueryState = init_query_state(Tab, NQString, MsFun, Meta, Options),
|
QueryState = init_query_state(Tab, NQString, MsFun, Meta, Options),
|
||||||
NResultAcc = do_node_query(
|
NResultAcc = do_node_query(Node, QueryState, ResultAcc),
|
||||||
Node, QueryState, ResultAcc
|
|
||||||
),
|
|
||||||
format_query_result(FmtFun, Meta, NResultAcc)
|
format_query_result(FmtFun, Meta, NResultAcc)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,7 @@ do_list(Params) ->
|
||||||
{_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
|
{_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
|
||||||
QState = Pager#{continuation => undefined},
|
QState = Pager#{continuation => undefined},
|
||||||
QResult = eval_topic_query(qs2ms(Query), QState),
|
QResult = eval_topic_query(qs2ms(Query), QState),
|
||||||
{200, format_list_response(Pager, QResult)}
|
{200, format_list_response(Pager, Query, QResult)}
|
||||||
catch
|
catch
|
||||||
throw:{error, page_limit_invalid} ->
|
throw:{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
|
@ -164,14 +164,17 @@ eval_topic_query(MS, QState) ->
|
||||||
finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
|
finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
|
||||||
|
|
||||||
eval_topic_query(MS, QState, QResult) ->
|
eval_topic_query(MS, QState, QResult) ->
|
||||||
QPage = eval_topic_query_page(MS, QState),
|
case eval_topic_query_page(MS, QState) of
|
||||||
case QPage of
|
|
||||||
{Rows, '$end_of_table'} ->
|
{Rows, '$end_of_table'} ->
|
||||||
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
|
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
|
||||||
NQResult#{complete => true};
|
NQResult#{complete => true};
|
||||||
{Rows, NCont} ->
|
{Rows, NCont} ->
|
||||||
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
|
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
|
||||||
eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
|
{more, NQResult} ->
|
||||||
|
eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
|
||||||
|
{enough, NQResult} ->
|
||||||
|
NQResult#{complete => false}
|
||||||
|
end;
|
||||||
'$end_of_table' ->
|
'$end_of_table' ->
|
||||||
QResult#{complete => true}
|
QResult#{complete => true}
|
||||||
end.
|
end.
|
||||||
|
@ -183,15 +186,22 @@ finalize_query(QResult = #{overflow := Overflow, complete := Complete}) ->
|
||||||
HasNext = Overflow orelse not Complete,
|
HasNext = Overflow orelse not Complete,
|
||||||
QResult#{hasnext => HasNext}.
|
QResult#{hasnext => HasNext}.
|
||||||
|
|
||||||
format_list_response(Meta, _QResult = #{hasnext := HasNext, rows := RowsAcc, cursor := Cursor}) ->
|
format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
|
||||||
#{
|
#{
|
||||||
meta => Meta#{hasnext => HasNext, count => Cursor},
|
meta => format_response_meta(Meta, Query, QResult),
|
||||||
data => lists:flatmap(
|
data => lists:flatmap(
|
||||||
fun({_Node, Rows}) -> [format(R) || R <- Rows] end,
|
fun({_Node, Rows}) -> [format(R) || R <- Rows] end,
|
||||||
RowsAcc
|
RowsAcc
|
||||||
)
|
)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
|
||||||
|
Meta#{hasnext => HasNext, count => Cursor};
|
||||||
|
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
|
||||||
|
Meta#{hasnext => HasNext, count => emqx_router:stats(n_routes)};
|
||||||
|
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
||||||
|
Meta#{hasnext => HasNext}.
|
||||||
|
|
||||||
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
||||||
#{topic => ?SHARE(Group, Topic), node => Node};
|
#{topic => ?SHARE(Group, Topic), node => Node};
|
||||||
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx_router.hrl").
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
@ -26,55 +25,50 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
emqx,
|
||||||
|
emqx_management,
|
||||||
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
|
),
|
||||||
Peer = emqx_common_test_helpers:start_peer(node1, []),
|
Peer = emqx_common_test_helpers:start_peer(node1, []),
|
||||||
[{peer, Peer} | Config].
|
[{apps, Apps}, {peer, Peer} | Config].
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(Config) ->
|
||||||
Peer = ?config(peer, Config),
|
_ = emqx_common_test_helpers:stop_peer(?config(peer, Config)),
|
||||||
emqx_common_test_helpers:stop_peer(Peer),
|
ok = emqx_cth_suite:stop(?config(apps, Config)).
|
||||||
mria:clear_table(?ROUTE_TAB),
|
|
||||||
emqx_mgmt_api_test_util:end_suite().
|
|
||||||
|
|
||||||
t_nodes_api(Config) ->
|
t_nodes_api(Config) ->
|
||||||
Node = atom_to_binary(node(), utf8),
|
Node = atom_to_binary(node(), utf8),
|
||||||
Topic = <<"test_topic">>,
|
Topic = <<"test_topic">>,
|
||||||
{ok, Client} = emqtt:start_link(#{
|
Client = client(?FUNCTION_NAME),
|
||||||
username => <<"routes_username">>, clientid => <<"routes_cid">>
|
|
||||||
}),
|
|
||||||
{ok, _} = emqtt:connect(Client),
|
|
||||||
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
|
||||||
%% list all
|
%% list all
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["topics"]),
|
RoutesData = request_json(get, ["topics"]),
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
|
|
||||||
RoutesData = emqx_utils_json:decode(Response, [return_maps]),
|
|
||||||
Meta = maps:get(<<"meta">>, RoutesData),
|
Meta = maps:get(<<"meta">>, RoutesData),
|
||||||
?assertEqual(1, maps:get(<<"page">>, Meta)),
|
?assertEqual(1, maps:get(<<"page">>, Meta)),
|
||||||
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
|
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
|
||||||
?assertEqual(1, maps:get(<<"count">>, Meta)),
|
?assertEqual(1, maps:get(<<"count">>, Meta)),
|
||||||
Data = maps:get(<<"data">>, RoutesData),
|
[Route | _] = maps:get(<<"data">>, RoutesData),
|
||||||
Route = erlang:hd(Data),
|
|
||||||
?assertEqual(Topic, maps:get(<<"topic">>, Route)),
|
?assertEqual(Topic, maps:get(<<"topic">>, Route)),
|
||||||
?assertEqual(Node, maps:get(<<"node">>, Route)),
|
?assertEqual(Node, maps:get(<<"node">>, Route)),
|
||||||
|
|
||||||
%% exact match
|
%% exact match
|
||||||
Topic2 = <<"test_topic_2">>,
|
Topic2 = <<"test_topic_2">>,
|
||||||
{ok, _, _} = emqtt:subscribe(Client, Topic2),
|
{ok, _, _} = emqtt:subscribe(Client, Topic2),
|
||||||
QS = uri_string:compose_query([
|
MatchData = request_json(get, ["topics"], [
|
||||||
{"topic", Topic2},
|
{"topic", Topic2},
|
||||||
{"node", atom_to_list(node())}
|
{"node", atom_to_list(node())}
|
||||||
]),
|
]),
|
||||||
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
|
||||||
{ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
|
|
||||||
MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]),
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
|
#{
|
||||||
maps:get(<<"meta">>, MatchData)
|
<<"data">> := [#{<<"topic">> := Topic2, <<"node">> := Node}],
|
||||||
),
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1}
|
||||||
?assertMatch(
|
},
|
||||||
[#{<<"topic">> := Topic2, <<"node">> := Node}],
|
MatchData
|
||||||
maps:get(<<"data">>, MatchData)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% get topics/:topic
|
%% get topics/:topic
|
||||||
|
@ -82,44 +76,89 @@ t_nodes_api(Config) ->
|
||||||
%% multiple routes for a single topic
|
%% multiple routes for a single topic
|
||||||
Peer = ?config(peer, Config),
|
Peer = ?config(peer, Config),
|
||||||
ok = emqx_router:add_route(Topic, Peer),
|
ok = emqx_router:add_route(Topic, Peer),
|
||||||
RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]),
|
RouteResponse = request_json(get, ["topics", Topic]),
|
||||||
{ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath),
|
|
||||||
ok = emqx_router:delete_route(Topic, Peer),
|
ok = emqx_router:delete_route(Topic, Peer),
|
||||||
|
|
||||||
[
|
[
|
||||||
#{<<"topic">> := Topic, <<"node">> := Node1},
|
#{<<"topic">> := Topic, <<"node">> := Node1},
|
||||||
#{<<"topic">> := Topic, <<"node">> := Node2}
|
#{<<"topic">> := Topic, <<"node">> := Node2}
|
||||||
] = emqx_utils_json:decode(RouteResponse, [return_maps]),
|
] = RouteResponse,
|
||||||
|
|
||||||
?assertEqual(lists:usort([Node, atom_to_binary(Peer)]), lists:usort([Node1, Node2])),
|
?assertEqual(lists:sort([Node, atom_to_binary(Peer)]), lists:sort([Node1, Node2])),
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_paging(_Config) ->
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
Client1 = client(c_paging_1),
|
||||||
|
Client2 = client(c_paging_2),
|
||||||
|
Topics1 = [
|
||||||
|
<<"t/+">>,
|
||||||
|
<<"test/client/#">>,
|
||||||
|
<<"test/1">>,
|
||||||
|
<<"test/2">>,
|
||||||
|
<<"test/3">>
|
||||||
|
],
|
||||||
|
Topics2 = [
|
||||||
|
<<"t/+">>,
|
||||||
|
<<"test/client/#">>,
|
||||||
|
<<"test/4">>,
|
||||||
|
<<"test/5">>,
|
||||||
|
<<"test/6">>
|
||||||
|
],
|
||||||
|
ok = lists:foreach(fun(T) -> {ok, _, _} = emqtt:subscribe(Client1, T) end, Topics1),
|
||||||
|
ok = lists:foreach(fun(T) -> {ok, _, _} = emqtt:subscribe(Client2, T) end, Topics2),
|
||||||
|
Matched = request_json(get, ["topics"]),
|
||||||
|
?assertEqual(
|
||||||
|
Matched,
|
||||||
|
request_json(get, ["topics"], [{"node", Node}])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
%% NOTE: No `count` in this case.
|
||||||
|
#{<<"hasnext">> => true, <<"page">> => 1, <<"limit">> => 3},
|
||||||
|
maps:get(<<"meta">>, request_json(get, ["topics"], [{"node", Node}, {"limit", "3"}]))
|
||||||
|
),
|
||||||
|
R1 = #{<<"data">> := Data1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "5"}]),
|
||||||
|
R2 = #{<<"data">> := Data2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "5"}]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"meta">> := #{<<"hasnext">> := true, <<"page">> := 1, <<"count">> := 8},
|
||||||
|
<<"data">> := [_1, _2, _3, _4, _5]
|
||||||
|
},
|
||||||
|
R1
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"meta">> := #{<<"hasnext">> := false, <<"page">> := 2, <<"count">> := 8},
|
||||||
|
<<"data">> := [_6, _7, _8]
|
||||||
|
},
|
||||||
|
R2
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:usort(Topics1 ++ Topics2),
|
||||||
|
lists:sort([T || #{<<"topic">> := T} <- Data1 ++ Data2])
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client1),
|
||||||
|
ok = emqtt:stop(Client2).
|
||||||
|
|
||||||
t_percent_topics(_Config) ->
|
t_percent_topics(_Config) ->
|
||||||
Node = atom_to_binary(node(), utf8),
|
Node = atom_to_binary(node(), utf8),
|
||||||
Topic = <<"test_%%1">>,
|
Topic = <<"test_%%1">>,
|
||||||
{ok, Client} = emqtt:start_link(#{
|
Client = client(?FUNCTION_NAME),
|
||||||
username => <<"routes_username">>, clientid => <<"routes_cid">>
|
|
||||||
}),
|
|
||||||
{ok, _} = emqtt:connect(Client),
|
|
||||||
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
|
||||||
%% exact match with percent encoded topic
|
%% exact match with percent encoded topic
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["topics"]),
|
MatchData = request_json(get, ["topics"], [
|
||||||
QS = uri_string:compose_query([
|
|
||||||
{"topic", Topic},
|
{"topic", Topic},
|
||||||
{"node", atom_to_list(node())}
|
{"node", atom_to_list(node())}
|
||||||
]),
|
]),
|
||||||
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
|
||||||
{ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
|
|
||||||
MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]),
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
|
#{
|
||||||
maps:get(<<"meta">>, MatchData)
|
<<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
),
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1}
|
||||||
?assertMatch(
|
},
|
||||||
[#{<<"topic">> := Topic, <<"node">> := Node}],
|
MatchData
|
||||||
maps:get(<<"data">>, MatchData)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
@ -129,29 +168,21 @@ t_shared_topics(_Configs) ->
|
||||||
RealTopic = <<"t/+">>,
|
RealTopic = <<"t/+">>,
|
||||||
Topic = <<"$share/g1/", RealTopic/binary>>,
|
Topic = <<"$share/g1/", RealTopic/binary>>,
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link(#{
|
Client = client(?FUNCTION_NAME),
|
||||||
username => <<"routes_username">>, clientid => <<"routes_cid">>
|
|
||||||
}),
|
|
||||||
{ok, _} = emqtt:connect(Client),
|
|
||||||
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
%% exact match with shared topic
|
%% exact match with shared topic
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["topics"]),
|
MatchData = request_json(get, ["topics"], [
|
||||||
QS = uri_string:compose_query([
|
|
||||||
{"topic", Topic},
|
{"topic", Topic},
|
||||||
{"node", atom_to_list(node())}
|
{"node", atom_to_list(node())}
|
||||||
]),
|
]),
|
||||||
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
|
||||||
{ok, MatchResponse1} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
|
|
||||||
MatchData = emqx_utils_json:decode(MatchResponse1, [return_maps]),
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
|
#{
|
||||||
maps:get(<<"meta">>, MatchData)
|
<<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
),
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1}
|
||||||
?assertMatch(
|
},
|
||||||
[#{<<"topic">> := Topic, <<"node">> := Node}],
|
MatchData
|
||||||
maps:get(<<"data">>, MatchData)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
@ -172,3 +203,33 @@ t_shared_topics_invalid(_Config) ->
|
||||||
#{<<"code">> := <<"INVALID_PARAMTER">>, <<"message">> := <<"topic_filter_invalid">>},
|
#{<<"code">> := <<"INVALID_PARAMTER">>, <<"message">> := <<"topic_filter_invalid">>},
|
||||||
emqx_utils_json:decode(Body, [return_maps])
|
emqx_utils_json:decode(Body, [return_maps])
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% Utilities
|
||||||
|
|
||||||
|
client(Name) ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => emqx_utils_conv:bin(Name),
|
||||||
|
clientid => emqx_utils_conv:bin(Name)
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
Client.
|
||||||
|
|
||||||
|
request_json(Method, Path) ->
|
||||||
|
decode_response(request_api(Method, Path)).
|
||||||
|
|
||||||
|
request_json(Method, Path, QS) ->
|
||||||
|
decode_response(request_api(Method, Path, QS)).
|
||||||
|
|
||||||
|
decode_response({ok, Response}) ->
|
||||||
|
emqx_utils_json:decode(Response, [return_maps]);
|
||||||
|
decode_response({error, Reason}) ->
|
||||||
|
error({request_api_error, Reason}).
|
||||||
|
|
||||||
|
request_api(Method, API) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(API),
|
||||||
|
emqx_mgmt_api_test_util:request_api(Method, Path).
|
||||||
|
|
||||||
|
request_api(Method, API, QS) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(API),
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
emqx_mgmt_api_test_util:request_api(Method, Path, uri_string:compose_query(QS), Auth).
|
||||||
|
|
Loading…
Reference in New Issue