diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 3ca152749..4851a20ea 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -604,11 +604,21 @@ do_dispatch({shard, I}, Topic, Msg) -> %% maybe_add_route(_Existed = false, Topic, ReplyTo) -> - emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); + add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. +add_route(_BatchSync = true, Topic, ReplyTo) -> + emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); +add_route(_BatchSync = false, Topic, _ReplyTo) -> + emqx_router:do_add_route(Topic, node()). + maybe_delete_route(_Exists = false, Topic) -> - emqx_router_syncer:push(delete, Topic, node(), #{}); + delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic); maybe_delete_route(_Exists = true, _Topic) -> ok. + +delete_route(_BatchSync = true, Topic) -> + emqx_router_syncer:push(delete, Topic, node(), #{}); +delete_route(_BatchSync = false, Topic) -> + emqx_router:do_delete_route(Topic, node()). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e972c57e0..14eef30d3 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1404,6 +1404,22 @@ fields("broker_routing") -> 'readOnly' => true, desc => ?DESC(broker_routing_storage_schema) } + )}, + {"batch_sync", + sc( + ref("broker_routing_batch_sync"), + #{importance => ?IMPORTANCE_HIDDEN} + )} + ]; +fields("broker_routing_batch_sync") -> + [ + {"enable", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(broker_routing_batch_sync_enabled) + } )} ]; fields("shared_subscription_group") -> diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 042ef91db..fbb9da595 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -72,6 +72,7 @@ -export([stop_apps/1]). -export([merge_appspec/2]). +-export([merge_config/2]). %% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs -export([schema_module/0, upgrade_raw_conf/1]). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index af8af737b..9753e1d9b 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -30,32 +30,52 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2, - t_concurrent_routing_updates + t_routing_schema_switch_v2 ]. groups() -> - TCs = [ + GroupVsn = [ + {group, batch_sync_on}, + {group, batch_sync_off} + ], + GroupBase = [ + {group, cluster}, + t_concurrent_routing_updates + ], + ClusterTCs = [ t_cluster_routing, t_slow_rlog_routing_consistency ], [ - {routing_schema_v1, [], TCs}, - {routing_schema_v2, [], TCs} + {routing_schema_v1, [], GroupVsn}, + {routing_schema_v2, [], GroupVsn}, + {batch_sync_on, [], GroupBase}, + {batch_sync_off, [], GroupBase}, + {cluster, [], ClusterTCs} ]. -init_per_group(GroupName, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), +init_per_group(routing_schema_v1, Config) -> + [{emqx_config, "broker.routing.storage_schema = v1"} | Config]; +init_per_group(routing_schema_v2, Config) -> + [{emqx_config, "broker.routing.storage_schema = v2"} | Config]; +init_per_group(batch_sync_on, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config]; +init_per_group(batch_sync_off, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config]; +init_per_group(cluster, Config) -> + WorkDir = emqx_cth_suite:work_dir(Config), NodeSpecs = [ - {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}}, - {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}}, - {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}} + {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}}, + {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}}, + {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), [{cluster, Nodes} | Config]. -end_per_group(_GroupName, Config) -> - emqx_cth_cluster:stop(?config(cluster, Config)). +end_per_group(cluster, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)); +end_per_group(_, _Config) -> + ok. init_per_testcase(TC, Config) -> emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config). @@ -63,9 +83,9 @@ init_per_testcase(TC, Config) -> end_per_testcase(TC, Config) -> emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config). -mk_emqx_appspec(GroupName, N) -> +mk_emqx_appspec(N, Config) -> {emqx, #{ - config => mk_config(GroupName, N), + config => mk_config(N, Config), after_start => fun() -> % NOTE % This one is actually defined on `emqx_conf_schema` level, but used @@ -79,24 +99,28 @@ mk_genrpc_appspec() -> override_env => [{port_discovery, stateless}] }}. -mk_config(GroupName, N) -> - #{ - broker => mk_config_broker(GroupName), - listeners => mk_config_listeners(N) - }. +mk_config(N, ConfigOrVsn) -> + emqx_cth_suite:merge_config( + mk_config_broker(ConfigOrVsn), + mk_config_listeners(N) + ). -mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 -> - #{routing => #{storage_schema => v1}}; -mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 -> - #{routing => #{storage_schema => v2}}. +mk_config_broker(v1) -> + "broker.routing.storage_schema = v1"; +mk_config_broker(v2) -> + "broker.routing.storage_schema = v2"; +mk_config_broker(CTConfig) -> + string:join(proplists:get_all_values(emqx_config, CTConfig), "\n"). mk_config_listeners(N) -> Port = 1883 + N, #{ - tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}}, - ssl => #{default => #{enable => false}}, - ws => #{default => #{enable => false}}, - wss => #{default => #{enable => false}} + listeners => #{ + tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}}, + ssl => #{default => #{enable => false}}, + ws => #{default => #{enable => false}}, + wss => #{default => #{enable => false}} + } }. %% @@ -202,12 +226,15 @@ t_concurrent_routing_updates(init, Config) -> Apps = emqx_cth_suite:start( [ {emqx, #{ - config => #{broker => #{routing => #{storage_schema => v2}}}, + config => mk_config_broker(Config), + %% NOTE + %% Artificially increasing pool workers contention by forcing small pool size. before_start => fun() -> % NOTE % This one is actually defined on `emqx_conf_schema` level, but used % in `emqx_broker`. Thus we have to resort to this ugly hack. - emqx_config:force_put([node, broker_pool_size], 2) + emqx_config:force_put([node, broker_pool_size], 2), + emqx_app:set_config_loader(?MODULE) end }} ], @@ -331,7 +358,7 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> [Node1] = emqx_cth_cluster:start( [ {routing_schema_switch1, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)] + apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, VTo)] }} ], #{work_dir => WorkDir} @@ -344,12 +371,12 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> [Node2, Node3] = emqx_cth_cluster:start( [ {routing_schema_switch2, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)], + apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, VFrom)], base_port => 20000, join_to => Node1 }}, {routing_schema_switch3, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)], + apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, VFrom)], base_port => 20100, join_to => Node1 }} diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 84305317f..bd8909c3d 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1541,6 +1541,10 @@ Set v1 to use the former schema. NOTE: Schema v2 is still experimental. NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" +broker_routing_batch_sync_enable.desc: +"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner. +Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.""" + broker_perf_trie_compaction.desc: """Enable trie path compaction. Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.