From 03843c6071adfa6cc6af52d7cc8aef9c556658dc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Nov 2023 17:17:01 +0700 Subject: [PATCH 1/5] feat(router): switch to v2 routing store by default --- apps/emqx/src/emqx_schema.erl | 2 +- rel/i18n/emqx_schema.hocon | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3848e77b4..1eadd8c61 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1388,7 +1388,7 @@ fields("broker_routing") -> sc( hoconsc:enum([v1, v2]), #{ - default => v1, + default => v2, 'readOnly' => true, desc => ?DESC(broker_routing_storage_schema) } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index e1d086197..3959c8474 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1530,7 +1530,7 @@ sys_event_messages.desc: broker_routing_storage_schema.desc: """Routing storage schema. -Set v1 to leave the default. +Set v1 to use the former schema. v2 is introduced in 5.2. It enables routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. NOTE: Schema v2 is still experimental. NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" From dc3e818e84e441d53a2dd34f13832b0307880632 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Nov 2023 17:44:24 +0700 Subject: [PATCH 2/5] chore: add changelog entry Co-Authored-By: Zaiming (Stone) Shi --- changes/ce/feat-11935.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ce/feat-11935.en.md diff --git a/changes/ce/feat-11935.en.md b/changes/ce/feat-11935.en.md new file mode 100644 index 000000000..3cc3a45d5 --- /dev/null +++ b/changes/ce/feat-11935.en.md @@ -0,0 +1,5 @@ +Switch to the new `v2` routing store schema by default. New schema improves both subscription and routing performance, especially so for scenarios with concurrent subscriptions to topic filters sharing common wildcard prefixes, at the cost of slightly increased memory usage. This schema also eliminates the need for a separate index, thus inconsistencies in the routing state rarely encountered in previous versions should no longer be possible. + +If a cluster is rolling upgraded from older version, the cluster will continue to use `v1` store until a full cluster (non-rolling) restart happens. + +The former schema can still be forced by setting `broker.routing.storage_schema` configuration option to `v1` and conducting full non-rolling cluster restart as well. From 6812ee9d0f0732ca96fbbc6c48db167f01ad2d4d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 14 Nov 2023 15:54:18 +0700 Subject: [PATCH 3/5] fix(mgmt): hide route selection behind router interface Also introduce a generic _stream_ concept, mostly to deal with iterating over 2 ETS tables at once with `ets:match_object/3`. --- apps/emqx/src/emqx_router.erl | 82 ++++++++++- apps/emqx/src/emqx_router_helper.erl | 7 +- apps/emqx/src/emqx_trie_search.erl | 8 +- apps/emqx_management/src/emqx_mgmt_api.erl | 9 ++ .../src/emqx_mgmt_api_topics.erl | 104 ++++++++------ apps/emqx_utils/src/emqx_utils_stream.erl | 131 ++++++++++++++++++ .../test/emqx_utils_stream_tests.erl | 75 ++++++++++ 7 files changed, 369 insertions(+), 47 deletions(-) create mode 100644 apps/emqx_utils/src/emqx_utils_stream.erl create mode 100644 apps/emqx_utils/test/emqx_utils_stream_tests.erl diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index fd988eda1..892a4e5ba 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -52,6 +52,9 @@ lookup_routes/1 ]). +%% Topics API +-export([select/3]). + -export([print_routes/1]). -export([ @@ -59,7 +62,10 @@ foldr_routes/2 ]). --export([topics/0]). +-export([ + topics/0, + stats/1 +]). %% Exported for tests -export([has_route/2]). @@ -219,6 +225,19 @@ mria_delete_route(v2, Topic, Dest) -> mria_delete_route(v1, Topic, Dest) -> mria_delete_route_v1(Topic, Dest). +-spec select(Spec, _Limit :: pos_integer(), Continuation) -> + {[emqx_types:route()], Continuation} | '$end_of_table' +when + Spec :: {_TopicPat, _DestPat}, + Continuation :: term() | '$end_of_table'. +select(MatchSpec, Limit, Cont) -> + select(get_schema_vsn(), MatchSpec, Limit, Cont). + +select(v2, MatchSpec, Limit, Cont) -> + select_v2(MatchSpec, Limit, Cont); +select(v1, MatchSpec, Limit, Cont) -> + select_v1(MatchSpec, Limit, Cont). + -spec topics() -> list(emqx_types:topic()). topics() -> topics(get_schema_vsn()). @@ -228,6 +247,15 @@ topics(v2) -> topics(v1) -> list_topics_v1(). +-spec stats(n_routes) -> non_neg_integer(). +stats(Item) -> + stats(get_schema_vsn(), Item). + +stats(v2, Item) -> + get_stats_v2(Item); +stats(v1, Item) -> + get_stats_v1(Item). + %% @doc Print routes to a topic -spec print_routes(emqx_types:topic()) -> ok. print_routes(Topic) -> @@ -345,9 +373,17 @@ cleanup_routes_v1(Node) -> ] end). +select_v1({MTopic, MDest}, Limit, undefined) -> + ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit); +select_v1(_Spec, _Limit, Cont) -> + ets:select(Cont). + list_topics_v1() -> list_route_tab_topics(). +get_stats_v1(n_routes) -> + emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0). + list_route_tab_topics() -> mnesia:dirty_all_keys(?ROUTE_TAB). @@ -436,11 +472,52 @@ get_dest_node({_, Node}) -> get_dest_node(Node) -> Node. +select_v2(Spec, Limit, undefined) -> + Stream = mk_route_stream(Spec), + select_next(Limit, Stream); +select_v2(_Spec, Limit, Stream) -> + select_next(Limit, Stream). + +select_next(N, Stream) -> + case emqx_utils_stream:take(N, Stream) of + {Routes, SRest} -> + {Routes, SRest}; + Routes -> + {Routes, '$end_of_table'} + end. + +mk_route_stream(Spec) -> + emqx_utils_stream:chain( + mk_route_stream(route, Spec), + mk_route_stream(filter, Spec) + ). + +mk_route_stream(route, Spec) -> + emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end); +mk_route_stream(filter, {MTopic, MDest}) -> + emqx_utils_stream:map( + fun routeidx_to_route/1, + emqx_utils_stream:ets( + fun + (undefined) -> + MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)}, + ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1); + (Cont) -> + ets:match_object(Cont) + end + ) + ). + list_topics_v2() -> Pat = #routeidx{entry = '$1'}, Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)], list_route_tab_topics() ++ Filters. +get_stats_v2(n_routes) -> + NTopics = emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0), + NWildcards = emqx_maybe:define(ets:info(?ROUTE_TAB_FILTERS, size), 0), + NTopics + NWildcards. + fold_routes_v2(FunName, FoldFun, AccIn) -> FilterFoldFun = mk_filtertab_fold_fun(FoldFun), Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB), @@ -449,6 +526,9 @@ fold_routes_v2(FunName, FoldFun, AccIn) -> mk_filtertab_fold_fun(FoldFun) -> fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. +routeidx_to_route(#routeidx{entry = M}) -> + match_to_route(M). + match_to_route(M) -> #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index b9cdbae4b..c43192d4e 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -190,12 +190,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- stats_fun() -> - case ets:info(?ROUTE_TAB, size) of - undefined -> - ok; - Size -> - emqx_stats:setstat('topics.count', 'topics.max', Size) - end. + emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)). cleanup_routes(Node) -> emqx_router:cleanup_routes(Node). diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index c8c088b58..da37f2b21 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -98,7 +98,7 @@ -module(emqx_trie_search). --export([make_key/2, filter/1]). +-export([make_key/2, make_pat/2, filter/1]). -export([match/2, matches/3, get_id/1, get_topic/1]). -export_type([key/1, word/0, words/0, nextf/0, opts/0]). @@ -127,6 +127,12 @@ make_key(Topic, ID) when is_binary(Topic) -> make_key(Words, ID) when is_list(Words) -> {Words, {ID}}. +-spec make_pat(emqx_types:topic() | words() | '_', _ID | '_') -> _Pat. +make_pat(Pattern = '_', ID) -> + {Pattern, {ID}}; +make_pat(Topic, ID) -> + make_key(Topic, ID). + %% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter. -spec filter(emqx_types:topic()) -> words() | false. filter(Topic) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index dffa39ae5..b14d1d316 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -35,6 +35,13 @@ b2i/1 ]). +-export([ + parse_pager_params/1, + parse_qstring/2, + init_query_result/0, + accumulate_query_rows/4 +]). + -ifdef(TEST). -export([paginate_test_format/1]). -endif. @@ -444,6 +451,8 @@ accumulate_query_rows( count => Count + length(SubRows), rows => [{Node, SubRows} | RowsAcc] }}; + NCursor when NCursor >= PageEnd + Limit -> + {enough, ResultAcc#{cursor => NCursor}}; NCursor when NCursor >= PageEnd -> SubRows = lists:sublist(Rows, Limit - Count), {enough, ResultAcc#{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 31c70573f..c1d5f8e74 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -18,7 +18,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -37,8 +36,6 @@ topic/2 ]). --export([qs2ms/2, format/1]). - -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND'). -define(TOPICS_QUERY_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]). @@ -110,23 +107,15 @@ topic(get, #{bindings := Bindings}) -> %%%============================================================================================== %% api apply do_list(Params) -> - case - emqx_mgmt_api:node_query( - node(), - ?ROUTE_TAB, - Params, - ?TOPICS_QUERY_SCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/1 - ) - of - {error, page_limit_invalid} -> - {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; - {error, Node, Error} -> - Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])), - {500, #{code => <<"NODE_DOWN">>, message => Message}}; - Response -> - {200, Response} + try + Pager = parse_pager_params(Params), + {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA), + QState = Pager#{continuation => undefined}, + QResult = eval_topic_query(qs2ms(Query), QState), + {200, format_list_response(Pager, QResult)} + catch + throw:{error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}} end. lookup(#{topic := Topic}) -> @@ -140,26 +129,63 @@ lookup(#{topic := Topic}) -> %%%============================================================================================== %% internal --spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). -qs2ms(_Tab, {Qs, _}) -> - #{ - match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]), - fuzzy_fun => undefined - }. -gen_match_spec([], Res) -> - Res; -gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) -> - {T, D} = - case emqx_topic:parse(T0) of - {#share{group = Group, topic = Topic}, _SubOpts} -> - {Topic, {Group, Node}}; - {T1, _SubOpts} -> - {T1, Node} - end, - gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]); -gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> - gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). +parse_pager_params(Params) -> + try emqx_mgmt_api:parse_pager_params(Params) of + Pager = #{} -> + Pager; + false -> + throw({error, page_limit_invalid}) + catch + error:badarg -> + throw({error, page_limit_invalid}) + end. + +-spec qs2ms({list(), list()}) -> tuple(). +qs2ms({Qs, _}) -> + lists:foldl(fun gen_match_spec/2, {'_', '_'}, Qs). + +gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) -> + case emqx_topic:parse(QTopic) of + {#share{group = Group, topic = Topic}, _SubOpts} -> + {Topic, {Group, MNode}}; + {Topic, _SubOpts} -> + {Topic, MNode} + end; +gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) -> + {MTopic, QNode}. + +eval_topic_query(MS, QState) -> + finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())). + +eval_topic_query(MS, QState, QResult) -> + QPage = eval_topic_query_page(MS, QState), + case QPage of + {Rows, '$end_of_table'} -> + {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), + NQResult#{complete => true}; + {Rows, NCont} -> + {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), + eval_topic_query(MS, QState#{continuation := NCont}, NQResult); + '$end_of_table' -> + QResult#{complete => true} + end. + +eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) -> + emqx_router:select(MS, Limit, Cont). + +finalize_query(QResult = #{overflow := Overflow, complete := Complete}) -> + HasNext = Overflow orelse not Complete, + QResult#{hasnext => HasNext}. + +format_list_response(Meta, _QResult = #{hasnext := HasNext, rows := RowsAcc, cursor := Cursor}) -> + #{ + meta => Meta#{hasnext => HasNext, count => Cursor}, + data => lists:flatmap( + fun({_Node, Rows}) -> [format(R) || R <- Rows] end, + RowsAcc + ) + }. format(#route{topic = Topic, dest = {Group, Node}}) -> #{topic => ?SHARE(Group, Topic), node => Node}; diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl new file mode 100644 index 000000000..e7374c861 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -0,0 +1,131 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_stream). + +%% Constructors / Combinators +-export([ + empty/0, + list/1, + map/2, + chain/2 +]). + +%% Evaluating +-export([ + next/1, + take/2, + consume/1 +]). + +%% Streams from ETS tables +-export([ + ets/1 +]). + +-export_type([stream/1]). + +%% @doc A stream is essentially a lazy list. +-type stream(T) :: fun(() -> next(T) | []). +-type next(T) :: nonempty_improper_list(T, stream(T)). + +-dialyzer(no_improper_lists). + +%% + +-spec empty() -> stream(none()). +empty() -> + fun() -> [] end. + +-spec list([T]) -> stream(T). +list([]) -> + empty(); +list([X | Rest]) -> + fun() -> [X | list(Rest)] end. + +-spec map(fun((X) -> Y), stream(X)) -> stream(Y). +map(F, S) -> + fun() -> + case next(S) of + [X | Rest] -> + [F(X) | map(F, Rest)]; + [] -> + [] + end + end. + +-spec chain(stream(X), stream(Y)) -> stream(X | Y). +chain(SFirst, SThen) -> + fun() -> + case next(SFirst) of + [X | SRest] -> + [X | chain(SRest, SThen)]; + [] -> + next(SThen) + end + end. + +%% + +-spec next(stream(T)) -> next(T) | []. +next(S) -> + S(). + +-spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. +take(N, S) -> + take(N, S, []). + +take(0, S, Acc) -> + {lists:reverse(Acc), S}; +take(N, S, Acc) -> + case next(S) of + [X | SRest] -> + take(N - 1, SRest, [X | Acc]); + [] -> + lists:reverse(Acc) + end. + +-spec consume(stream(T)) -> [T]. +consume(S) -> + case next(S) of + [X | SRest] -> + [X | consume(SRest)]; + [] -> + [] + end. + +%% + +-type select_result(Record, Cont) :: + {[Record], Cont} + | {[Record], '$end_of_table'} + | '$end_of_table'. + +-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). +ets(ContF) -> + ets(undefined, ContF). + +ets(Cont, ContF) -> + fun() -> + case ContF(Cont) of + {Records, '$end_of_table'} -> + next(list(Records)); + {Records, NCont} -> + next(chain(list(Records), ets(NCont, ContF))); + '$end_of_table' -> + [] + end + end. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl new file mode 100644 index 000000000..0f98bae21 --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_stream_tests). + +-include_lib("eunit/include/eunit.hrl"). + +empty_test() -> + S = emqx_utils_stream:empty(), + ?assertEqual([], emqx_utils_stream:next(S)). + +empty_consume_test() -> + S = emqx_utils_stream:empty(), + ?assertEqual([], emqx_utils_stream:consume(S)). + +chain_empties_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:empty() + ), + ?assertEqual([], emqx_utils_stream:next(S)). + +chain_list_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6]) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(S) + ). + +chain_take_test() -> + S = emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7, 8]) + ), + ?assertMatch( + {[1, 2, 3, 4, 5], _SRest}, + emqx_utils_stream:take(5, S) + ), + {_, SRest} = emqx_utils_stream:take(5, S), + ?assertEqual( + [6, 7, 8], + emqx_utils_stream:take(5, SRest) + ). + +chain_list_map_test() -> + S = emqx_utils_stream:map( + fun integer_to_list/1, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + ["1", "2", "3", "4", "5", "6"], + emqx_utils_stream:consume(S) + ). From 8919b08207a5f2b53931a08d634285442646f04a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 Nov 2023 17:20:40 +0700 Subject: [PATCH 4/5] fix(utils): rename `emqx_utils_stream:take/2` to `consume/2` Which is more neutral and harder to confuse with a destructive `take` in collections. --- apps/emqx/src/emqx_router.erl | 2 +- apps/emqx_utils/src/emqx_utils_stream.erl | 47 ++++++++++++------- .../test/emqx_utils_stream_tests.erl | 6 +-- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 892a4e5ba..1aebb1b21 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -479,7 +479,7 @@ select_v2(_Spec, Limit, Stream) -> select_next(Limit, Stream). select_next(N, Stream) -> - case emqx_utils_stream:take(N, Stream) of + case emqx_utils_stream:consume(N, Stream) of {Routes, SRest} -> {Routes, SRest}; Routes -> diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index e7374c861..79ce5ce7b 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -27,8 +27,8 @@ %% Evaluating -export([ next/1, - take/2, - consume/1 + consume/1, + consume/2 ]). %% Streams from ETS tables @@ -46,16 +46,20 @@ %% +%% @doc Make a stream that produces no values. -spec empty() -> stream(none()). empty() -> fun() -> [] end. +%% @doc Make a stream out of the given list. +%% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. -spec list([T]) -> stream(T). list([]) -> empty(); list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream by applying a function to each element of the underlying stream. -spec map(fun((X) -> Y), stream(X)) -> stream(Y). map(F, S) -> fun() -> @@ -67,6 +71,8 @@ map(F, S) -> end end. +%% @doc Make a stream by chaining (concatenating) two streams. +%% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). chain(SFirst, SThen) -> fun() -> @@ -80,24 +86,12 @@ chain(SFirst, SThen) -> %% +%% @doc Produce the next value from the stream. -spec next(stream(T)) -> next(T) | []. next(S) -> S(). --spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. -take(N, S) -> - take(N, S, []). - -take(0, S, Acc) -> - {lists:reverse(Acc), S}; -take(N, S, Acc) -> - case next(S) of - [X | SRest] -> - take(N - 1, SRest, [X | Acc]); - [] -> - lists:reverse(Acc) - end. - +%% @doc Consume the stream and return a list of all produced values. -spec consume(stream(T)) -> [T]. consume(S) -> case next(S) of @@ -107,6 +101,22 @@ consume(S) -> [] end. +%% @doc Consume N values from the stream and return a list of them and the rest of the stream. +%% If the stream is exhausted before N values are produced, return just a list of these values. +-spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. +consume(N, S) -> + consume(N, S, []). + +consume(0, S, Acc) -> + {lists:reverse(Acc), S}; +consume(N, S, Acc) -> + case next(S) of + [X | SRest] -> + consume(N - 1, SRest, [X | Acc]); + [] -> + lists:reverse(Acc) + end. + %% -type select_result(Record, Cont) :: @@ -114,6 +124,11 @@ consume(S) -> | {[Record], '$end_of_table'} | '$end_of_table'. +%% @doc Make a stream out of an ETS table, where the ETS table is scanned through in chunks, +%% with the given continuation function. The function is assumed to return a result of a call to: +%% * `ets:select/1` / `ets:select/3` +%% * `ets:match/1` / `ets:match/3` +%% * `ets:match_object/1` / `ets:match_object/3` -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). ets(ContF) -> ets(undefined, ContF). diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 0f98bae21..4a48ae45d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -50,12 +50,12 @@ chain_take_test() -> ), ?assertMatch( {[1, 2, 3, 4, 5], _SRest}, - emqx_utils_stream:take(5, S) + emqx_utils_stream:consume(5, S) ), - {_, SRest} = emqx_utils_stream:take(5, S), + {_, SRest} = emqx_utils_stream:consume(5, S), ?assertEqual( [6, 7, 8], - emqx_utils_stream:take(5, SRest) + emqx_utils_stream:consume(5, SRest) ). chain_list_map_test() -> From 893e90b3724107af91d98f5a3aa917fac11652e4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 Nov 2023 22:37:00 +0700 Subject: [PATCH 5/5] fix(kafka): use safe publish in consumer Routing with v2 schema is actually more strict with respect to input to `emqx_router` module routines. This causes Kafka consumer bridge to crash when it tries to publish a message to a topic that looks like a topic filter. --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 89cb9a78f..24ea4d300 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -251,7 +251,7 @@ do_handle_message(Message, State) -> Payload = render(FullMessage, PayloadTemplate), MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), - _ = emqx:publish(MQTTMessage), + _ = emqx_broker:safe_publish(MQTTMessage), emqx_hooks:run(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(ResourceId), %% note: just `ack' does not commit the offset to the