From 2dffd44985e2290a391e36225e973777938dfc1b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 20:08:40 +0100 Subject: [PATCH] feat(route-sync): allow to enable syncer pool only on cores/replicants So we would able to roll it out and test more gradually. --- apps/emqx/src/emqx_broker.erl | 34 +++++++++++++++++++-------- apps/emqx/src/emqx_schema.erl | 10 ++++---- apps/emqx/test/emqx_routing_SUITE.erl | 8 +++++-- rel/i18n/emqx_schema.hocon | 9 +++++-- 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4851a20ea..7dd3d08be 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -604,21 +604,35 @@ do_dispatch({shard, I}, Topic, Msg) -> %% maybe_add_route(_Existed = false, Topic, ReplyTo) -> - add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo); + sync_route(add, Topic, #{reply => ReplyTo}); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. -add_route(_BatchSync = true, Topic, ReplyTo) -> - emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); -add_route(_BatchSync = false, Topic, _ReplyTo) -> - emqx_router:do_add_route(Topic, node()). - maybe_delete_route(_Exists = false, Topic) -> - delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic); + sync_route(delete, Topic, #{}); maybe_delete_route(_Exists = true, _Topic) -> ok. -delete_route(_BatchSync = true, Topic) -> - emqx_router_syncer:push(delete, Topic, node(), #{}); -delete_route(_BatchSync = false, Topic) -> +sync_route(Action, Topic, ReplyTo) -> + EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), + case EnabledOn of + all -> + push_sync_route(Action, Topic, ReplyTo); + none -> + regular_sync_route(Action, Topic); + Role -> + case mria_config:whoami() of + Role -> + push_sync_route(Action, Topic, ReplyTo); + _Disabled -> + regular_sync_route(Action, Topic) + end + end. + +push_sync_route(Action, Topic, Opts) -> + emqx_router_syncer:push(Action, Topic, node(), Opts). + +regular_sync_route(add, Topic) -> + emqx_router:do_add_route(Topic, node()); +regular_sync_route(delete, Topic) -> emqx_router:do_delete_route(Topic, node()). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 14eef30d3..1dd0a55ed 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1413,12 +1413,14 @@ fields("broker_routing") -> ]; fields("broker_routing_batch_sync") -> [ - {"enable", + {"enable_on", sc( - boolean(), + hoconsc:enum([none, core, replicant, all]), #{ - default => false, - desc => ?DESC(broker_routing_batch_sync_enabled) + %% TODO + %% Make `replicant` the default value after initial release. + default => none, + desc => ?DESC(broker_routing_batch_sync_enable_on) } )} ]; diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 9753e1d9b..70b4eaf51 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -36,6 +36,7 @@ all() -> groups() -> GroupVsn = [ {group, batch_sync_on}, + {group, batch_sync_replicants}, {group, batch_sync_off} ], GroupBase = [ @@ -50,6 +51,7 @@ groups() -> {routing_schema_v1, [], GroupVsn}, {routing_schema_v2, [], GroupVsn}, {batch_sync_on, [], GroupBase}, + {batch_sync_replicants, [], GroupBase}, {batch_sync_off, [], GroupBase}, {cluster, [], ClusterTCs} ]. @@ -59,9 +61,11 @@ init_per_group(routing_schema_v1, Config) -> init_per_group(routing_schema_v2, Config) -> [{emqx_config, "broker.routing.storage_schema = v2"} | Config]; init_per_group(batch_sync_on, Config) -> - [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config]; + [{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config]; +init_per_group(batch_sync_replicants, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config]; init_per_group(batch_sync_off, Config) -> - [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config]; + [{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config]; init_per_group(cluster, Config) -> WorkDir = emqx_cth_suite:work_dir(Config), NodeSpecs = [ diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index bd8909c3d..fe315b5d7 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1541,9 +1541,14 @@ Set v1 to use the former schema. NOTE: Schema v2 is still experimental. NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" -broker_routing_batch_sync_enable.desc: +broker_routing_batch_sync_enable_on.desc: """Use separate process pool to synchronize subscriptions with the global routing table in a batched manner. -Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.""" +Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded. +The selected value determines which nodes in the cluster will have this feature enabled. +- all: enables it unconditionally on each node, +- replicant: enables it only on replicants (e.g. those where node.role = replicant), +- core: enables it only on core nodes, +- none: disables this altogether.""" broker_perf_trie_compaction.desc: """Enable trie path compaction.