diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl
index 856a3f7ae..ac9116cbd 100644
--- a/apps/emqx/src/emqx_broker.erl
+++ b/apps/emqx/src/emqx_broker.erl
@@ -167,7 +167,13 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
%% `unsubscribe` codepath. So we have to pick a worker according to the topic,
%% but not shard. If there are topics with high number of shards, then the
%% load across the pool will be unbalanced.
- call(pick(Topic), {subscribe, Topic, SubPid, I});
+ Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}),
+ case Sync of
+ ok ->
+ ok;
+ Ref when is_reference(Ref) ->
+ emqx_router_syncer:wait(Ref)
+ end;
do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
is_binary(RealTopic)
->
@@ -491,8 +497,8 @@ safe_update_stats(Tab, Stat, MaxStat) ->
call(Broker, Req) ->
gen_server:call(Broker, Req, infinity).
-cast(Broker, Msg) ->
- gen_server:cast(Broker, Msg).
+cast(Broker, Req) ->
+ gen_server:cast(Broker, Req).
%% Pick a broker
pick(Topic) ->
@@ -506,18 +512,18 @@ init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
-handle_call({subscribe, Topic, SubPid, 0}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
- Result = maybe_add_route(Existed, Topic),
+ Result = maybe_add_route(Existed, Topic, From),
{reply, Result, State};
-handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, [
{Topic, {shard, I}},
{{shard, Topic, I}, SubPid}
]),
- Result = maybe_add_route(Existed, Topic),
+ Result = maybe_add_route(Existed, Topic, From),
{reply, Result, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -597,12 +603,36 @@ do_dispatch({shard, I}, Topic, Msg) ->
%%
-maybe_add_route(_Existed = false, Topic) ->
- emqx_router:do_add_route(Topic);
-maybe_add_route(_Existed = true, _Topic) ->
+maybe_add_route(_Existed = false, Topic, ReplyTo) ->
+ sync_route(add, Topic, #{reply => ReplyTo});
+maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
ok.
maybe_delete_route(_Exists = false, Topic) ->
- emqx_router:do_delete_route(Topic);
+ sync_route(delete, Topic, #{});
maybe_delete_route(_Exists = true, _Topic) ->
ok.
+
+sync_route(Action, Topic, ReplyTo) ->
+ EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
+ case EnabledOn of
+ all ->
+ push_sync_route(Action, Topic, ReplyTo);
+ none ->
+ regular_sync_route(Action, Topic);
+ Role ->
+ case Role =:= mria_config:whoami() of
+ true ->
+ push_sync_route(Action, Topic, ReplyTo);
+ false ->
+ regular_sync_route(Action, Topic)
+ end
+ end.
+
+push_sync_route(Action, Topic, Opts) ->
+ emqx_router_syncer:push(Action, Topic, node(), Opts).
+
+regular_sync_route(add, Topic) ->
+ emqx_router:do_add_route(Topic, node());
+regular_sync_route(delete, Topic) ->
+ emqx_router:do_delete_route(Topic, node()).
diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl
index 74baf5674..aee8dff5d 100644
--- a/apps/emqx/src/emqx_broker_sup.erl
+++ b/apps/emqx/src/emqx_broker_sup.erl
@@ -32,13 +32,20 @@ start_link() ->
init([]) ->
%% Broker pool
PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2),
- BrokerPool = emqx_pool_sup:spec([
+ BrokerPool = emqx_pool_sup:spec(broker_pool_sup, [
broker_pool,
hash,
PoolSize,
{emqx_broker, start_link, []}
]),
+ SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [
+ router_syncer_pool,
+ hash,
+ PoolSize,
+ {emqx_router_syncer, start_link, []}
+ ]),
+
%% Shared subscription
SharedSub = #{
id => shared_sub,
@@ -59,4 +66,4 @@ init([]) ->
modules => [emqx_broker_helper]
},
- {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}.
+ {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper]}}.
diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl
index 13efbe4ea..e7ab37ace 100644
--- a/apps/emqx/src/emqx_router.erl
+++ b/apps/emqx/src/emqx_router.erl
@@ -45,6 +45,13 @@
do_delete_route/2
]).
+%% Mria Activity RPC targets
+-export([
+ mria_batch_run/2
+]).
+
+-export([do_batch/1]).
+
-export([cleanup_routes/1]).
-export([
@@ -86,10 +93,15 @@
deinit_schema/0
]).
--type group() :: binary().
+-export_type([dest/0]).
+-type group() :: binary().
-type dest() :: node() | {group(), node()}.
+%% Operation :: {add, ...} | {delete, ...}.
+-type batch() :: #{batch_route() => _Operation :: tuple()}.
+-type batch_route() :: {emqx_types:topic(), dest()}.
+
-record(routeidx, {
entry :: '$1' | emqx_topic_index:key(dest()),
unused = [] :: nil()
@@ -173,12 +185,12 @@ do_add_route(Topic) when is_binary(Topic) ->
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_add_route(Topic, Dest) when is_binary(Topic) ->
ok = emqx_router_helper:monitor(Dest),
- mria_insert_route(get_schema_vsn(), Topic, Dest).
+ mria_insert_route(get_schema_vsn(), Topic, Dest, single).
-mria_insert_route(v2, Topic, Dest) ->
- mria_insert_route_v2(Topic, Dest);
-mria_insert_route(v1, Topic, Dest) ->
- mria_insert_route_v1(Topic, Dest).
+mria_insert_route(v2, Topic, Dest, Ctx) ->
+ mria_insert_route_v2(Topic, Dest, Ctx);
+mria_insert_route(v1, Topic, Dest, Ctx) ->
+ mria_insert_route_v1(Topic, Dest, Ctx).
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
%% filters associated with route destination.
@@ -225,12 +237,35 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) ->
- mria_delete_route(get_schema_vsn(), Topic, Dest).
+ mria_delete_route(get_schema_vsn(), Topic, Dest, single).
-mria_delete_route(v2, Topic, Dest) ->
- mria_delete_route_v2(Topic, Dest);
-mria_delete_route(v1, Topic, Dest) ->
- mria_delete_route_v1(Topic, Dest).
+mria_delete_route(v2, Topic, Dest, Ctx) ->
+ mria_delete_route_v2(Topic, Dest, Ctx);
+mria_delete_route(v1, Topic, Dest, Ctx) ->
+ mria_delete_route_v1(Topic, Dest, Ctx).
+
+-spec do_batch(batch()) -> #{batch_route() => _Error}.
+do_batch(Batch) ->
+ mria_batch(get_schema_vsn(), Batch).
+
+mria_batch(v2, Batch) ->
+ mria_batch_v2(Batch);
+mria_batch(v1, Batch) ->
+ mria_batch_v1(Batch).
+
+mria_batch_v2(Batch) ->
+ mria:async_dirty(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v2, Batch]).
+
+mria_batch_v1(Batch) ->
+ case mria:transaction(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v1, Batch]) of
+ {atomic, Result} ->
+ Result;
+ Error ->
+ Error
+ end.
+
+batch_get_action(Op) ->
+ element(1, Op).
-spec select(Spec, _Limit :: pos_integer(), Continuation) ->
{[emqx_types:route()], Continuation} | '$end_of_table'
@@ -301,47 +336,79 @@ call(Router, Msg) ->
pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic).
+%%--------------------------------------------------------------------
+%% Route batch RPC targets
+%%--------------------------------------------------------------------
+
+-spec mria_batch_run(schemavsn(), batch()) -> #{batch_route() => _Error}.
+mria_batch_run(SchemaVsn, Batch) ->
+ maps:fold(
+ fun({Topic, Dest}, Op, Errors) ->
+ case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of
+ ok ->
+ Errors;
+ Error ->
+ Errors#{{Topic, Dest} => Error}
+ end
+ end,
+ #{},
+ Batch
+ ).
+
+mria_batch_operation(SchemaVsn, add, Topic, Dest) ->
+ mria_insert_route(SchemaVsn, Topic, Dest, batch);
+mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
+ mria_delete_route(SchemaVsn, Topic, Dest, batch).
+
%%--------------------------------------------------------------------
%% Schema v1
%% --------------------------------------------------------------------
-mria_insert_route_v1(Topic, Dest) ->
+mria_insert_route_v1(Topic, Dest, Ctx) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
- mria_route_tab_insert_update_trie(Route);
+ mria_route_tab_insert_update_trie(Route, Ctx);
false ->
- mria_route_tab_insert(Route)
+ mria_route_tab_insert(Route, Ctx)
end.
-mria_route_tab_insert_update_trie(Route) ->
+mria_route_tab_insert_update_trie(Route, single) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:insert_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
- ).
+ );
+mria_route_tab_insert_update_trie(Route, batch) ->
+ emqx_router_utils:insert_trie_route(?ROUTE_TAB, Route).
-mria_route_tab_insert(Route) ->
- mria:dirty_write(?ROUTE_TAB, Route).
+mria_route_tab_insert(Route, single) ->
+ mria:dirty_write(?ROUTE_TAB, Route);
+mria_route_tab_insert(Route, batch) ->
+ mnesia:write(?ROUTE_TAB, Route, write).
-mria_delete_route_v1(Topic, Dest) ->
+mria_delete_route_v1(Topic, Dest, Ctx) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
- mria_route_tab_delete_update_trie(Route);
+ mria_route_tab_delete_update_trie(Route, Ctx);
false ->
- mria_route_tab_delete(Route)
+ mria_route_tab_delete(Route, Ctx)
end.
-mria_route_tab_delete_update_trie(Route) ->
+mria_route_tab_delete_update_trie(Route, single) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:delete_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
- ).
+ );
+mria_route_tab_delete_update_trie(Route, batch) ->
+ emqx_router_utils:delete_trie_route(?ROUTE_TAB, Route).
-mria_route_tab_delete(Route) ->
- mria:dirty_delete_object(?ROUTE_TAB, Route).
+mria_route_tab_delete(Route, single) ->
+ mria:dirty_delete_object(?ROUTE_TAB, Route);
+mria_route_tab_delete(Route, batch) ->
+ mnesia:delete_object(?ROUTE_TAB, Route, write).
match_routes_v1(Topic) ->
lookup_route_tab(Topic) ++
@@ -410,24 +477,34 @@ fold_routes_v1(FunName, FoldFun, AccIn) ->
%% topics. Writes go to only one of the two tables at a time.
%% --------------------------------------------------------------------
-mria_insert_route_v2(Topic, Dest) ->
+mria_insert_route_v2(Topic, Dest, Ctx) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
K = emqx_topic_index:make_key(Words, Dest),
- mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+ mria_filter_tab_insert(K, Ctx);
false ->
- mria_route_tab_insert(#route{topic = Topic, dest = Dest})
+ mria_route_tab_insert(#route{topic = Topic, dest = Dest}, Ctx)
end.
-mria_delete_route_v2(Topic, Dest) ->
+mria_filter_tab_insert(K, single) ->
+ mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+mria_filter_tab_insert(K, batch) ->
+ mnesia:write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}, write).
+
+mria_delete_route_v2(Topic, Dest, Ctx) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
K = emqx_topic_index:make_key(Words, Dest),
- mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+ mria_filter_tab_delete(K, Ctx);
false ->
- mria_route_tab_delete(#route{topic = Topic, dest = Dest})
+ mria_route_tab_delete(#route{topic = Topic, dest = Dest}, Ctx)
end.
+mria_filter_tab_delete(K, single) ->
+ mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+mria_filter_tab_delete(K, batch) ->
+ mnesia:delete(?ROUTE_TAB_FILTERS, K, write).
+
match_routes_v2(Topic) ->
lookup_route_tab(Topic) ++
[match_to_route(M) || M <- match_filters(Topic)].
diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl
new file mode 100644
index 000000000..dccc681c8
--- /dev/null
+++ b/apps/emqx/src/emqx_router_syncer.erl
@@ -0,0 +1,410 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_router_syncer).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/2]).
+
+-export([push/4]).
+-export([wait/1]).
+
+-export([stats/0]).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2
+]).
+
+-type action() :: add | delete.
+
+-define(POOL, router_syncer_pool).
+
+-define(MAX_BATCH_SIZE, 1000).
+
+%% How long to idle (ms) after receiving a new operation before triggering batch sync?
+%% Zero effectively just schedules out the process, so that it has a chance to receive
+%% more operations, and introduce no minimum delay.
+-define(MIN_SYNC_INTERVAL, 0).
+
+%% How long (ms) to idle after observing a batch sync error?
+%% Should help to avoid excessive retries in situations when errors are caused by
+%% conditions that take some time to resolve (e.g. restarting an upstream core node).
+-define(ERROR_DELAY, 10).
+
+%% How soon (ms) to retry last failed batch sync attempt?
+%% Only matter in absence of new operations, otherwise batch sync is triggered as
+%% soon as `?ERROR_DELAY` is over.
+-define(ERROR_RETRY_INTERVAL, 500).
+
+-define(PRIO_HI, 1).
+-define(PRIO_LO, 2).
+-define(PRIO_BG, 3).
+
+-define(PUSH(PRIO, OP), {PRIO, OP}).
+-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).
+
+-define(ROUTEOP(ACT), {ACT, _, _}).
+-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}).
+-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}).
+
+-ifdef(TEST).
+-undef(MAX_BATCH_SIZE).
+-undef(MIN_SYNC_INTERVAL).
+-define(MAX_BATCH_SIZE, 40).
+-define(MIN_SYNC_INTERVAL, 10).
+-endif.
+
+%%
+
+-spec start_link(atom(), pos_integer()) ->
+ {ok, pid()}.
+start_link(Pool, Id) ->
+ gen_server:start_link(
+ {local, emqx_utils:proc_name(?MODULE, Id)},
+ ?MODULE,
+ [Pool, Id],
+ []
+ ).
+
+-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) ->
+ ok | _WaitRef :: reference()
+when
+ Opts :: #{reply => pid()}.
+push(Action, Topic, Dest, Opts) ->
+ Worker = gproc_pool:pick_worker(?POOL, Topic),
+ Prio = designate_prio(Action, Opts),
+ Context = mk_push_context(Opts),
+ _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})),
+ case Context of
+ {MRef, _} ->
+ MRef;
+ [] ->
+ ok
+ end.
+
+-spec wait(_WaitRef :: reference()) ->
+ ok | {error, _Reason}.
+wait(MRef) ->
+ %% NOTE
+ %% No timeouts here because (as in `emqx_broker:call/2` case) callers do not
+ %% really expect this to fail with timeout exception. However, waiting
+ %% indefinitely is not the best option since it blocks the caller from receiving
+ %% other messages, so for instance channel (connection) process may not be able
+ %% to react to socket close event in time. Better option would probably be to
+ %% introduce cancellable operation, which will be able to check if the caller
+ %% would still be interested in the result.
+ receive
+ {MRef, Result} ->
+ Result
+ end.
+
+designate_prio(_, #{reply := _To}) ->
+ ?PRIO_HI;
+designate_prio(add, #{}) ->
+ ?PRIO_LO;
+designate_prio(delete, #{}) ->
+ ?PRIO_BG.
+
+mk_push_context(#{reply := To}) ->
+ MRef = erlang:make_ref(),
+ {MRef, To};
+mk_push_context(_) ->
+ [].
+
+%%
+
+-type stats() :: #{
+ size := non_neg_integer(),
+ n_add := non_neg_integer(),
+ n_delete := non_neg_integer(),
+ prio_highest := non_neg_integer() | undefined,
+ prio_lowest := non_neg_integer() | undefined
+}.
+
+-spec stats() -> [stats()].
+stats() ->
+ Workers = gproc_pool:active_workers(?POOL),
+ [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers].
+
+%%
+
+init([Pool, Id]) ->
+ true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+ {ok, #{stash => stash_new()}}.
+
+handle_call(stats, _From, State = #{stash := Stash}) ->
+ {reply, stash_stats(Stash), State};
+handle_call(_Call, _From, State) ->
+ {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({timeout, _TRef, retry}, State) ->
+ NState = run_batch_loop([], maps:remove(retry_timer, State)),
+ {noreply, NState};
+handle_info(Push = ?PUSH(_, _), State) ->
+ %% NOTE: Wait a bit to collect potentially overlapping operations.
+ ok = timer:sleep(?MIN_SYNC_INTERVAL),
+ NState = run_batch_loop([Push], State),
+ {noreply, NState}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+%%
+
+run_batch_loop(Incoming, State = #{stash := Stash0}) ->
+ Stash1 = stash_add(Incoming, Stash0),
+ Stash2 = stash_drain(Stash1),
+ {Batch, Stash3} = mk_batch(Stash2),
+ ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)),
+ case run_batch(Batch) of
+ Status = #{} ->
+ ok = send_replies(Status, Batch),
+ NState = cancel_retry_timer(State#{stash := Stash3}),
+ %% NOTE
+ %% We could postpone batches where only `?PRIO_BG` operations left, which
+ %% would allow to do less work in situations when there are intermittently
+ %% reconnecting clients with moderately unique subscriptions. However, this
+ %% would also require us to forego the idempotency of batch syncs (see
+ %% `merge_route_op/2`).
+ case is_stash_empty(Stash3) of
+ true ->
+ NState;
+ false ->
+ run_batch_loop([], NState)
+ end;
+ BatchError ->
+ ?SLOG(warning, #{
+ msg => "router_batch_sync_failed",
+ reason => BatchError,
+ batch => batch_stats(Batch, Stash3)
+ }),
+ NState = State#{stash := Stash2},
+ ok = timer:sleep(?ERROR_DELAY),
+ ensure_retry_timer(NState)
+ end.
+
+ensure_retry_timer(State = #{retry_timer := _TRef}) ->
+ State;
+ensure_retry_timer(State) ->
+ TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry),
+ State#{retry_timer => TRef}.
+
+cancel_retry_timer(State = #{retry_timer := TRef}) ->
+ ok = emqx_utils:cancel_timer(TRef),
+ maps:remove(retry_timer, State);
+cancel_retry_timer(State) ->
+ State.
+
+%%
+
+mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
+ %% This is perfect situation, we just use stash as batch w/o extra reallocations.
+ {Stash, stash_new()};
+mk_batch(Stash) ->
+ %% Take a subset of stashed operations to form a batch.
+ %% Note that stash is an unordered map, it's not a queue. The order of operations is
+ %% not preserved strictly, only loosely, because of how we start from high priority
+ %% operations and go down to low priority ones. This might cause some operations to
+ %% stay in stash for unfairly long time, when there are many high priority operations.
+ %% However, it's unclear how likely this is to happen in practice.
+ mk_batch(Stash, ?MAX_BATCH_SIZE).
+
+mk_batch(Stash, BatchSize) ->
+ mk_batch(?PRIO_HI, #{}, BatchSize, Stash).
+
+mk_batch(Prio, Batch, SizeLeft, Stash) ->
+ mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)).
+
+mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 ->
+ %% Iterating over stash, only taking operations with priority equal to `Prio`.
+ case maps:next(It) of
+ {Route, Op = ?ROUTEOP(_Action, Prio), NIt} ->
+ NBatch = Batch#{Route => Op},
+ NStash = maps:remove(Route, Stash),
+ mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt);
+ {_Route, _Op, NIt} ->
+ %% This is lower priority operation, skip it.
+ mk_batch(Prio, Batch, SizeLeft, Stash, NIt);
+ none ->
+ %% No more operations with priority `Prio`, go to the next priority level.
+ true = Prio < ?PRIO_BG,
+ mk_batch(Prio + 1, Batch, SizeLeft, Stash)
+ end;
+mk_batch(_Prio, Batch, _, Stash, _It) ->
+ {Batch, Stash}.
+
+send_replies(Errors, Batch) ->
+ maps:foreach(
+ fun(Route, {_Action, _Prio, Ctx}) ->
+ case Ctx of
+ [] ->
+ ok;
+ _ ->
+ replyctx_send(maps:get(Route, Errors, ok), Ctx)
+ end
+ end,
+ Batch
+ ).
+
+replyctx_send(_Result, []) ->
+ noreply;
+replyctx_send(Result, {MRef, Pid}) ->
+ _ = erlang:send(Pid, {MRef, Result}),
+ ok.
+
+%%
+
+run_batch(Batch) when map_size(Batch) > 0 ->
+ catch emqx_router:do_batch(Batch);
+run_batch(_Empty) ->
+ #{}.
+
+%%
+
+stash_new() ->
+ #{}.
+
+is_stash_empty(Stash) ->
+ maps:size(Stash) =:= 0.
+
+stash_drain(Stash) ->
+ receive
+ ?PUSH(Prio, Op) ->
+ stash_drain(stash_add(Prio, Op, Stash))
+ after 0 ->
+ Stash
+ end.
+
+stash_add(Pushes, Stash) ->
+ lists:foldl(
+ fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end,
+ Stash,
+ Pushes
+ ).
+
+stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
+ Route = {Topic, Dest},
+ case maps:get(Route, Stash, undefined) of
+ undefined ->
+ Stash#{Route => {Action, Prio, Ctx}};
+ RouteOp ->
+ RouteOpMerged = merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)),
+ Stash#{Route := RouteOpMerged}
+ end.
+
+merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
+ %% NOTE: This should not happen anyway.
+ _ = replyctx_send(ignored, Ctx1),
+ DestOp;
+merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
+ %% NOTE: Latter cancel the former.
+ %% Strictly speaking, in ideal conditions we could just cancel both, because they
+ %% essentially do not change the global state. However, we decided to stay on the
+ %% safe side and cancel only the former, making batch syncs idempotent.
+ _ = replyctx_send(ok, Ctx1),
+ DestOp.
+
+%%
+
+batch_stats(Batch, Stash) ->
+ BatchStats = stash_stats(Batch),
+ BatchStats#{
+ stashed => maps:size(Stash)
+ }.
+
+stash_stats(Stash) ->
+ #{
+ size => maps:size(Stash),
+ n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Stash)),
+ n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Stash)),
+ prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Stash),
+ prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Stash)
+ }.
+
+%%
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+batch_test() ->
+ Dest = node(),
+ Ctx = fun(N) -> {N, self()} end,
+ Stash = stash_add(
+ [
+ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))),
+ ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))),
+ ?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))),
+ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))),
+ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))),
+ ?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))),
+ ?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))),
+ ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))),
+ ?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))),
+ ?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16)))
+ ],
+ stash_new()
+ ),
+ {Batch, StashLeft} = mk_batch(Stash, 5),
+ ?assertMatch(
+ #{
+ {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
+ {<<"t/3">>, Dest} := {add, ?PRIO_HI, _},
+ {<<"t/2">>, Dest} := {delete, ?PRIO_LO, _},
+ {<<"t/4">>, Dest} := {add, ?PRIO_HI, _},
+ {<<"old/3">>, Dest} := {delete, ?PRIO_LO, _}
+ },
+ Batch
+ ),
+ ?assertMatch(
+ #{
+ {<<"old/1">>, Dest} := {delete, ?PRIO_BG, _},
+ {<<"old/2">>, Dest} := {delete, ?PRIO_BG, _}
+ },
+ StashLeft
+ ),
+ ?assertEqual(
+ [
+ {2, ignored},
+ {1, ok},
+ {5, ok},
+ {7, ignored},
+ {4, ok},
+ {9, ok},
+ {8, ok},
+ {13, ignored},
+ {11, ok}
+ ],
+ emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
+ ).
+
+-endif.
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index e972c57e0..1dd0a55ed 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1404,6 +1404,24 @@ fields("broker_routing") ->
'readOnly' => true,
desc => ?DESC(broker_routing_storage_schema)
}
+ )},
+ {"batch_sync",
+ sc(
+ ref("broker_routing_batch_sync"),
+ #{importance => ?IMPORTANCE_HIDDEN}
+ )}
+ ];
+fields("broker_routing_batch_sync") ->
+ [
+ {"enable_on",
+ sc(
+ hoconsc:enum([none, core, replicant, all]),
+ #{
+ %% TODO
+ %% Make `replicant` the default value after initial release.
+ default => none,
+ desc => ?DESC(broker_routing_batch_sync_enable_on)
+ }
)}
];
fields("shared_subscription_group") ->
diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl
index 042ef91db..fbb9da595 100644
--- a/apps/emqx/test/emqx_cth_suite.erl
+++ b/apps/emqx/test/emqx_cth_suite.erl
@@ -72,6 +72,7 @@
-export([stop_apps/1]).
-export([merge_appspec/2]).
+-export([merge_config/2]).
%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
-export([schema_module/0, upgrade_raw_conf/1]).
diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl
index 10ca866fe..96ca0b297 100644
--- a/apps/emqx/test/emqx_routing_SUITE.erl
+++ b/apps/emqx/test/emqx_routing_SUITE.erl
@@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_router.hrl").
@@ -33,27 +34,80 @@ all() ->
].
groups() ->
- TCs = [
+ GroupVsn = [
+ {group, batch_sync_on},
+ {group, batch_sync_replicants},
+ {group, batch_sync_off}
+ ],
+ ClusterTCs = [
t_cluster_routing,
t_slow_rlog_routing_consistency
],
+ SingleTCs = [t_concurrent_routing_updates],
+ BatchSyncTCs = lists:duplicate(5, t_concurrent_routing_updates_with_errors),
[
- {routing_schema_v1, [], TCs},
- {routing_schema_v2, [], TCs}
+ {routing_schema_v1, [], GroupVsn},
+ {routing_schema_v2, [], GroupVsn},
+ {batch_sync_on, [], [{group, cluster}, {group, single_batch_on}]},
+ {batch_sync_replicants, [], [{group, cluster}, {group, single}]},
+ {batch_sync_off, [], [{group, cluster}, {group, single}]},
+ {cluster, [], ClusterTCs},
+ {single_batch_on, [], SingleTCs ++ BatchSyncTCs},
+ {single, [], SingleTCs}
].
-init_per_group(GroupName, Config) ->
- WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+init_per_group(routing_schema_v1, Config) ->
+ [{emqx_config, "broker.routing.storage_schema = v1"} | Config];
+init_per_group(routing_schema_v2, Config) ->
+ [{emqx_config, "broker.routing.storage_schema = v2"} | Config];
+init_per_group(batch_sync_on, Config) ->
+ [{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
+init_per_group(batch_sync_replicants, Config) ->
+ [{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
+init_per_group(batch_sync_off, Config) ->
+ [{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
+init_per_group(cluster, Config) ->
+ WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
- {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}},
- {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}},
- {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}}
+ {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
+ {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
+ {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
],
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
- [{cluster, Nodes} | Config].
+ [{cluster, Nodes} | Config];
+init_per_group(GroupName, Config) when
+ GroupName =:= single_batch_on;
+ GroupName =:= single
+->
+ WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+ Apps = emqx_cth_suite:start(
+ [
+ {emqx, #{
+ config => mk_config_broker(Config),
+ %% NOTE
+ %% Artificially increasing pool workers contention by forcing small pool size.
+ before_start => fun() ->
+ % NOTE
+ % This one is actually defined on `emqx_conf_schema` level, but used
+ % in `emqx_broker`. Thus we have to resort to this ugly hack.
+ emqx_config:force_put([node, broker_pool_size], 2),
+ emqx_app:set_config_loader(?MODULE)
+ end
+ }}
+ ],
+ #{work_dir => WorkDir}
+ ),
+ [{group_apps, Apps} | Config].
-end_per_group(_GroupName, Config) ->
- emqx_cth_cluster:stop(?config(cluster, Config)).
+end_per_group(cluster, Config) ->
+ emqx_cth_cluster:stop(?config(cluster, Config));
+end_per_group(GroupName, Config) when
+ GroupName =:= single_batch_on;
+ GroupName =:= single
+->
+ emqx_cth_suite:stop(?config(group_apps, Config));
+end_per_group(_, _Config) ->
+ ok.
init_per_testcase(TC, Config) ->
emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
@@ -61,9 +115,9 @@ init_per_testcase(TC, Config) ->
end_per_testcase(TC, Config) ->
emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
-mk_emqx_appspec(GroupName, N) ->
+mk_emqx_appspec(N, Config) ->
{emqx, #{
- config => mk_config(GroupName, N),
+ config => mk_config(N, Config),
after_start => fun() ->
% NOTE
% This one is actually defined on `emqx_conf_schema` level, but used
@@ -77,24 +131,28 @@ mk_genrpc_appspec() ->
override_env => [{port_discovery, stateless}]
}}.
-mk_config(GroupName, N) ->
- #{
- broker => mk_config_broker(GroupName),
- listeners => mk_config_listeners(N)
- }.
+mk_config(N, ConfigOrVsn) ->
+ emqx_cth_suite:merge_config(
+ mk_config_broker(ConfigOrVsn),
+ mk_config_listeners(N)
+ ).
-mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 ->
- #{routing => #{storage_schema => v1}};
-mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 ->
- #{routing => #{storage_schema => v2}}.
+mk_config_broker(v1) ->
+ "broker.routing.storage_schema = v1";
+mk_config_broker(v2) ->
+ "broker.routing.storage_schema = v2";
+mk_config_broker(CTConfig) ->
+ string:join(proplists:get_all_values(emqx_config, CTConfig), "\n").
mk_config_listeners(N) ->
Port = 1883 + N,
#{
- tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
- ssl => #{default => #{enable => false}},
- ws => #{default => #{enable => false}},
- wss => #{default => #{enable => false}}
+ listeners => #{
+ tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
+ ssl => #{default => #{enable => false}},
+ ws => #{default => #{enable => false}},
+ wss => #{default => #{enable => false}}
+ }
}.
%%
@@ -182,6 +240,169 @@ unsubscribe(C, Topic) ->
%%
+-define(SUBSCRIBE_TOPICS, [
+ <<"t/#">>,
+ <<"t/fixed">>,
+ <<"t/1/+">>,
+ <<"t/2/+">>,
+ <<"t/42/+/+">>,
+ <<"client/${i}/+">>,
+ <<"client/${i}/fixed">>,
+ <<"client/${i}/#">>,
+ <<"rand/${r}/+">>,
+ <<"rand/${r}/fixed">>
+]).
+
+t_concurrent_routing_updates(init, Config) ->
+ ok = snabbkaffe:start_trace(),
+ Config;
+t_concurrent_routing_updates('end', _Config) ->
+ ok = snabbkaffe:stop().
+
+t_concurrent_routing_updates(_Config) ->
+ NClients = 400,
+ NRTopics = 250,
+ MCommands = 8,
+ Port = get_mqtt_tcp_port(node()),
+ Clients = [
+ spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+ || I <- lists:seq(1, NClients)
+ ],
+ ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+ ok = timer:sleep(200),
+ Subscribers = ets:tab2list(?SUBSCRIBER),
+ Topics = maps:keys(maps:from_list(Subscribers)),
+ ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+ ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+ ok = timer:sleep(1000),
+ ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+ ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+ ?assertEqual([], emqx_router:topics()).
+
+t_concurrent_routing_updates_with_errors(init, Config) ->
+ ok = snabbkaffe:start_trace(),
+ ok = meck:new(emqx_router, [passthrough, no_history]),
+ Config;
+t_concurrent_routing_updates_with_errors('end', _Config) ->
+ ok = meck:unload(emqx_router),
+ ok = snabbkaffe:stop().
+
+t_concurrent_routing_updates_with_errors(_Config) ->
+ NClients = 100,
+ NRTopics = 80,
+ MCommands = 6,
+ PSyncError = 0.1,
+ Port = get_mqtt_tcp_port(node()),
+ %% Crash the batch sync operation with some small probability.
+ ok = meck:expect(emqx_router, mria_batch_run, fun(Vsn, Batch) ->
+ case rand:uniform() < PSyncError of
+ false -> meck:passthrough([Vsn, Batch]);
+ true -> error(overload)
+ end
+ end),
+ Clients = [
+ spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+ || I <- lists:seq(1, NClients)
+ ],
+ ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+ 0 = ?retry(
+ _Interval = 500,
+ _NTimes = 10,
+ 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+ ),
+ Subscribers = ets:tab2list(?SUBSCRIBER),
+ Topics = maps:keys(maps:from_list(Subscribers)),
+ ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+ ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+ ok = timer:sleep(100),
+ 0 = ?retry(
+ 500,
+ 10,
+ 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+ ),
+ ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+ ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+ ?assertEqual([], emqx_router:topics()).
+
+run_concurrent_client(I, Port, MCommands, NRTopics) ->
+ % _ = rand:seed(default, I),
+ Ctx = #{
+ i => I,
+ r => rand:uniform(NRTopics)
+ },
+ {ok, C} = emqtt:start_link(#{port => Port, clientid => render("client:${i}", Ctx)}),
+ {ok, _Props} = emqtt:connect(C),
+ NCommands = rand:uniform(MCommands),
+ Commands = gen_concurrent_client_plan(NCommands, Ctx),
+ ok = subscribe_concurrent_client(C, Commands),
+ run_concurrent_client_loop(C).
+
+gen_concurrent_client_plan(N, Ctx) ->
+ lists:foldl(
+ fun(_, Acc) -> mixin(pick_random_command(Ctx), Acc) end,
+ [],
+ lists:seq(1, N)
+ ).
+
+subscribe_concurrent_client(C, Commands) ->
+ lists:foreach(
+ fun
+ ({subscribe, Topic}) ->
+ {ok, _Props, [0]} = emqtt:subscribe(C, Topic);
+ ({unsubscribe, Topic}) ->
+ {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic)
+ end,
+ Commands
+ ).
+
+pick_random_command(Ctx) ->
+ Topic = render(randpick(?SUBSCRIBE_TOPICS), Ctx),
+ randpick([
+ [{subscribe, Topic}],
+ [{subscribe, Topic}, {unsubscribe, Topic}]
+ ]).
+
+render(Template, Ctx) ->
+ iolist_to_binary(emqx_template:render_strict(emqx_template:parse(Template), Ctx)).
+
+run_concurrent_client_loop(C) ->
+ receive
+ {From, Ref, F} ->
+ Reply = F(C),
+ From ! {Ref, Reply},
+ run_concurrent_client_loop(C)
+ end.
+
+ping_concurrent_client(Pid) ->
+ Ref = make_ref(),
+ Pid ! {self(), Ref, fun emqtt:ping/1},
+ receive
+ {Ref, Reply} -> Reply
+ after 5000 ->
+ error(timeout)
+ end.
+
+stop_concurrent_client(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ true = erlang:unlink(Pid),
+ true = erlang:exit(Pid, shutdown),
+ receive
+ {'DOWN', MRef, process, Pid, Reason} -> Reason
+ end.
+
+randpick(List) ->
+ lists:nth(rand:uniform(length(List)), List).
+
+mixin(L = [H | T], Into = [HInto | TInto]) ->
+ case rand:uniform(length(Into) + 1) of
+ 1 -> [H | mixin(T, Into)];
+ _ -> [HInto | mixin(L, TInto)]
+ end;
+mixin(L, Into) ->
+ L ++ Into.
+
+%%
+
t_routing_schema_switch_v1(Config) ->
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
t_routing_schema_switch(_From = v2, _To = v1, WorkDir).
@@ -195,7 +416,7 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
[Node1] = emqx_cth_cluster:start(
[
{routing_schema_switch1, #{
- apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)]
+ apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, VTo)]
}}
],
#{work_dir => WorkDir}
@@ -208,12 +429,12 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
[Node2, Node3] = emqx_cth_cluster:start(
[
{routing_schema_switch2, #{
- apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)],
+ apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, VFrom)],
base_port => 20000,
join_to => Node1
}},
{routing_schema_switch3, #{
- apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)],
+ apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, VFrom)],
base_port => 20100,
join_to => Node1
}}
diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl
index 79ce5ce7b..21321400d 100644
--- a/apps/emqx_utils/src/emqx_utils_stream.erl
+++ b/apps/emqx_utils/src/emqx_utils_stream.erl
@@ -20,6 +20,7 @@
-export([
empty/0,
list/1,
+ mqueue/1,
map/2,
chain/2
]).
@@ -59,6 +60,18 @@ list([]) ->
list([X | Rest]) ->
fun() -> [X | list(Rest)] end.
+%% @doc Make a stream out of process message queue.
+-spec mqueue(timeout()) -> stream(any()).
+mqueue(Timeout) ->
+ fun() ->
+ receive
+ X ->
+ [X | mqueue(Timeout)]
+ after Timeout ->
+ []
+ end
+ end.
+
%% @doc Make a stream by applying a function to each element of the underlying stream.
-spec map(fun((X) -> Y), stream(X)) -> stream(Y).
map(F, S) ->
diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl
index 4a48ae45d..ef8185a94 100644
--- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl
+++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl
@@ -73,3 +73,12 @@ chain_list_map_test() ->
["1", "2", "3", "4", "5", "6"],
emqx_utils_stream:consume(S)
).
+
+mqueue_test() ->
+ _ = erlang:send_after(1, self(), 1),
+ _ = erlang:send_after(100, self(), 2),
+ _ = erlang:send_after(20, self(), 42),
+ ?assertEqual(
+ [1, 42, 2],
+ emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
+ ).
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index 84305317f..fe315b5d7 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1541,6 +1541,15 @@ Set v1
to use the former schema.
NOTE: Schema v2
is still experimental.
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
+broker_routing_batch_sync_enable_on.desc:
+"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
+Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.
+The selected value determines which nodes in the cluster will have this feature enabled.
+- all
: enables it unconditionally on each node,
+- replicant
: enables it only on replicants (e.g. those where node.role = replicant
),
+- core
: enables it only on core nodes,
+- none
: disables this altogether."""
+
broker_perf_trie_compaction.desc:
"""Enable trie path compaction.
Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.