Merge pull request #11935 from keynslug/feat/EMQX-10713/routing-v2-default
feat(router): switch to v2 routing store by default
commit d019be5806
@@ -52,6 +52,9 @@
     lookup_routes/1
 ]).

+%% Topics API
+-export([select/3]).
+
 -export([print_routes/1]).

 -export([
@@ -59,7 +62,10 @@
     foldr_routes/2
 ]).

--export([topics/0]).
+-export([
+    topics/0,
+    stats/1
+]).

 %% Exported for tests
 -export([has_route/2]).
@@ -219,6 +225,19 @@ mria_delete_route(v2, Topic, Dest) ->
 mria_delete_route(v1, Topic, Dest) ->
     mria_delete_route_v1(Topic, Dest).

+-spec select(Spec, _Limit :: pos_integer(), Continuation) ->
+    {[emqx_types:route()], Continuation} | '$end_of_table'
+when
+    Spec :: {_TopicPat, _DestPat},
+    Continuation :: term() | '$end_of_table'.
+select(MatchSpec, Limit, Cont) ->
+    select(get_schema_vsn(), MatchSpec, Limit, Cont).
+
+select(v2, MatchSpec, Limit, Cont) ->
+    select_v2(MatchSpec, Limit, Cont);
+select(v1, MatchSpec, Limit, Cont) ->
+    select_v1(MatchSpec, Limit, Cont).
+
 -spec topics() -> list(emqx_types:topic()).
 topics() ->
     topics(get_schema_vsn()).
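The new `select/3` above is a paginated read API driven by an opaque continuation. A minimal, hypothetical driver loop (not part of this changeset) could look like the following; `{'_', '_'}` matches every route, and the page size of 100 is arbitrary:

    %% Collect all routes page by page; this mirrors how the management
    %% API later in this diff drives emqx_router:select/3.
    collect_routes() ->
        collect_routes(undefined, []).

    collect_routes(Cont, Acc) ->
        case emqx_router:select({'_', '_'}, 100, Cont) of
            '$end_of_table' ->
                lists:append(lists:reverse(Acc));
            {Routes, '$end_of_table'} ->
                lists:append(lists:reverse([Routes | Acc]));
            {Routes, NCont} ->
                collect_routes(NCont, [Routes | Acc])
        end.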
@@ -228,6 +247,15 @@ topics(v2) ->
 topics(v1) ->
     list_topics_v1().

+-spec stats(n_routes) -> non_neg_integer().
+stats(Item) ->
+    stats(get_schema_vsn(), Item).
+
+stats(v2, Item) ->
+    get_stats_v2(Item);
+stats(v1, Item) ->
+    get_stats_v1(Item).
+
 %% @doc Print routes to a topic
 -spec print_routes(emqx_types:topic()) -> ok.
 print_routes(Topic) ->
@@ -345,9 +373,17 @@ cleanup_routes_v1(Node) ->
         ]
     end).

+select_v1({MTopic, MDest}, Limit, undefined) ->
+    ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit);
+select_v1(_Spec, _Limit, Cont) ->
+    ets:select(Cont).
+
 list_topics_v1() ->
     list_route_tab_topics().

+get_stats_v1(n_routes) ->
+    emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0).
+
 list_route_tab_topics() ->
     mnesia:dirty_all_keys(?ROUTE_TAB).

@@ -436,11 +472,52 @@ get_dest_node({_, Node}) ->
 get_dest_node(Node) ->
     Node.

+select_v2(Spec, Limit, undefined) ->
+    Stream = mk_route_stream(Spec),
+    select_next(Limit, Stream);
+select_v2(_Spec, Limit, Stream) ->
+    select_next(Limit, Stream).
+
+select_next(N, Stream) ->
+    case emqx_utils_stream:consume(N, Stream) of
+        {Routes, SRest} ->
+            {Routes, SRest};
+        Routes ->
+            {Routes, '$end_of_table'}
+    end.
+
+mk_route_stream(Spec) ->
+    emqx_utils_stream:chain(
+        mk_route_stream(route, Spec),
+        mk_route_stream(filter, Spec)
+    ).
+
+mk_route_stream(route, Spec) ->
+    emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end);
+mk_route_stream(filter, {MTopic, MDest}) ->
+    emqx_utils_stream:map(
+        fun routeidx_to_route/1,
+        emqx_utils_stream:ets(
+            fun
+                (undefined) ->
+                    MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)},
+                    ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1);
+                (Cont) ->
+                    ets:match_object(Cont)
+            end
+        )
+    ).
+
 list_topics_v2() ->
     Pat = #routeidx{entry = '$1'},
     Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)],
     list_route_tab_topics() ++ Filters.

+get_stats_v2(n_routes) ->
+    NTopics = emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0),
+    NWildcards = emqx_maybe:define(ets:info(?ROUTE_TAB_FILTERS, size), 0),
+    NTopics + NWildcards.
+
 fold_routes_v2(FunName, FoldFun, AccIn) ->
     FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
     Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
@@ -449,6 +526,9 @@ fold_routes_v2(FunName, FoldFun, AccIn) ->
 mk_filtertab_fold_fun(FoldFun) ->
     fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.

+routeidx_to_route(#routeidx{entry = M}) ->
+    match_to_route(M).
+
 match_to_route(M) ->
     #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.

@@ -190,12 +190,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------

 stats_fun() ->
-    case ets:info(?ROUTE_TAB, size) of
-        undefined ->
-            ok;
-        Size ->
-            emqx_stats:setstat('topics.count', 'topics.max', Size)
-    end.
+    emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)).

 cleanup_routes(Node) ->
     emqx_router:cleanup_routes(Node).
@ -1382,7 +1382,7 @@ fields("broker_routing") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([v1, v2]),
|
hoconsc:enum([v1, v2]),
|
||||||
#{
|
#{
|
||||||
default => v1,
|
default => v2,
|
||||||
'readOnly' => true,
|
'readOnly' => true,
|
||||||
desc => ?DESC(broker_routing_storage_schema)
|
desc => ?DESC(broker_routing_storage_schema)
|
||||||
}
|
}
|
||||||
|
|
|
@@ -98,7 +98,7 @@

 -module(emqx_trie_search).

--export([make_key/2, filter/1]).
+-export([make_key/2, make_pat/2, filter/1]).
 -export([match/2, matches/3, get_id/1, get_topic/1]).
 -export_type([key/1, word/0, words/0, nextf/0, opts/0]).

@@ -127,6 +127,12 @@ make_key(Topic, ID) when is_binary(Topic) ->
 make_key(Words, ID) when is_list(Words) ->
     {Words, {ID}}.

+-spec make_pat(emqx_types:topic() | words() | '_', _ID | '_') -> _Pat.
+make_pat(Pattern = '_', ID) ->
+    {Pattern, {ID}};
+make_pat(Topic, ID) ->
+    make_key(Topic, ID).
+
 %% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter.
 -spec filter(emqx_types:topic()) -> words() | false.
 filter(Topic) ->
@@ -251,7 +251,7 @@ do_handle_message(Message, State) ->
     Payload = render(FullMessage, PayloadTemplate),
     MQTTTopic = render(FullMessage, MQTTTopicTemplate),
     MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
-    _ = emqx:publish(MQTTMessage),
+    _ = emqx_broker:safe_publish(MQTTMessage),
     emqx_hooks:run(Hookpoint, [FullMessage]),
     emqx_resource_metrics:received_inc(ResourceId),
     %% note: just `ack' does not commit the offset to the
@@ -35,6 +35,13 @@
     b2i/1
 ]).

+-export([
+    parse_pager_params/1,
+    parse_qstring/2,
+    init_query_result/0,
+    accumulate_query_rows/4
+]).
+
 -ifdef(TEST).
 -export([paginate_test_format/1]).
 -endif.
@@ -444,6 +451,8 @@ accumulate_query_rows(
                 count => Count + length(SubRows),
                 rows => [{Node, SubRows} | RowsAcc]
             }};
+        NCursor when NCursor >= PageEnd + Limit ->
+            {enough, ResultAcc#{cursor => NCursor}};
         NCursor when NCursor >= PageEnd ->
             SubRows = lists:sublist(Rows, Limit - Count),
             {enough, ResultAcc#{
@@ -18,7 +18,6 @@

 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx/include/emqx_router.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").

@@ -37,8 +36,6 @@
     topic/2
 ]).

--export([qs2ms/2, format/1]).
-
 -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').

 -define(TOPICS_QUERY_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]).
@@ -110,23 +107,15 @@ topic(get, #{bindings := Bindings}) ->
 %%%==============================================================================================
 %% api apply
 do_list(Params) ->
-    case
-        emqx_mgmt_api:node_query(
-            node(),
-            ?ROUTE_TAB,
-            Params,
-            ?TOPICS_QUERY_SCHEMA,
-            fun ?MODULE:qs2ms/2,
-            fun ?MODULE:format/1
-        )
-    of
-        {error, page_limit_invalid} ->
-            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, Error} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
-            {500, #{code => <<"NODE_DOWN">>, message => Message}};
-        Response ->
-            {200, Response}
+    try
+        Pager = parse_pager_params(Params),
+        {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
+        QState = Pager#{continuation => undefined},
+        QResult = eval_topic_query(qs2ms(Query), QState),
+        {200, format_list_response(Pager, QResult)}
+    catch
+        throw:{error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}
     end.

 lookup(#{topic := Topic}) ->
@@ -140,26 +129,63 @@ lookup(#{topic := Topic}) ->

 %%%==============================================================================================
 %% internal
--spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
-qs2ms(_Tab, {Qs, _}) ->
-    #{
-        match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]),
-        fuzzy_fun => undefined
-    }.
-
-gen_match_spec([], Res) ->
-    Res;
-gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) ->
-    {T, D} =
-        case emqx_topic:parse(T0) of
-            {#share{group = Group, topic = Topic}, _SubOpts} ->
-                {Topic, {Group, Node}};
-            {T1, _SubOpts} ->
-                {T1, Node}
-        end,
-    gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]);
-gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
-    gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
+parse_pager_params(Params) ->
+    try emqx_mgmt_api:parse_pager_params(Params) of
+        Pager = #{} ->
+            Pager;
+        false ->
+            throw({error, page_limit_invalid})
+    catch
+        error:badarg ->
+            throw({error, page_limit_invalid})
+    end.
+
+-spec qs2ms({list(), list()}) -> tuple().
+qs2ms({Qs, _}) ->
+    lists:foldl(fun gen_match_spec/2, {'_', '_'}, Qs).
+
+gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
+    case emqx_topic:parse(QTopic) of
+        {#share{group = Group, topic = Topic}, _SubOpts} ->
+            {Topic, {Group, MNode}};
+        {Topic, _SubOpts} ->
+            {Topic, MNode}
+    end;
+gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
+    {MTopic, QNode}.
+
+eval_topic_query(MS, QState) ->
+    finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
+
+eval_topic_query(MS, QState, QResult) ->
+    QPage = eval_topic_query_page(MS, QState),
+    case QPage of
+        {Rows, '$end_of_table'} ->
+            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
+            NQResult#{complete => true};
+        {Rows, NCont} ->
+            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
+            eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
+        '$end_of_table' ->
+            QResult#{complete => true}
+    end.
+
+eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) ->
+    emqx_router:select(MS, Limit, Cont).
+
+finalize_query(QResult = #{overflow := Overflow, complete := Complete}) ->
+    HasNext = Overflow orelse not Complete,
+    QResult#{hasnext => HasNext}.
+
+format_list_response(Meta, _QResult = #{hasnext := HasNext, rows := RowsAcc, cursor := Cursor}) ->
+    #{
+        meta => Meta#{hasnext => HasNext, count => Cursor},
+        data => lists:flatmap(
+            fun({_Node, Rows}) -> [format(R) || R <- Rows] end,
+            RowsAcc
+        )
+    }.
+
 format(#route{topic = Topic, dest = {Group, Node}}) ->
     #{topic => ?SHARE(Group, Topic), node => Node};
@@ -0,0 +1,146 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_utils_stream).
+
+%% Constructors / Combinators
+-export([
+    empty/0,
+    list/1,
+    map/2,
+    chain/2
+]).
+
+%% Evaluating
+-export([
+    next/1,
+    consume/1,
+    consume/2
+]).
+
+%% Streams from ETS tables
+-export([
+    ets/1
+]).
+
+-export_type([stream/1]).
+
+%% @doc A stream is essentially a lazy list.
+-type stream(T) :: fun(() -> next(T) | []).
+-type next(T) :: nonempty_improper_list(T, stream(T)).
+
+-dialyzer(no_improper_lists).
+
+%%
+
+%% @doc Make a stream that produces no values.
+-spec empty() -> stream(none()).
+empty() ->
+    fun() -> [] end.
+
+%% @doc Make a stream out of the given list.
+%% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`.
+-spec list([T]) -> stream(T).
+list([]) ->
+    empty();
+list([X | Rest]) ->
+    fun() -> [X | list(Rest)] end.
+
+%% @doc Make a stream by applying a function to each element of the underlying stream.
+-spec map(fun((X) -> Y), stream(X)) -> stream(Y).
+map(F, S) ->
+    fun() ->
+        case next(S) of
+            [X | Rest] ->
+                [F(X) | map(F, Rest)];
+            [] ->
+                []
+        end
+    end.
+
+%% @doc Make a stream by chaining (concatenating) two streams.
+%% The second stream begins to produce values only after the first one is exhausted.
+-spec chain(stream(X), stream(Y)) -> stream(X | Y).
+chain(SFirst, SThen) ->
+    fun() ->
+        case next(SFirst) of
+            [X | SRest] ->
+                [X | chain(SRest, SThen)];
+            [] ->
+                next(SThen)
+        end
+    end.
+
+%%
+
+%% @doc Produce the next value from the stream.
+-spec next(stream(T)) -> next(T) | [].
+next(S) ->
+    S().
+
+%% @doc Consume the stream and return a list of all produced values.
+-spec consume(stream(T)) -> [T].
+consume(S) ->
+    case next(S) of
+        [X | SRest] ->
+            [X | consume(SRest)];
+        [] ->
+            []
+    end.
+
+%% @doc Consume N values from the stream and return a list of them and the rest of the stream.
+%% If the stream is exhausted before N values are produced, return just a list of these values.
+-spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
+consume(N, S) ->
+    consume(N, S, []).
+
+consume(0, S, Acc) ->
+    {lists:reverse(Acc), S};
+consume(N, S, Acc) ->
+    case next(S) of
+        [X | SRest] ->
+            consume(N - 1, SRest, [X | Acc]);
+        [] ->
+            lists:reverse(Acc)
+    end.
+
+%%
+
+-type select_result(Record, Cont) ::
+    {[Record], Cont}
+    | {[Record], '$end_of_table'}
+    | '$end_of_table'.
+
+%% @doc Make a stream out of an ETS table, where the ETS table is scanned through in chunks,
+%% with the given continuation function. The function is assumed to return a result of a call to:
+%% * `ets:select/1` / `ets:select/3`
+%% * `ets:match/1` / `ets:match/3`
+%% * `ets:match_object/1` / `ets:match_object/3`
+-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
+ets(ContF) ->
+    ets(undefined, ContF).
+
+ets(Cont, ContF) ->
+    fun() ->
+        case ContF(Cont) of
+            {Records, '$end_of_table'} ->
+                next(list(Records));
+            {Records, NCont} ->
+                next(chain(list(Records), ets(NCont, ContF)));
+            '$end_of_table' ->
+                []
+        end
+    end.
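The module above implements streams as thunks that return an improper list of head and tail. As a quick, hypothetical usage sketch (nothing here is part of the diff itself; the eunit suite in the next hunk covers the same ground): elements are produced only when the stream is forced.

    S = emqx_utils_stream:map(
        fun(X) -> X * 2 end,
        emqx_utils_stream:chain(
            emqx_utils_stream:list([1, 2]),
            emqx_utils_stream:list([3])
        )
    ),
    %% Take the first two elements and keep the rest of the stream:
    {[2, 4], SRest} = emqx_utils_stream:consume(2, S),
    %% Drain what remains; an exhausted stream yields a plain list:
    [6] = emqx_utils_stream:consume(SRest).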
@@ -0,0 +1,75 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_utils_stream_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+empty_test() ->
+    S = emqx_utils_stream:empty(),
+    ?assertEqual([], emqx_utils_stream:next(S)).
+
+empty_consume_test() ->
+    S = emqx_utils_stream:empty(),
+    ?assertEqual([], emqx_utils_stream:consume(S)).
+
+chain_empties_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:empty(),
+        emqx_utils_stream:empty()
+    ),
+    ?assertEqual([], emqx_utils_stream:next(S)).
+
+chain_list_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6])
+    ),
+    ?assertEqual(
+        [1, 2, 3, 4, 5, 6],
+        emqx_utils_stream:consume(S)
+    ).
+
+chain_take_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7, 8])
+    ),
+    ?assertMatch(
+        {[1, 2, 3, 4, 5], _SRest},
+        emqx_utils_stream:consume(5, S)
+    ),
+    {_, SRest} = emqx_utils_stream:consume(5, S),
+    ?assertEqual(
+        [6, 7, 8],
+        emqx_utils_stream:consume(5, SRest)
+    ).
+
+chain_list_map_test() ->
+    S = emqx_utils_stream:map(
+        fun integer_to_list/1,
+        emqx_utils_stream:chain(
+            emqx_utils_stream:list([1, 2, 3]),
+            emqx_utils_stream:chain(
+                emqx_utils_stream:empty(),
+                emqx_utils_stream:list([4, 5, 6])
+            )
+        )
+    ),
+    ?assertEqual(
+        ["1", "2", "3", "4", "5", "6"],
+        emqx_utils_stream:consume(S)
+    ).
@@ -0,0 +1,5 @@
+Switch to the new `v2` routing store schema by default. The new schema improves both subscription and routing performance, especially in scenarios with concurrent subscriptions to topic filters sharing common wildcard prefixes, at the cost of slightly increased memory usage. It also eliminates the need for a separate index, so the routing-state inconsistencies rarely encountered in previous versions should no longer be possible.
+
+If a cluster is rolling-upgraded from an older version, it will continue to use the `v1` store until a full (non-rolling) cluster restart happens.
+
+The former schema can still be forced by setting the `broker.routing.storage_schema` configuration option to `v1`, likewise followed by a full non-rolling cluster restart.
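Pinning the former schema thus stays possible; a hypothetical override in the node's configuration (assuming the usual `etc/emqx.conf` location), which per the note above only takes effect after a full non-rolling cluster restart:

    # etc/emqx.conf
    broker.routing.storage_schema = v1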
@@ -1530,7 +1530,7 @@ sys_event_messages.desc:

 broker_routing_storage_schema.desc:
 """Routing storage schema.
-Set <code>v1</code> to leave the default.
+Set <code>v1</code> to use the former schema.
 <code>v2</code> is introduced in 5.2. It enables routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
 NOTE: Schema <code>v2</code> is still experimental.
 NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""