diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl
index 4851a20ea..7dd3d08be 100644
--- a/apps/emqx/src/emqx_broker.erl
+++ b/apps/emqx/src/emqx_broker.erl
@@ -604,21 +604,35 @@ do_dispatch({shard, I}, Topic, Msg) ->
%%
maybe_add_route(_Existed = false, Topic, ReplyTo) ->
- add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo);
+ sync_route(add, Topic, #{reply => ReplyTo});
maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
ok.
-add_route(_BatchSync = true, Topic, ReplyTo) ->
- emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo});
-add_route(_BatchSync = false, Topic, _ReplyTo) ->
- emqx_router:do_add_route(Topic, node()).
-
maybe_delete_route(_Exists = false, Topic) ->
- delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic);
+ sync_route(delete, Topic, #{});
maybe_delete_route(_Exists = true, _Topic) ->
ok.
-delete_route(_BatchSync = true, Topic) ->
- emqx_router_syncer:push(delete, Topic, node(), #{});
-delete_route(_BatchSync = false, Topic) ->
+sync_route(Action, Topic, ReplyTo) ->
+ EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
+ case EnabledOn of
+ all ->
+ push_sync_route(Action, Topic, ReplyTo);
+ none ->
+ regular_sync_route(Action, Topic);
+ Role ->
+ case mria_config:whoami() of
+ Role ->
+ push_sync_route(Action, Topic, ReplyTo);
+ _Disabled ->
+ regular_sync_route(Action, Topic)
+ end
+ end.
+
+push_sync_route(Action, Topic, Opts) ->
+ emqx_router_syncer:push(Action, Topic, node(), Opts).
+
+regular_sync_route(add, Topic) ->
+ emqx_router:do_add_route(Topic, node());
+regular_sync_route(delete, Topic) ->
emqx_router:do_delete_route(Topic, node()).
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 14eef30d3..1dd0a55ed 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1413,12 +1413,14 @@ fields("broker_routing") ->
];
fields("broker_routing_batch_sync") ->
[
- {"enable",
+ {"enable_on",
sc(
- boolean(),
+ hoconsc:enum([none, core, replicant, all]),
#{
- default => false,
- desc => ?DESC(broker_routing_batch_sync_enabled)
+ %% TODO
+ %% Make `replicant` the default value after initial release.
+ default => none,
+ desc => ?DESC(broker_routing_batch_sync_enable_on)
}
)}
];
diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl
index 9753e1d9b..70b4eaf51 100644
--- a/apps/emqx/test/emqx_routing_SUITE.erl
+++ b/apps/emqx/test/emqx_routing_SUITE.erl
@@ -36,6 +36,7 @@ all() ->
groups() ->
GroupVsn = [
{group, batch_sync_on},
+ {group, batch_sync_replicants},
{group, batch_sync_off}
],
GroupBase = [
@@ -50,6 +51,7 @@ groups() ->
{routing_schema_v1, [], GroupVsn},
{routing_schema_v2, [], GroupVsn},
{batch_sync_on, [], GroupBase},
+ {batch_sync_replicants, [], GroupBase},
{batch_sync_off, [], GroupBase},
{cluster, [], ClusterTCs}
].
@@ -59,9 +61,11 @@ init_per_group(routing_schema_v1, Config) ->
init_per_group(routing_schema_v2, Config) ->
[{emqx_config, "broker.routing.storage_schema = v2"} | Config];
init_per_group(batch_sync_on, Config) ->
- [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config];
+ [{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
+init_per_group(batch_sync_replicants, Config) ->
+ [{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
init_per_group(batch_sync_off, Config) ->
- [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config];
+ [{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
init_per_group(cluster, Config) ->
WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index bd8909c3d..fe315b5d7 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1541,9 +1541,14 @@ Set v1
to use the former schema.
NOTE: Schema v2
is still experimental.
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
-broker_routing_batch_sync_enable.desc:
+broker_routing_batch_sync_enable_on.desc:
"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
-Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded."""
+Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.
+The selected value determines which nodes in the cluster will have this feature enabled.
+- all
: enables it unconditionally on each node,
+- replicant
: enables it only on replicants (e.g. those where node.role = replicant
),
+- core
: enables it only on core nodes,
+- none
: disables this altogether."""
broker_perf_trie_compaction.desc:
"""Enable trie path compaction.