From 6812ee9d0f0732ca96fbbc6c48db167f01ad2d4d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 14 Nov 2023 15:54:18 +0700 Subject: [PATCH] fix(mgmt): hide route selection behind router interface Also introduce a generic _stream_ concept, mostly to deal with iterating over 2 ETS tables at once with `ets:match_object/3`. --- apps/emqx/src/emqx_router.erl | 82 ++++++++++- apps/emqx/src/emqx_router_helper.erl | 7 +- apps/emqx/src/emqx_trie_search.erl | 8 +- apps/emqx_management/src/emqx_mgmt_api.erl | 9 ++ .../src/emqx_mgmt_api_topics.erl | 104 ++++++++------ apps/emqx_utils/src/emqx_utils_stream.erl | 131 ++++++++++++++++++ .../test/emqx_utils_stream_tests.erl | 75 ++++++++++ 7 files changed, 369 insertions(+), 47 deletions(-) create mode 100644 apps/emqx_utils/src/emqx_utils_stream.erl create mode 100644 apps/emqx_utils/test/emqx_utils_stream_tests.erl diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index fd988eda1..892a4e5ba 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -52,6 +52,9 @@ lookup_routes/1 ]). +%% Topics API +-export([select/3]). + -export([print_routes/1]). -export([ @@ -59,7 +62,10 @@ foldr_routes/2 ]). --export([topics/0]). +-export([ + topics/0, + stats/1 +]). %% Exported for tests -export([has_route/2]). @@ -219,6 +225,19 @@ mria_delete_route(v2, Topic, Dest) -> mria_delete_route(v1, Topic, Dest) -> mria_delete_route_v1(Topic, Dest). +-spec select(Spec, _Limit :: pos_integer(), Continuation) -> + {[emqx_types:route()], Continuation} | '$end_of_table' +when + Spec :: {_TopicPat, _DestPat}, + Continuation :: term() | '$end_of_table'. +select(MatchSpec, Limit, Cont) -> + select(get_schema_vsn(), MatchSpec, Limit, Cont). + +select(v2, MatchSpec, Limit, Cont) -> + select_v2(MatchSpec, Limit, Cont); +select(v1, MatchSpec, Limit, Cont) -> + select_v1(MatchSpec, Limit, Cont). + -spec topics() -> list(emqx_types:topic()). topics() -> topics(get_schema_vsn()). @@ -228,6 +247,15 @@ topics(v2) -> topics(v1) -> list_topics_v1(). +-spec stats(n_routes) -> non_neg_integer(). +stats(Item) -> + stats(get_schema_vsn(), Item). + +stats(v2, Item) -> + get_stats_v2(Item); +stats(v1, Item) -> + get_stats_v1(Item). + %% @doc Print routes to a topic -spec print_routes(emqx_types:topic()) -> ok. print_routes(Topic) -> @@ -345,9 +373,17 @@ cleanup_routes_v1(Node) -> ] end). +select_v1({MTopic, MDest}, Limit, undefined) -> + ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit); +select_v1(_Spec, _Limit, Cont) -> + ets:select(Cont). + list_topics_v1() -> list_route_tab_topics(). +get_stats_v1(n_routes) -> + emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0). + list_route_tab_topics() -> mnesia:dirty_all_keys(?ROUTE_TAB). @@ -436,11 +472,52 @@ get_dest_node({_, Node}) -> get_dest_node(Node) -> Node. +select_v2(Spec, Limit, undefined) -> + Stream = mk_route_stream(Spec), + select_next(Limit, Stream); +select_v2(_Spec, Limit, Stream) -> + select_next(Limit, Stream). + +select_next(N, Stream) -> + case emqx_utils_stream:take(N, Stream) of + {Routes, SRest} -> + {Routes, SRest}; + Routes -> + {Routes, '$end_of_table'} + end. + +mk_route_stream(Spec) -> + emqx_utils_stream:chain( + mk_route_stream(route, Spec), + mk_route_stream(filter, Spec) + ). + +mk_route_stream(route, Spec) -> + emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end); +mk_route_stream(filter, {MTopic, MDest}) -> + emqx_utils_stream:map( + fun routeidx_to_route/1, + emqx_utils_stream:ets( + fun + (undefined) -> + MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)}, + ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1); + (Cont) -> + ets:match_object(Cont) + end + ) + ). + list_topics_v2() -> Pat = #routeidx{entry = '$1'}, Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)], list_route_tab_topics() ++ Filters. +get_stats_v2(n_routes) -> + NTopics = emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0), + NWildcards = emqx_maybe:define(ets:info(?ROUTE_TAB_FILTERS, size), 0), + NTopics + NWildcards. + fold_routes_v2(FunName, FoldFun, AccIn) -> FilterFoldFun = mk_filtertab_fold_fun(FoldFun), Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB), @@ -449,6 +526,9 @@ fold_routes_v2(FunName, FoldFun, AccIn) -> mk_filtertab_fold_fun(FoldFun) -> fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. +routeidx_to_route(#routeidx{entry = M}) -> + match_to_route(M). + match_to_route(M) -> #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index b9cdbae4b..c43192d4e 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -190,12 +190,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- stats_fun() -> - case ets:info(?ROUTE_TAB, size) of - undefined -> - ok; - Size -> - emqx_stats:setstat('topics.count', 'topics.max', Size) - end. + emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)). cleanup_routes(Node) -> emqx_router:cleanup_routes(Node). diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index c8c088b58..da37f2b21 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -98,7 +98,7 @@ -module(emqx_trie_search). --export([make_key/2, filter/1]). +-export([make_key/2, make_pat/2, filter/1]). -export([match/2, matches/3, get_id/1, get_topic/1]). -export_type([key/1, word/0, words/0, nextf/0, opts/0]). @@ -127,6 +127,12 @@ make_key(Topic, ID) when is_binary(Topic) -> make_key(Words, ID) when is_list(Words) -> {Words, {ID}}. +-spec make_pat(emqx_types:topic() | words() | '_', _ID | '_') -> _Pat. +make_pat(Pattern = '_', ID) -> + {Pattern, {ID}}; +make_pat(Topic, ID) -> + make_key(Topic, ID). + %% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter. -spec filter(emqx_types:topic()) -> words() | false. filter(Topic) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index dffa39ae5..b14d1d316 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -35,6 +35,13 @@ b2i/1 ]). +-export([ + parse_pager_params/1, + parse_qstring/2, + init_query_result/0, + accumulate_query_rows/4 +]). + -ifdef(TEST). -export([paginate_test_format/1]). -endif. @@ -444,6 +451,8 @@ accumulate_query_rows( count => Count + length(SubRows), rows => [{Node, SubRows} | RowsAcc] }}; + NCursor when NCursor >= PageEnd + Limit -> + {enough, ResultAcc#{cursor => NCursor}}; NCursor when NCursor >= PageEnd -> SubRows = lists:sublist(Rows, Limit - Count), {enough, ResultAcc#{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 31c70573f..c1d5f8e74 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -18,7 +18,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -37,8 +36,6 @@ topic/2 ]). --export([qs2ms/2, format/1]). - -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). -define(TOPICS_QUERY_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]). @@ -110,23 +107,15 @@ topic(get, #{bindings := Bindings}) -> %%%============================================================================================== %% api apply do_list(Params) -> - case - emqx_mgmt_api:node_query( - node(), - ?ROUTE_TAB, - Params, - ?TOPICS_QUERY_SCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/1 - ) - of - {error, page_limit_invalid} -> - {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; - {error, Node, Error} -> - Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])), - {500, #{code => <<"NODE_DOWN">>, message => Message}}; - Response -> - {200, Response} + try + Pager = parse_pager_params(Params), + {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA), + QState = Pager#{continuation => undefined}, + QResult = eval_topic_query(qs2ms(Query), QState), + {200, format_list_response(Pager, QResult)} + catch + throw:{error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}} end. lookup(#{topic := Topic}) -> @@ -140,26 +129,63 @@ lookup(#{topic := Topic}) -> %%%============================================================================================== %% internal --spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). -qs2ms(_Tab, {Qs, _}) -> - #{ - match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]), - fuzzy_fun => undefined - }. -gen_match_spec([], Res) -> - Res; -gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) -> - {T, D} = - case emqx_topic:parse(T0) of - {#share{group = Group, topic = Topic}, _SubOpts} -> - {Topic, {Group, Node}}; - {T1, _SubOpts} -> - {T1, Node} - end, - gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]); -gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> - gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). +parse_pager_params(Params) -> + try emqx_mgmt_api:parse_pager_params(Params) of + Pager = #{} -> + Pager; + false -> + throw({error, page_limit_invalid}) + catch + error:badarg -> + throw({error, page_limit_invalid}) + end. + +-spec qs2ms({list(), list()}) -> tuple(). +qs2ms({Qs, _}) -> + lists:foldl(fun gen_match_spec/2, {'_', '_'}, Qs). + +gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) -> + case emqx_topic:parse(QTopic) of + {#share{group = Group, topic = Topic}, _SubOpts} -> + {Topic, {Group, MNode}}; + {Topic, _SubOpts} -> + {Topic, MNode} + end; +gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) -> + {MTopic, QNode}. + +eval_topic_query(MS, QState) -> + finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())). + +eval_topic_query(MS, QState, QResult) -> + QPage = eval_topic_query_page(MS, QState), + case QPage of + {Rows, '$end_of_table'} -> + {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), + NQResult#{complete => true}; + {Rows, NCont} -> + {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), + eval_topic_query(MS, QState#{continuation := NCont}, NQResult); + '$end_of_table' -> + QResult#{complete => true} + end. + +eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) -> + emqx_router:select(MS, Limit, Cont). + +finalize_query(QResult = #{overflow := Overflow, complete := Complete}) -> + HasNext = Overflow orelse not Complete, + QResult#{hasnext => HasNext}. + +format_list_response(Meta, _QResult = #{hasnext := HasNext, rows := RowsAcc, cursor := Cursor}) -> + #{ + meta => Meta#{hasnext => HasNext, count => Cursor}, + data => lists:flatmap( + fun({_Node, Rows}) -> [format(R) || R <- Rows] end, + RowsAcc + ) + }. format(#route{topic = Topic, dest = {Group, Node}}) -> #{topic => ?SHARE(Group, Topic), node => Node}; diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl new file mode 100644 index 000000000..e7374c861 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -0,0 +1,131 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_stream). + +%% Constructors / Combinators +-export([ + empty/0, + list/1, + map/2, + chain/2 +]). + +%% Evaluating +-export([ + next/1, + take/2, + consume/1 +]). + +%% Streams from ETS tables +-export([ + ets/1 +]). + +-export_type([stream/1]). + +%% @doc A stream is essentially a lazy list. +-type stream(T) :: fun(() -> next(T) | []). +-type next(T) :: nonempty_improper_list(T, stream(T)). + +-dialyzer(no_improper_lists). + +%% + +-spec empty() -> stream(none()). +empty() -> + fun() -> [] end. + +-spec list([T]) -> stream(T). +list([]) -> + empty(); +list([X | Rest]) -> + fun() -> [X | list(Rest)] end. + +-spec map(fun((X) -> Y), stream(X)) -> stream(Y). +map(F, S) -> + fun() -> + case next(S) of + [X | Rest] -> + [F(X) | map(F, Rest)]; + [] -> + [] + end + end. + +-spec chain(stream(X), stream(Y)) -> stream(X | Y). +chain(SFirst, SThen) -> + fun() -> + case next(SFirst) of + [X | SRest] -> + [X | chain(SRest, SThen)]; + [] -> + next(SThen) + end + end. + +%% + +-spec next(stream(T)) -> next(T) | []. +next(S) -> + S(). + +-spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. +take(N, S) -> + take(N, S, []). + +take(0, S, Acc) -> + {lists:reverse(Acc), S}; +take(N, S, Acc) -> + case next(S) of + [X | SRest] -> + take(N - 1, SRest, [X | Acc]); + [] -> + lists:reverse(Acc) + end. + +-spec consume(stream(T)) -> [T]. +consume(S) -> + case next(S) of + [X | SRest] -> + [X | consume(SRest)]; + [] -> + [] + end. + +%% + +-type select_result(Record, Cont) :: + {[Record], Cont} + | {[Record], '$end_of_table'} + | '$end_of_table'. + +-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). +ets(ContF) -> + ets(undefined, ContF). + +ets(Cont, ContF) -> + fun() -> + case ContF(Cont) of + {Records, '$end_of_table'} -> + next(list(Records)); + {Records, NCont} -> + next(chain(list(Records), ets(NCont, ContF))); + '$end_of_table' -> + [] + end + end. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl new file mode 100644 index 000000000..0f98bae21 --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_stream_tests). + +-include_lib("eunit/include/eunit.hrl"). + +empty_test() -> + S = emqx_utils_stream:empty(), + ?assertEqual([], emqx_utils_stream:next(S)). + +empty_consume_test() -> + S = emqx_utils_stream:empty(), + ?assertEqual([], emqx_utils_stream:consume(S)). + +chain_empties_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:empty() + ), + ?assertEqual([], emqx_utils_stream:next(S)). + +chain_list_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6]) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(S) + ). + +chain_take_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7, 8]) + ), + ?assertMatch( + {[1, 2, 3, 4, 5], _SRest}, + emqx_utils_stream:take(5, S) + ), + {_, SRest} = emqx_utils_stream:take(5, S), + ?assertEqual( + [6, 7, 8], + emqx_utils_stream:take(5, SRest) + ). + +chain_list_map_test() -> + S = emqx_utils_stream:map( + fun integer_to_list/1, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + ["1", "2", "3", "4", "5", "6"], + emqx_utils_stream:consume(S) + ).