feat(route-sync): allow to enable syncer pool only on cores/replicants
So we would able to roll it out and test more gradually.
This commit is contained in:
parent
884f784c1c
commit
2dffd44985
|
@ -604,21 +604,35 @@ do_dispatch({shard, I}, Topic, Msg) ->
|
||||||
%%
|
%%
|
||||||
|
|
||||||
maybe_add_route(_Existed = false, Topic, ReplyTo) ->
|
maybe_add_route(_Existed = false, Topic, ReplyTo) ->
|
||||||
add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo);
|
sync_route(add, Topic, #{reply => ReplyTo});
|
||||||
maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
|
maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
add_route(_BatchSync = true, Topic, ReplyTo) ->
|
|
||||||
emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo});
|
|
||||||
add_route(_BatchSync = false, Topic, _ReplyTo) ->
|
|
||||||
emqx_router:do_add_route(Topic, node()).
|
|
||||||
|
|
||||||
maybe_delete_route(_Exists = false, Topic) ->
|
maybe_delete_route(_Exists = false, Topic) ->
|
||||||
delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic);
|
sync_route(delete, Topic, #{});
|
||||||
maybe_delete_route(_Exists = true, _Topic) ->
|
maybe_delete_route(_Exists = true, _Topic) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
delete_route(_BatchSync = true, Topic) ->
|
sync_route(Action, Topic, ReplyTo) ->
|
||||||
emqx_router_syncer:push(delete, Topic, node(), #{});
|
EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
|
||||||
delete_route(_BatchSync = false, Topic) ->
|
case EnabledOn of
|
||||||
|
all ->
|
||||||
|
push_sync_route(Action, Topic, ReplyTo);
|
||||||
|
none ->
|
||||||
|
regular_sync_route(Action, Topic);
|
||||||
|
Role ->
|
||||||
|
case mria_config:whoami() of
|
||||||
|
Role ->
|
||||||
|
push_sync_route(Action, Topic, ReplyTo);
|
||||||
|
_Disabled ->
|
||||||
|
regular_sync_route(Action, Topic)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
push_sync_route(Action, Topic, Opts) ->
|
||||||
|
emqx_router_syncer:push(Action, Topic, node(), Opts).
|
||||||
|
|
||||||
|
regular_sync_route(add, Topic) ->
|
||||||
|
emqx_router:do_add_route(Topic, node());
|
||||||
|
regular_sync_route(delete, Topic) ->
|
||||||
emqx_router:do_delete_route(Topic, node()).
|
emqx_router:do_delete_route(Topic, node()).
|
||||||
|
|
|
@ -1413,12 +1413,14 @@ fields("broker_routing") ->
|
||||||
];
|
];
|
||||||
fields("broker_routing_batch_sync") ->
|
fields("broker_routing_batch_sync") ->
|
||||||
[
|
[
|
||||||
{"enable",
|
{"enable_on",
|
||||||
sc(
|
sc(
|
||||||
boolean(),
|
hoconsc:enum([none, core, replicant, all]),
|
||||||
#{
|
#{
|
||||||
default => false,
|
%% TODO
|
||||||
desc => ?DESC(broker_routing_batch_sync_enabled)
|
%% Make `replicant` the default value after initial release.
|
||||||
|
default => none,
|
||||||
|
desc => ?DESC(broker_routing_batch_sync_enable_on)
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
|
|
@ -36,6 +36,7 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
GroupVsn = [
|
GroupVsn = [
|
||||||
{group, batch_sync_on},
|
{group, batch_sync_on},
|
||||||
|
{group, batch_sync_replicants},
|
||||||
{group, batch_sync_off}
|
{group, batch_sync_off}
|
||||||
],
|
],
|
||||||
GroupBase = [
|
GroupBase = [
|
||||||
|
@ -50,6 +51,7 @@ groups() ->
|
||||||
{routing_schema_v1, [], GroupVsn},
|
{routing_schema_v1, [], GroupVsn},
|
||||||
{routing_schema_v2, [], GroupVsn},
|
{routing_schema_v2, [], GroupVsn},
|
||||||
{batch_sync_on, [], GroupBase},
|
{batch_sync_on, [], GroupBase},
|
||||||
|
{batch_sync_replicants, [], GroupBase},
|
||||||
{batch_sync_off, [], GroupBase},
|
{batch_sync_off, [], GroupBase},
|
||||||
{cluster, [], ClusterTCs}
|
{cluster, [], ClusterTCs}
|
||||||
].
|
].
|
||||||
|
@ -59,9 +61,11 @@ init_per_group(routing_schema_v1, Config) ->
|
||||||
init_per_group(routing_schema_v2, Config) ->
|
init_per_group(routing_schema_v2, Config) ->
|
||||||
[{emqx_config, "broker.routing.storage_schema = v2"} | Config];
|
[{emqx_config, "broker.routing.storage_schema = v2"} | Config];
|
||||||
init_per_group(batch_sync_on, Config) ->
|
init_per_group(batch_sync_on, Config) ->
|
||||||
[{emqx_config, "broker.routing.batch_sync.enable = true"} | Config];
|
[{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
|
||||||
|
init_per_group(batch_sync_replicants, Config) ->
|
||||||
|
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
|
||||||
init_per_group(batch_sync_off, Config) ->
|
init_per_group(batch_sync_off, Config) ->
|
||||||
[{emqx_config, "broker.routing.batch_sync.enable = false"} | Config];
|
[{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
|
||||||
init_per_group(cluster, Config) ->
|
init_per_group(cluster, Config) ->
|
||||||
WorkDir = emqx_cth_suite:work_dir(Config),
|
WorkDir = emqx_cth_suite:work_dir(Config),
|
||||||
NodeSpecs = [
|
NodeSpecs = [
|
||||||
|
|
|
@ -1541,9 +1541,14 @@ Set <code>v1</code> to use the former schema.
|
||||||
NOTE: Schema <code>v2</code> is still experimental.
|
NOTE: Schema <code>v2</code> is still experimental.
|
||||||
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
|
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
|
||||||
|
|
||||||
broker_routing_batch_sync_enable.desc:
|
broker_routing_batch_sync_enable_on.desc:
|
||||||
"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
|
"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
|
||||||
Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded."""
|
Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.
|
||||||
|
The selected value determines which nodes in the cluster will have this feature enabled.
|
||||||
|
- <code>all</code>: enables it unconditionally on each node,
|
||||||
|
- <code>replicant</code>: enables it only on replicants (e.g. those where <code>node.role = replicant</code>),
|
||||||
|
- <code>core</code>: enables it only on core nodes,
|
||||||
|
- <code>none</code>: disables this altogether."""
|
||||||
|
|
||||||
broker_perf_trie_compaction.desc:
|
broker_perf_trie_compaction.desc:
|
||||||
"""Enable trie path compaction.
|
"""Enable trie path compaction.
|
||||||
|
|
Loading…
Reference in New Issue