From 0f1aaa65bc35dc928a75f5d3b2b332a15a272400 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 12 Jan 2024 17:52:31 +0800 Subject: [PATCH 01/21] fix(iotdb): move the `iot_version` into IoTDB connector --- .../src/emqx_bridge_iotdb.erl | 11 +--- .../src/emqx_bridge_iotdb_connector.erl | 52 +++++++++++++------ .../test/emqx_bridge_iotdb_impl_SUITE.erl | 10 ++-- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 781eae4b6..562678a17 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -89,14 +89,6 @@ fields(action_parameters) -> desc => ?DESC("config_device_id") } )}, - {iotdb_version, - mk( - hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), - #{ - desc => ?DESC("config_iotdb_version"), - default => ?VSN_1_1_X - } - )}, {data, mk( array(ref(?MODULE, action_parameters_data)), @@ -310,8 +302,7 @@ action_values() -> } ], is_aligned => false, - device_id => <<"my_device">>, - iotdb_version => ?VSN_1_1_X + device_id => <<"my_device">> } }. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 4286a59e4..75161bdc2 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -47,6 +47,7 @@ connect_timeout := pos_integer(), pool_type := random | hash, pool_size := pos_integer(), + iotdb_version := atom(), request => undefined | map(), atom() => _ }. @@ -57,6 +58,7 @@ connect_timeout := pos_integer(), pool_type := random | hash, channels := map(), + iotdb_version := atom(), request => undefined | map(), atom() => _ }. 
@@ -88,6 +90,7 @@ connector_example_values() -> name => <<"iotdb_connector">>, type => iotdb, enable => true, + iotdb_version => ?VSN_1_1_X, authentication => #{ <<"username">> => <<"root">>, <<"password">> => <<"*****">> @@ -121,6 +124,14 @@ fields("connection_fields") -> desc => ?DESC(emqx_bridge_iotdb, "config_base_url") } )}, + {iotdb_version, + mk( + hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + #{ + desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"), + default => ?VSN_1_1_X + } + )}, {authentication, mk( hoconsc:union([ref(?MODULE, auth_basic)]), @@ -190,7 +201,7 @@ proplists_without(Keys, List) -> callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -on_start(InstanceId, Config) -> +on_start(InstanceId, #{iotdb_version := Version} = Config) -> %% [FIXME] The configuration passed in here is pre-processed and transformed %% in emqx_bridge_resource:parse_confs/2. case emqx_bridge_http_connector:on_start(InstanceId, Config) of @@ -201,7 +212,7 @@ on_start(InstanceId, Config) -> request => maps:get(request, State, <<>>) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), - {ok, State#{channels => #{}}}; + {ok, State#{iotdb_version => Version, channels => #{}}}; {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", @@ -231,7 +242,11 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. 
-on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) -> +on_query( + InstanceId, + {ChannelId, _Message} = Req, + #{iotdb_version := IoTDBVsn, channels := Channels} = State +) -> ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_called", @@ -240,7 +255,7 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of + case try_render_message(Req, IoTDBVsn, Channels) of {ok, IoTDBPayload} -> handle_response( emqx_bridge_http_connector:on_query( @@ -254,7 +269,10 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async( - InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State + InstanceId, + {ChannelId, _Message} = Req, + ReplyFunAndArgs0, + #{iotdb_version := IoTDBVsn, channels := Channels} = State ) -> ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ @@ -263,7 +281,7 @@ on_query_async( send_message => Req, state => emqx_utils:redact(State) }), - case try_render_message(Req, Channels) of + case try_render_message(Req, IoTDBVsn, Channels) of {ok, IoTDBPayload} -> ReplyFunAndArgs = { @@ -282,10 +300,10 @@ on_query_async( on_add_channel( InstanceId, - #{channels := Channels} = OldState0, + #{iotdb_version := Version, channels := Channels} = OldState0, ChannelId, #{ - parameters := #{iotdb_version := Version, data := Data} = Parameter + parameters := #{data := Data} = Parameter } ) -> case maps:is_key(ChannelId, Channels) of @@ -495,18 +513,18 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. 
-make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> +make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn), {ok, maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId + iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned, + iotdb_field_key(device_id, IoTDBVsn) => DeviceId })}. -replace_dtypes(Rows0, IotDBVsn) -> +replace_dtypes(Rows0, IoTDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), - Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}. + Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}. aggregate_rows(DataList, InitAcc) -> lists:foldr( @@ -645,15 +663,15 @@ preproc_data_template(DataList) -> DataList ). -try_render_message({ChannelId, Msg}, Channels) -> +try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - render_channel_message(Channel, Msg); + render_channel_message(Channel, IoTDBVsn, Msg); _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. -render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> +render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), case device_id(Message, Payloads, Channel) of undefined -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 6b9af7b9a..d2d5760e5 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -255,7 +255,6 @@ is_error_check(Reason) -> end. 
action_config(Name, Config) -> - Version = ?config(iotdb_version, Config), Type = ?config(bridge_type, Config), ConfigString = io_lib:format( @@ -263,15 +262,13 @@ action_config(Name, Config) -> " enable = true\n" " connector = \"~s\"\n" " parameters = {\n" - " iotdb_version = \"~s\"\n" " data = []\n" " }\n" "}\n", [ Type, Name, - Name, - Version + Name ] ), ct:pal("ActionConfig:~ts~n", [ConfigString]), @@ -281,12 +278,14 @@ connector_config(Name, Config) -> Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), Type = ?config(bridge_type, Config), + Version = ?config(iotdb_version, Config), ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( "connectors.~s.~s {\n" " enable = true\n" " base_url = \"~s\"\n" + " iotdb_version = \"~s\"\n" " authentication = {\n" " username = \"root\"\n" " password = \"root\"\n" @@ -295,7 +294,8 @@ connector_config(Name, Config) -> [ Type, Name, - ServerURL + ServerURL, + Version ] ), ct:pal("ConnectorConfig:~ts~n", [ConfigString]), From 54f8b4745572671cb7cc5b87f663e42ce8b6f620 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 27 Dec 2023 18:31:25 +0100 Subject: [PATCH 02/21] feat(routing): add route sync process pool Dedicated to synchronizing local state updates with the global view of the routing state. --- apps/emqx/src/emqx_broker.erl | 28 ++-- apps/emqx/src/emqx_broker_sup.erl | 11 +- apps/emqx/src/emqx_router.erl | 130 +++++++++++---- apps/emqx/src/emqx_router_syncer.erl | 235 +++++++++++++++++++++++++++ 4 files changed, 361 insertions(+), 43 deletions(-) create mode 100644 apps/emqx/src/emqx_router_syncer.erl diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 856a3f7ae..3ca152749 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -167,7 +167,13 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% `unsubscribe` codepath. So we have to pick a worker according to the topic, %% but not shard. 
If there are topics with high number of shards, then the %% load across the pool will be unbalanced. - call(pick(Topic), {subscribe, Topic, SubPid, I}); + Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}), + case Sync of + ok -> + ok; + Ref when is_reference(Ref) -> + emqx_router_syncer:wait(Ref) + end; do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> @@ -491,8 +497,8 @@ safe_update_stats(Tab, Stat, MaxStat) -> call(Broker, Req) -> gen_server:call(Broker, Req, infinity). -cast(Broker, Msg) -> - gen_server:cast(Broker, Msg). +cast(Broker, Req) -> + gen_server:cast(Broker, Req). %% Pick a broker pick(Topic) -> @@ -506,18 +512,18 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({subscribe, Topic, SubPid, 0}, _From, State) -> +handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - Result = maybe_add_route(Existed, Topic), + Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; -handle_call({subscribe, Topic, SubPid, I}, _From, State) -> +handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), true = ets:insert(?SUBSCRIBER, [ {Topic, {shard, I}}, {{shard, Topic, I}, SubPid} ]), - Result = maybe_add_route(Existed, Topic), + Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -597,12 +603,12 @@ do_dispatch({shard, I}, Topic, Msg) -> %% -maybe_add_route(_Existed = false, Topic) -> - emqx_router:do_add_route(Topic); -maybe_add_route(_Existed = true, _Topic) -> +maybe_add_route(_Existed = false, Topic, ReplyTo) -> + emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); +maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. 
maybe_delete_route(_Exists = false, Topic) -> - emqx_router:do_delete_route(Topic); + emqx_router_syncer:push(delete, Topic, node(), #{}); maybe_delete_route(_Exists = true, _Topic) -> ok. diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index 74baf5674..d05cb7718 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -32,13 +32,20 @@ start_link() -> init([]) -> %% Broker pool PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2), - BrokerPool = emqx_pool_sup:spec([ + BrokerPool = emqx_pool_sup:spec(broker_pool_sup, [ broker_pool, hash, PoolSize, {emqx_broker, start_link, []} ]), + SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [ + router_syncer_pool, + hash, + emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2), + {emqx_router_syncer, start_link, []} + ]), + %% Shared subscription SharedSub = #{ id => shared_sub, @@ -59,4 +66,4 @@ init([]) -> modules => [emqx_broker_helper] }, - {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}. + {ok, {{one_for_all, 0, 1}, [BrokerPool, SyncerPool, SharedSub, Helper]}}. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 13efbe4ea..a10fde1cc 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -45,6 +45,8 @@ do_delete_route/2 ]). +-export([do_batch/1]). + -export([cleanup_routes/1]). -export([ @@ -86,6 +88,8 @@ deinit_schema/0 ]). +-export_type([dest/0]). + -type group() :: binary(). -type dest() :: node() | {group(), node()}. @@ -173,12 +177,12 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_schema_vsn(), Topic, Dest). + mria_insert_route(get_schema_vsn(), Topic, Dest, single). 
-mria_insert_route(v2, Topic, Dest) -> - mria_insert_route_v2(Topic, Dest); -mria_insert_route(v1, Topic, Dest) -> - mria_insert_route_v1(Topic, Dest). +mria_insert_route(v2, Topic, Dest, Ctx) -> + mria_insert_route_v2(Topic, Dest, Ctx); +mria_insert_route(v1, Topic, Dest, Ctx) -> + mria_insert_route_v1(Topic, Dest, Ctx). %% @doc Take a real topic (not filter) as input, return the matching topics and topic %% filters associated with route destination. @@ -225,12 +229,60 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> - mria_delete_route(get_schema_vsn(), Topic, Dest). + mria_delete_route(get_schema_vsn(), Topic, Dest, single). -mria_delete_route(v2, Topic, Dest) -> - mria_delete_route_v2(Topic, Dest); -mria_delete_route(v1, Topic, Dest) -> - mria_delete_route_v1(Topic, Dest). +mria_delete_route(v2, Topic, Dest, Ctx) -> + mria_delete_route_v2(Topic, Dest, Ctx); +mria_delete_route(v1, Topic, Dest, Ctx) -> + mria_delete_route_v1(Topic, Dest, Ctx). + +do_batch(Batch) -> + Nodes = batch_get_dest_nodes(Batch), + ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes), + mria_batch(get_schema_vsn(), Batch). + +mria_batch(v2, Batch) -> + mria_batch_v2(Batch); +mria_batch(v1, Batch) -> + mria_batch_v1(Batch). + +mria_batch_v2(Batch) -> + mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]). + +mria_batch_v1(Batch) -> + {atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]), + Res. + +mria_batch_run(SchemaVsn, Batch) -> + maps:fold( + fun({Topic, Dest}, Op, Errors) -> + case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of + ok -> + Errors; + Error -> + Errors#{{Topic, Dest} => Error} + end + end, + #{}, + Batch + ). 
+ +mria_batch_operation(SchemaVsn, add, Topic, Dest) -> + mria_insert_route(SchemaVsn, Topic, Dest, batch); +mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> + mria_delete_route(SchemaVsn, Topic, Dest, batch). + +batch_get_dest_nodes(Batch) -> + maps:fold( + fun + ({_Topic, Dest}, add, Acc) -> + ordsets:add_element(get_dest_node(Dest), Acc); + (_, delete, Acc) -> + Acc + end, + ordsets:new(), + Batch + ). -spec select(Spec, _Limit :: pos_integer(), Continuation) -> {[emqx_types:route()], Continuation} | '$end_of_table' @@ -305,43 +357,51 @@ pick(Topic) -> %% Schema v1 %% -------------------------------------------------------------------- -mria_insert_route_v1(Topic, Dest) -> +mria_insert_route_v1(Topic, Dest, Ctx) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - mria_route_tab_insert_update_trie(Route); + mria_route_tab_insert_update_trie(Route, Ctx); false -> - mria_route_tab_insert(Route) + mria_route_tab_insert(Route, Ctx) end. -mria_route_tab_insert_update_trie(Route) -> +mria_route_tab_insert_update_trie(Route, single) -> emqx_router_utils:maybe_trans( fun emqx_router_utils:insert_trie_route/2, [?ROUTE_TAB, Route], ?ROUTE_SHARD - ). + ); +mria_route_tab_insert_update_trie(Route, batch) -> + emqx_router_utils:insert_trie_route(?ROUTE_TAB, Route). -mria_route_tab_insert(Route) -> - mria:dirty_write(?ROUTE_TAB, Route). +mria_route_tab_insert(Route, single) -> + mria:dirty_write(?ROUTE_TAB, Route); +mria_route_tab_insert(Route, batch) -> + mnesia:write(?ROUTE_TAB, Route, write). -mria_delete_route_v1(Topic, Dest) -> +mria_delete_route_v1(Topic, Dest, Ctx) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - mria_route_tab_delete_update_trie(Route); + mria_route_tab_delete_update_trie(Route, Ctx); false -> - mria_route_tab_delete(Route) + mria_route_tab_delete(Route, Ctx) end. 
-mria_route_tab_delete_update_trie(Route) -> +mria_route_tab_delete_update_trie(Route, single) -> emqx_router_utils:maybe_trans( fun emqx_router_utils:delete_trie_route/2, [?ROUTE_TAB, Route], ?ROUTE_SHARD - ). + ); +mria_route_tab_delete_update_trie(Route, batch) -> + emqx_router_utils:delete_trie_route(?ROUTE_TAB, Route). -mria_route_tab_delete(Route) -> - mria:dirty_delete_object(?ROUTE_TAB, Route). +mria_route_tab_delete(Route, single) -> + mria:dirty_delete_object(?ROUTE_TAB, Route); +mria_route_tab_delete(Route, batch) -> + mnesia:delete_object(?ROUTE_TAB, Route, write). match_routes_v1(Topic) -> lookup_route_tab(Topic) ++ @@ -410,24 +470,34 @@ fold_routes_v1(FunName, FoldFun, AccIn) -> %% topics. Writes go to only one of the two tables at a time. %% -------------------------------------------------------------------- -mria_insert_route_v2(Topic, Dest) -> +mria_insert_route_v2(Topic, Dest, Ctx) -> case emqx_trie_search:filter(Topic) of Words when is_list(Words) -> K = emqx_topic_index:make_key(Words, Dest), - mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}); + mria_filter_tab_insert(K, Ctx); false -> - mria_route_tab_insert(#route{topic = Topic, dest = Dest}) + mria_route_tab_insert(#route{topic = Topic, dest = Dest}, Ctx) end. -mria_delete_route_v2(Topic, Dest) -> +mria_filter_tab_insert(K, single) -> + mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}); +mria_filter_tab_insert(K, batch) -> + mnesia:write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}, write). + +mria_delete_route_v2(Topic, Dest, Ctx) -> case emqx_trie_search:filter(Topic) of Words when is_list(Words) -> K = emqx_topic_index:make_key(Words, Dest), - mria:dirty_delete(?ROUTE_TAB_FILTERS, K); + mria_filter_tab_delete(K, Ctx); false -> - mria_route_tab_delete(#route{topic = Topic, dest = Dest}) + mria_route_tab_delete(#route{topic = Topic, dest = Dest}, Ctx) end. 
+mria_filter_tab_delete(K, single) -> + mria:dirty_delete(?ROUTE_TAB_FILTERS, K); +mria_filter_tab_delete(K, batch) -> + mnesia:delete(?ROUTE_TAB_FILTERS, K, write). + match_routes_v2(Topic) -> lookup_route_tab(Topic) ++ [match_to_route(M) || M <- match_filters(Topic)]. diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl new file mode 100644 index 000000000..4ffb724df --- /dev/null +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -0,0 +1,235 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_syncer). + +-behaviour(gen_server). + +-export([start_link/2]). + +-export([push/4]). +-export([wait/1]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-type action() :: add | delete. + +-define(POOL, router_syncer_pool). + +-define(MAX_BATCH_SIZE, 4000). +-define(MIN_SYNC_INTERVAL, 1). + +-define(HIGHEST_PRIO, 1). +-define(LOWEST_PRIO, 4). + +-define(PUSH(PRIO, OP), {PRIO, OP}). + +-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}). + +%% + +-spec start_link(atom(), pos_integer()) -> + {ok, pid()}. +start_link(Pool, Id) -> + gen_server:start_link( + {local, emqx_utils:proc_name(?MODULE, Id)}, + ?MODULE, + [Pool, Id], + [] + ). 
+ +-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) -> + ok | _WaitRef :: reference() +when + Opts :: #{reply => pid()}. +push(Action, Topic, Dest, Opts) -> + Worker = gproc_pool:pick_worker(?POOL, Topic), + Prio = designate_prio(Action, Opts), + Context = mk_push_context(Opts), + Worker ! ?PUSH(Prio, {Action, Topic, Dest, Context}), + case Context of + {MRef, _} -> + MRef; + [] -> + ok + end. + +-spec wait(_WaitRef :: reference()) -> + ok | {error, _Reason}. +wait(MRef) -> + %% FIXME: timeouts + receive + {MRef, Result} -> + Result + end. + +designate_prio(_, #{reply := true}) -> + ?HIGHEST_PRIO; +designate_prio(add, #{}) -> + 2; +designate_prio(delete, #{}) -> + 3. + +mk_push_context(#{reply := To}) -> + MRef = erlang:make_ref(), + {MRef, To}; +mk_push_context(_) -> + []. + +%% + +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{queue => []}}. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(Push = ?PUSH(_, _), State) -> + %% NOTE: Wait a bit to collect potentially overlapping operations. + ok = timer:sleep(?MIN_SYNC_INTERVAL), + NState = run_batch_loop([Push], State), + {noreply, NState}. + +terminate(_Reason, _State) -> + ok. + +%% + +run_batch_loop(Incoming, State = #{queue := Queue}) -> + NQueue = queue_join(Queue, gather_operations(Incoming)), + {Batch, N, FQueue} = mk_batch(NQueue), + %% TODO: retry if error? + Errors = run_batch(Batch), + 0 = send_replies(Errors, N, NQueue), + %% TODO: squash queue + NState = State#{queue := queue_fix(FQueue)}, + case queue_empty(FQueue) of + true -> + NState; + false -> + run_batch_loop([], NState) + end. + +%% + +mk_batch(Queue) -> + mk_batch(Queue, 0, #{}). 
+ +mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE -> + {Batch, N, Queue}; +mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) -> + NBatch = batch_add_operation(Op, Batch), + mk_batch(Queue, N + 1, NBatch); +mk_batch([Run | Queue], N, Batch) when is_list(Run) -> + case mk_batch(Run, N, Batch) of + {NBatch, N1, []} -> + mk_batch(Queue, N1, NBatch); + {NBatch, N1, Left} -> + {NBatch, N1, [Left | Queue]} + end; +mk_batch([], N, Batch) -> + {Batch, N, []}. + +batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) -> + case Batch of + #{{Topic, Dest} := Action} -> + Batch; + #{{Topic, Dest} := delete} when Action == add -> + Batch#{{Topic, Dest} := add}; + #{{Topic, Dest} := add} when Action == delete -> + maps:remove({Topic, Dest}, Batch); + #{} -> + maps:put({Topic, Dest}, Action, Batch) + end. + +send_replies(_Result, 0, _Queue) -> + 0; +send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) -> + _ = replyctx_send(Result, Op), + send_replies(Result, N - 1, Queue); +send_replies(Result, N, [Run | Queue]) when is_list(Run) -> + N1 = send_replies(Result, N, Run), + send_replies(Result, N1, Queue); +send_replies(_Result, N, []) -> + N. + +replyctx_send(_Result, ?OP(_, _, _, [])) -> + noreply; +replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) -> + case Result of + #{{Topic, Dest} := Error} -> + Pid ! {MRef, Error}; + #{} -> + Pid ! {MRef, ok} + end. + +%% + +run_batch(Batch) -> + emqx_router:do_batch(Batch). + +%% + +queue_fix([]) -> + []; +queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO -> + queue_fix([[] | Queue]); +queue_fix(Queue) -> + Queue. + +queue_join(Q1, []) -> + Q1; +queue_join([], Q2) -> + Q2; +queue_join(Q1, Q2) -> + lists:zipwith(fun join_list/2, Q1, Q2). + +join_list(L1, []) -> + L1; +join_list([], L2) -> + L2; +join_list(L1, L2) -> + [L1, L2]. + +queue_empty(Queue) -> + lists:all(fun(L) -> L == [] end, Queue). 
+ +gather_operations(Incoming) -> + [ + pick_operations(Prio, Incoming) ++ drain_operations(Prio) + || Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO) + ]. + +drain_operations(Prio) -> + receive + {Prio, Op} -> + [Op | drain_operations(Prio)] + after 0 -> + [] + end. + +pick_operations(Prio, Incoming) -> + [Op || {P, Op} <- Incoming, P =:= Prio]. From f92b5b3f32bf74c291d216aa58c7e3f09264604f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Jan 2024 12:53:44 +0100 Subject: [PATCH 03/21] feat(stream): add simple stream over process message queue --- apps/emqx_utils/src/emqx_utils_stream.erl | 13 +++++++++++++ apps/emqx_utils/test/emqx_utils_stream_tests.erl | 9 +++++++++ 2 files changed, 22 insertions(+) diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 79ce5ce7b..21321400d 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -20,6 +20,7 @@ -export([ empty/0, list/1, + mqueue/1, map/2, chain/2 ]). @@ -59,6 +60,18 @@ list([]) -> list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream out of process message queue. +-spec mqueue(timeout()) -> stream(any()). +mqueue(Timeout) -> + fun() -> + receive + X -> + [X | mqueue(Timeout)] + after Timeout -> + [] + end + end. + %% @doc Make a stream by applying a function to each element of the underlying stream. -spec map(fun((X) -> Y), stream(X)) -> stream(Y). map(F, S) -> diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 4a48ae45d..ef8185a94 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -73,3 +73,12 @@ chain_list_map_test() -> ["1", "2", "3", "4", "5", "6"], emqx_utils_stream:consume(S) ). 
+ +mqueue_test() -> + _ = erlang:send_after(1, self(), 1), + _ = erlang:send_after(100, self(), 2), + _ = erlang:send_after(20, self(), 42), + ?assertEqual( + [1, 42, 2], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) + ). From 5aeff20f8b8f30618040f26f3650254bfe520593 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Jan 2024 18:34:39 +0100 Subject: [PATCH 04/21] fix(routesync): ensure causal relationships are preserved At the cost of strict FIFO semantics though. --- apps/emqx/src/emqx_broker_sup.erl | 2 +- apps/emqx/src/emqx_router.erl | 29 ++- apps/emqx/src/emqx_router_syncer.erl | 283 ++++++++++++++++++--------- 3 files changed, 211 insertions(+), 103 deletions(-) diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index d05cb7718..e6d93d92f 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -42,7 +42,7 @@ init([]) -> SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [ router_syncer_pool, hash, - emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2), + PoolSize, {emqx_router_syncer, start_link, []} ]), diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index a10fde1cc..54667065a 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -45,6 +45,11 @@ do_delete_route/2 ]). +%% Mria Activity RPC targets +% -export([ +% mria_insert_route/2 +% ]). + -export([do_batch/1]). -export([cleanup_routes/1]). @@ -236,9 +241,14 @@ mria_delete_route(v2, Topic, Dest, Ctx) -> mria_delete_route(v1, Topic, Dest, Ctx) -> mria_delete_route_v1(Topic, Dest, Ctx). +-spec do_batch(Batch) -> Errors when + %% Operation :: {add, ...} | {delete, ...}. + Batch :: #{Route => _Operation :: tuple()}, + Errors :: #{Route => _Error}, + Route :: {emqx_types:topic(), dest()}. 
do_batch(Batch) -> Nodes = batch_get_dest_nodes(Batch), - ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes), + ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), mria_batch(get_schema_vsn(), Batch). mria_batch(v2, Batch) -> @@ -256,7 +266,7 @@ mria_batch_v1(Batch) -> mria_batch_run(SchemaVsn, Batch) -> maps:fold( fun({Topic, Dest}, Op, Errors) -> - case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of + case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of ok -> Errors; Error -> @@ -274,16 +284,21 @@ mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> batch_get_dest_nodes(Batch) -> maps:fold( - fun - ({_Topic, Dest}, add, Acc) -> - ordsets:add_element(get_dest_node(Dest), Acc); - (_, delete, Acc) -> - Acc + fun({_Topic, Dest}, Op, Acc) -> + case batch_get_action(Op) of + add -> + ordsets:add_element(get_dest_node(Dest), Acc); + delete -> + Acc + end end, ordsets:new(), Batch ). +batch_get_action(Op) -> + element(1, Op). + -spec select(Spec, _Limit :: pos_integer(), Continuation) -> {[emqx_types:route()], Continuation} | '$end_of_table' when diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index 4ffb724df..bdbeffb4b 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -16,6 +16,8 @@ -module(emqx_router_syncer). +-include_lib("snabbkaffe/include/trace.hrl"). + -behaviour(gen_server). -export([start_link/2]). @@ -35,16 +37,27 @@ -define(POOL, router_syncer_pool). --define(MAX_BATCH_SIZE, 4000). +-define(MAX_BATCH_SIZE, 1000). -define(MIN_SYNC_INTERVAL, 1). --define(HIGHEST_PRIO, 1). --define(LOWEST_PRIO, 4). +-define(PRIO_HI, 1). +-define(PRIO_LO, 2). +-define(PRIO_BG, 3). -define(PUSH(PRIO, OP), {PRIO, OP}). - -define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}). +-define(ROUTEOP(ACT), {ACT, _, _}). +-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}). +-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}). + +-ifdef(TEST). 
+-undef(MAX_BATCH_SIZE). +-undef(MIN_SYNC_INTERVAL). +-define(MAX_BATCH_SIZE, 40). +-define(MIN_SYNC_INTERVAL, 10). +-endif. + %% -spec start_link(atom(), pos_integer()) -> @@ -82,12 +95,12 @@ wait(MRef) -> Result end. -designate_prio(_, #{reply := true}) -> - ?HIGHEST_PRIO; +designate_prio(_, #{reply := _To}) -> + ?PRIO_HI; designate_prio(add, #{}) -> - 2; + ?PRIO_LO; designate_prio(delete, #{}) -> - 3. + ?PRIO_BG. mk_push_context(#{reply := To}) -> MRef = erlang:make_ref(), @@ -99,7 +112,7 @@ mk_push_context(_) -> init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #{queue => []}}. + {ok, #{stash => stash_new()}}. handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -118,15 +131,24 @@ terminate(_Reason, _State) -> %% -run_batch_loop(Incoming, State = #{queue := Queue}) -> - NQueue = queue_join(Queue, gather_operations(Incoming)), - {Batch, N, FQueue} = mk_batch(NQueue), +run_batch_loop(Incoming, State = #{stash := Stash0}) -> + Stash1 = stash_add(Incoming, Stash0), + Stash2 = stash_drain(Stash1), + {Batch, Stash3} = mk_batch(Stash2), + ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{ + size => maps:size(Batch), + stashed => maps:size(Stash3), + n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)), + n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)), + prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch), + prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch) + }), %% TODO: retry if error? Errors = run_batch(Batch), - 0 = send_replies(Errors, N, NQueue), - %% TODO: squash queue - NState = State#{queue := queue_fix(FQueue)}, - case queue_empty(FQueue) of + ok = send_replies(Errors, Batch), + NState = State#{stash := Stash3}, + %% TODO: postpone if only ?PRIO_BG operations left? 
+ case stash_empty(Stash3) of true -> NState; false -> @@ -135,56 +157,59 @@ run_batch_loop(Incoming, State = #{queue := Queue}) -> %% -mk_batch(Queue) -> - mk_batch(Queue, 0, #{}). +mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE -> + %% This is perfect situation, we just use stash as batch w/o extra reallocations. + {Stash, stash_new()}; +mk_batch(Stash) -> + %% Take a subset of stashed operations to form a batch. + %% Note that stash is an unordered map, it's not a queue. The order of operations is + %% not preserved strictly, only loosely, because of how we start from high priority + %% operations and go down to low priority ones. This might cause some operations to + %% stay in stash for unfairly long time, when there are many high priority operations. + %% However, it's unclear how likely this is to happen in practice. + mk_batch(Stash, ?MAX_BATCH_SIZE). -mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE -> - {Batch, N, Queue}; -mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) -> - NBatch = batch_add_operation(Op, Batch), - mk_batch(Queue, N + 1, NBatch); -mk_batch([Run | Queue], N, Batch) when is_list(Run) -> - case mk_batch(Run, N, Batch) of - {NBatch, N1, []} -> - mk_batch(Queue, N1, NBatch); - {NBatch, N1, Left} -> - {NBatch, N1, [Left | Queue]} +mk_batch(Stash, BatchSize) -> + mk_batch(?PRIO_HI, #{}, BatchSize, Stash). + +mk_batch(Prio, Batch, SizeLeft, Stash) -> + mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)). + +mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 -> + %% Iterating over stash, only taking operations with priority equal to `Prio`. + case maps:next(It) of + {Route, Op = ?ROUTEOP(_Action, Prio), NIt} -> + NBatch = Batch#{Route => Op}, + NStash = maps:remove(Route, Stash), + mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt); + {_Route, _Op, NIt} -> + %% This is lower priority operation, skip it. 
+ mk_batch(Prio, Batch, SizeLeft, Stash, NIt); + none -> + %% No more operations with priority `Prio`, go to the next priority level. + true = Prio < ?PRIO_BG, + mk_batch(Prio + 1, Batch, SizeLeft, Stash) end; -mk_batch([], N, Batch) -> - {Batch, N, []}. +mk_batch(_Prio, Batch, _, Stash, _It) -> + {Batch, Stash}. -batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) -> - case Batch of - #{{Topic, Dest} := Action} -> - Batch; - #{{Topic, Dest} := delete} when Action == add -> - Batch#{{Topic, Dest} := add}; - #{{Topic, Dest} := add} when Action == delete -> - maps:remove({Topic, Dest}, Batch); - #{} -> - maps:put({Topic, Dest}, Action, Batch) - end. +send_replies(Errors, Batch) -> + maps:foreach( + fun(Route, {_Action, _Prio, Ctx}) -> + case Ctx of + [] -> + ok; + _ -> + replyctx_send(maps:get(Route, Errors, ok), Ctx) + end + end, + Batch + ). -send_replies(_Result, 0, _Queue) -> - 0; -send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) -> - _ = replyctx_send(Result, Op), - send_replies(Result, N - 1, Queue); -send_replies(Result, N, [Run | Queue]) when is_list(Run) -> - N1 = send_replies(Result, N, Run), - send_replies(Result, N1, Queue); -send_replies(_Result, N, []) -> - N. - -replyctx_send(_Result, ?OP(_, _, _, [])) -> +replyctx_send(_Result, []) -> noreply; -replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) -> - case Result of - #{{Topic, Dest} := Error} -> - Pid ! {MRef, Error}; - #{} -> - Pid ! {MRef, ok} - end. +replyctx_send(Result, {MRef, Pid}) -> + Pid ! {MRef, Result}. %% @@ -193,43 +218,111 @@ run_batch(Batch) -> %% -queue_fix([]) -> - []; -queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO -> - queue_fix([[] | Queue]); -queue_fix(Queue) -> - Queue. +stash_new() -> + #{}. -queue_join(Q1, []) -> - Q1; -queue_join([], Q2) -> - Q2; -queue_join(Q1, Q2) -> - lists:zipwith(fun join_list/2, Q1, Q2). +stash_empty(Stash) -> + maps:size(Stash) =:= 0. -join_list(L1, []) -> - L1; -join_list([], L2) -> - L2; -join_list(L1, L2) -> - [L1, L2]. 
- -queue_empty(Queue) -> - lists:all(fun(L) -> L == [] end, Queue). - -gather_operations(Incoming) -> - [ - pick_operations(Prio, Incoming) ++ drain_operations(Prio) - || Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO) - ]. - -drain_operations(Prio) -> +stash_drain(Stash) -> receive - {Prio, Op} -> - [Op | drain_operations(Prio)] + ?PUSH(Prio, Op) -> + stash_drain(stash_add(Prio, Op, Stash)) after 0 -> - [] + Stash end. -pick_operations(Prio, Incoming) -> - [Op || {P, Op} <- Incoming, P =:= Prio]. +stash_add(Pushes, Stash) -> + lists:foldl( + fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end, + Stash, + Pushes + ). + +stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> + Route = {Topic, Dest}, + case maps:get(Route, Stash, undefined) of + undefined -> + Stash#{Route => {Action, Prio, Ctx}}; + RouteOp -> + case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of + undefined -> + maps:remove(Route, Stash); + RouteOpMerged -> + Stash#{Route := RouteOpMerged} + end + end. + +merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> + %% NOTE: This should not happen anyway. + _ = replyctx_send(ignored, Ctx1), + DestOp; +merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) -> + %% NOTE: Operations cancel each other. + _ = replyctx_send(ok, Ctx1), + _ = replyctx_send(ok, Ctx2), + undefined. + +%% + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). 
+ +batch_test() -> + Dest = node(), + Ctx = fun(N) -> {N, self()} end, + Stash = stash_add( + [ + ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))), + ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))), + ?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))), + ?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))), + ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))), + ?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))), + ?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))), + ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))), + ?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))), + ?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16))) + ], + stash_new() + ), + {Batch, StashLeft} = mk_batch(Stash, 5), + ?assertMatch( + #{ + {<<"t/1">>, Dest} := {add, ?PRIO_LO, _}, + {<<"t/3">>, Dest} := {add, ?PRIO_HI, _}, + {<<"t/2">>, Dest} := {delete, ?PRIO_LO, _}, + {<<"t/4">>, Dest} := {add, ?PRIO_HI, _}, + {<<"old/3">>, Dest} := {delete, ?PRIO_LO, _} + }, + Batch + ), + ?assertMatch( + #{ + {<<"old/1">>, Dest} := {delete, ?PRIO_BG, _}, + {<<"old/2">>, Dest} := {delete, ?PRIO_BG, _} + }, + StashLeft + ), + ?assertEqual( + [ + {2, ignored}, + {1, ok}, + {4, ok}, + {5, ok}, + {7, ok}, + {9, ok}, + {11, ok}, + {8, ok}, + {13, ok} + ], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(0)) + ). + +-endif. From a1ccf85c66577ba3d606596442ab13f0c006325a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Jan 2024 18:35:49 +0100 Subject: [PATCH 05/21] test(routesync): verify that syncer preserves consistency Under a highly concurrent load. Be aware that this testcase is not deterministic. 
--- apps/emqx/test/emqx_routing_SUITE.erl | 138 +++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 10ca866fe..af8af737b 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_router.hrl"). @@ -29,7 +30,8 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2 + t_routing_schema_switch_v2, + t_concurrent_routing_updates ]. groups() -> @@ -182,6 +184,140 @@ unsubscribe(C, Topic) -> %% +-define(SUBSCRIBE_TOPICS, [ + <<"t/#">>, + <<"t/fixed">>, + <<"t/1/+">>, + <<"t/2/+">>, + <<"t/42/+/+">>, + <<"client/${i}/+">>, + <<"client/${i}/fixed">>, + <<"client/${i}/#">>, + <<"rand/${r}/+">>, + <<"rand/${r}/fixed">> +]). + +t_concurrent_routing_updates(init, Config) -> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + config => #{broker => #{routing => #{storage_schema => v2}}}, + before_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([node, broker_pool_size], 2) + end + }} + ], + #{work_dir => WorkDir} + ), + ok = snabbkaffe:start_trace(), + [{tc_apps, Apps} | Config]; +t_concurrent_routing_updates('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_suite:stop(?config(tc_apps, Config)). 
+ +t_concurrent_routing_updates(_Config) -> + NClients = 400, + NRTopics = 250, + MCommands = 8, + Port = get_mqtt_tcp_port(node()), + Clients = [ + spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics]) + || I <- lists:seq(1, NClients) + ], + ok = lists:foreach(fun ping_concurrent_client/1, Clients), + ok = timer:sleep(200), + Subscribers = ets:tab2list(?SUBSCRIBER), + Topics = maps:keys(maps:from_list(Subscribers)), + ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())), + ok = lists:foreach(fun stop_concurrent_client/1, Clients), + ok = timer:sleep(1000), + ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]), + ?assertEqual([], ets:tab2list(?SUBSCRIBER)), + ?assertEqual([], emqx_router:topics()). + +run_concurrent_client(I, Port, MCommands, NRTopics) -> + % _ = rand:seed(default, I), + Ctx = #{ + i => I, + r => rand:uniform(NRTopics) + }, + {ok, C} = emqtt:start_link(#{port => Port, clientid => render("client:${i}", Ctx)}), + {ok, _Props} = emqtt:connect(C), + NCommands = rand:uniform(MCommands), + Commands = gen_concurrent_client_plan(NCommands, Ctx), + ok = subscribe_concurrent_client(C, Commands), + run_concurrent_client_loop(C). + +gen_concurrent_client_plan(N, Ctx) -> + lists:foldl( + fun(_, Acc) -> mixin(pick_random_command(Ctx), Acc) end, + [], + lists:seq(1, N) + ). + +subscribe_concurrent_client(C, Commands) -> + lists:foreach( + fun + ({subscribe, Topic}) -> + {ok, _Props, [0]} = emqtt:subscribe(C, Topic); + ({unsubscribe, Topic}) -> + {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic) + end, + Commands + ). + +pick_random_command(Ctx) -> + Topic = render(randpick(?SUBSCRIBE_TOPICS), Ctx), + randpick([ + [{subscribe, Topic}], + [{subscribe, Topic}, {unsubscribe, Topic}] + ]). + +render(Template, Ctx) -> + iolist_to_binary(emqx_template:render_strict(emqx_template:parse(Template), Ctx)). + +run_concurrent_client_loop(C) -> + receive + {From, Ref, F} -> + Reply = F(C), + From ! 
{Ref, Reply}, + run_concurrent_client_loop(C) + end. + +ping_concurrent_client(Pid) -> + Ref = make_ref(), + Pid ! {self(), Ref, fun emqtt:ping/1}, + receive + {Ref, Reply} -> Reply + after 5000 -> + error(timeout) + end. + +stop_concurrent_client(Pid) -> + MRef = erlang:monitor(process, Pid), + true = erlang:unlink(Pid), + true = erlang:exit(Pid, shutdown), + receive + {'DOWN', MRef, process, Pid, Reason} -> Reason + end. + +randpick(List) -> + lists:nth(rand:uniform(length(List)), List). + +mixin(L = [H | T], Into = [HInto | TInto]) -> + case rand:uniform(length(Into) + 1) of + 1 -> [H | mixin(T, Into)]; + _ -> [HInto | mixin(L, TInto)] + end; +mixin(L, Into) -> + L ++ Into. + +%% + t_routing_schema_switch_v1(Config) -> WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), t_routing_schema_switch(_From = v2, _To = v1, WorkDir). From 38e13f2337e41254b8848fe7cab33b307efbb369 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Jan 2024 20:00:32 +0100 Subject: [PATCH 06/21] fix(syncer): start syncer pool before broker pool As the latter depends on the former. --- apps/emqx/src/emqx_broker_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index e6d93d92f..aee8dff5d 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -66,4 +66,4 @@ init([]) -> modules => [emqx_broker_helper] }, - {ok, {{one_for_all, 0, 1}, [BrokerPool, SyncerPool, SharedSub, Helper]}}. + {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper]}}. 
From 0b3f5f7c3775aba0f150adc2c56096741a855e0e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 9 Jan 2024 13:19:11 +0100 Subject: [PATCH 07/21] feat(syncer): allow to turn syncer pool on/off through config --- apps/emqx/src/emqx_broker.erl | 14 ++++- apps/emqx/src/emqx_schema.erl | 16 +++++ apps/emqx/test/emqx_cth_suite.erl | 1 + apps/emqx/test/emqx_routing_SUITE.erl | 91 +++++++++++++++++---------- rel/i18n/emqx_schema.hocon | 4 ++ 5 files changed, 92 insertions(+), 34 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 3ca152749..4851a20ea 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -604,11 +604,21 @@ do_dispatch({shard, I}, Topic, Msg) -> %% maybe_add_route(_Existed = false, Topic, ReplyTo) -> - emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); + add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. +add_route(_BatchSync = true, Topic, ReplyTo) -> + emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); +add_route(_BatchSync = false, Topic, _ReplyTo) -> + emqx_router:do_add_route(Topic, node()). + maybe_delete_route(_Exists = false, Topic) -> - emqx_router_syncer:push(delete, Topic, node(), #{}); + delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic); maybe_delete_route(_Exists = true, _Topic) -> ok. + +delete_route(_BatchSync = true, Topic) -> + emqx_router_syncer:push(delete, Topic, node(), #{}); +delete_route(_BatchSync = false, Topic) -> + emqx_router:do_delete_route(Topic, node()). 
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e972c57e0..14eef30d3 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1404,6 +1404,22 @@ fields("broker_routing") -> 'readOnly' => true, desc => ?DESC(broker_routing_storage_schema) } + )}, + {"batch_sync", + sc( + ref("broker_routing_batch_sync"), + #{importance => ?IMPORTANCE_HIDDEN} + )} + ]; +fields("broker_routing_batch_sync") -> + [ + {"enable", + sc( + boolean(), + #{ + default => false, + desc => ?DESC(broker_routing_batch_sync_enabled) + } )} ]; fields("shared_subscription_group") -> diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 042ef91db..fbb9da595 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -72,6 +72,7 @@ -export([stop_apps/1]). -export([merge_appspec/2]). +-export([merge_config/2]). %% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs -export([schema_module/0, upgrade_raw_conf/1]). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index af8af737b..9753e1d9b 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -30,32 +30,52 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2, - t_concurrent_routing_updates + t_routing_schema_switch_v2 ]. groups() -> - TCs = [ + GroupVsn = [ + {group, batch_sync_on}, + {group, batch_sync_off} + ], + GroupBase = [ + {group, cluster}, + t_concurrent_routing_updates + ], + ClusterTCs = [ t_cluster_routing, t_slow_rlog_routing_consistency ], [ - {routing_schema_v1, [], TCs}, - {routing_schema_v2, [], TCs} + {routing_schema_v1, [], GroupVsn}, + {routing_schema_v2, [], GroupVsn}, + {batch_sync_on, [], GroupBase}, + {batch_sync_off, [], GroupBase}, + {cluster, [], ClusterTCs} ]. 
-init_per_group(GroupName, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), +init_per_group(routing_schema_v1, Config) -> + [{emqx_config, "broker.routing.storage_schema = v1"} | Config]; +init_per_group(routing_schema_v2, Config) -> + [{emqx_config, "broker.routing.storage_schema = v2"} | Config]; +init_per_group(batch_sync_on, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config]; +init_per_group(batch_sync_off, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config]; +init_per_group(cluster, Config) -> + WorkDir = emqx_cth_suite:work_dir(Config), NodeSpecs = [ - {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}}, - {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}}, - {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}} + {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}}, + {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}}, + {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), [{cluster, Nodes} | Config]. -end_per_group(_GroupName, Config) -> - emqx_cth_cluster:stop(?config(cluster, Config)). +end_per_group(cluster, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)); +end_per_group(_, _Config) -> + ok. init_per_testcase(TC, Config) -> emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config). @@ -63,9 +83,9 @@ init_per_testcase(TC, Config) -> end_per_testcase(TC, Config) -> emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config). 
-mk_emqx_appspec(GroupName, N) -> +mk_emqx_appspec(N, Config) -> {emqx, #{ - config => mk_config(GroupName, N), + config => mk_config(N, Config), after_start => fun() -> % NOTE % This one is actually defined on `emqx_conf_schema` level, but used @@ -79,24 +99,28 @@ mk_genrpc_appspec() -> override_env => [{port_discovery, stateless}] }}. -mk_config(GroupName, N) -> - #{ - broker => mk_config_broker(GroupName), - listeners => mk_config_listeners(N) - }. +mk_config(N, ConfigOrVsn) -> + emqx_cth_suite:merge_config( + mk_config_broker(ConfigOrVsn), + mk_config_listeners(N) + ). -mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 -> - #{routing => #{storage_schema => v1}}; -mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 -> - #{routing => #{storage_schema => v2}}. +mk_config_broker(v1) -> + "broker.routing.storage_schema = v1"; +mk_config_broker(v2) -> + "broker.routing.storage_schema = v2"; +mk_config_broker(CTConfig) -> + string:join(proplists:get_all_values(emqx_config, CTConfig), "\n"). mk_config_listeners(N) -> Port = 1883 + N, #{ - tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}}, - ssl => #{default => #{enable => false}}, - ws => #{default => #{enable => false}}, - wss => #{default => #{enable => false}} + listeners => #{ + tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}}, + ssl => #{default => #{enable => false}}, + ws => #{default => #{enable => false}}, + wss => #{default => #{enable => false}} + } }. %% @@ -202,12 +226,15 @@ t_concurrent_routing_updates(init, Config) -> Apps = emqx_cth_suite:start( [ {emqx, #{ - config => #{broker => #{routing => #{storage_schema => v2}}}, + config => mk_config_broker(Config), + %% NOTE + %% Artificially increasing pool workers contention by forcing small pool size. before_start => fun() -> % NOTE % This one is actually defined on `emqx_conf_schema` level, but used % in `emqx_broker`. Thus we have to resort to this ugly hack. 
- emqx_config:force_put([node, broker_pool_size], 2) + emqx_config:force_put([node, broker_pool_size], 2), + emqx_app:set_config_loader(?MODULE) end }} ], @@ -331,7 +358,7 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> [Node1] = emqx_cth_cluster:start( [ {routing_schema_switch1, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)] + apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, VTo)] }} ], #{work_dir => WorkDir} @@ -344,12 +371,12 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> [Node2, Node3] = emqx_cth_cluster:start( [ {routing_schema_switch2, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)], + apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, VFrom)], base_port => 20000, join_to => Node1 }}, {routing_schema_switch3, #{ - apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)], + apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, VFrom)], base_port => 20100, join_to => Node1 }} diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 84305317f..bd8909c3d 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1541,6 +1541,10 @@ Set v1 to use the former schema. NOTE: Schema v2 is still experimental. NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" +broker_routing_batch_sync_enable.desc: +"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner. +Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.""" + broker_perf_trie_compaction.desc: """Enable trie path compaction. Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber. 
From 498b7a922d26f78026b550e6a31b313fb643bee1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 18:45:01 +0100 Subject: [PATCH 08/21] chore(syncer): fix code style issues Co-Authored-By: Zaiming (Stone) Shi --- apps/emqx/src/emqx_router_syncer.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index bdbeffb4b..d81fc5d88 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -78,7 +78,7 @@ push(Action, Topic, Dest, Opts) -> Worker = gproc_pool:pick_worker(?POOL, Topic), Prio = designate_prio(Action, Opts), Context = mk_push_context(Opts), - Worker ! ?PUSH(Prio, {Action, Topic, Dest, Context}), + _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of {MRef, _} -> MRef; @@ -148,7 +148,7 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> ok = send_replies(Errors, Batch), NState = State#{stash := Stash3}, %% TODO: postpone if only ?PRIO_BG operations left? - case stash_empty(Stash3) of + case is_stash_empty(Stash3) of true -> NState; false -> @@ -209,7 +209,8 @@ send_replies(Errors, Batch) -> replyctx_send(_Result, []) -> noreply; replyctx_send(Result, {MRef, Pid}) -> - Pid ! {MRef, Result}. + _ = erlang:send(Pid, {MRef, Result}), + ok. %% @@ -221,7 +222,7 @@ run_batch(Batch) -> stash_new() -> #{}. -stash_empty(Stash) -> +is_stash_empty(Stash) -> maps:size(Stash) =:= 0. 
stash_drain(Stash) -> From 7d037cfe910f8e85a905f119adcbc02d329f4da6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 19:08:23 +0100 Subject: [PATCH 09/21] chore(route-sync): clarify why wait indefinitely for reply --- apps/emqx/src/emqx_router_syncer.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index d81fc5d88..c1928108a 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -89,7 +89,14 @@ push(Action, Topic, Dest, Opts) -> -spec wait(_WaitRef :: reference()) -> ok | {error, _Reason}. wait(MRef) -> - %% FIXME: timeouts + %% NOTE + %% No timeouts here because (as in `emqx_broker:call/2` case) callers do not + %% really expect this to fail with timeout exception. However, waiting + %% indefinitely is not the best option since it blocks the caller from receiving + %% other messages, so for instance channel (connection) process may not be able + %% to react to socket close event in time. Better option would probably be to + %% introduce cancellable operation, which will be able to check if the caller + %% would still be interested in the result. receive {MRef, Result} -> Result From d6f731c4fce99e62a7977d0bb5e2d374f08fda2d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 19:18:24 +0100 Subject: [PATCH 10/21] fix(route-sync): use public function as mria activity target --- apps/emqx/src/emqx_router.erl | 70 +++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 54667065a..3b191ac36 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -46,9 +46,9 @@ ]). %% Mria Activity RPC targets -% -export([ -% mria_insert_route/2 -% ]). +-export([ + mria_batch_run/2 +]). -export([do_batch/1]). @@ -96,9 +96,12 @@ -export_type([dest/0]). 
-type group() :: binary(). - -type dest() :: node() | {group(), node()}. +%% Operation :: {add, ...} | {delete, ...}. +-type batch() :: #{batch_route() => _Operation :: tuple()}. +-type batch_route() :: {emqx_types:topic(), dest()}. + -record(routeidx, { entry :: '$1' | emqx_topic_index:key(dest()), unused = [] :: nil() @@ -241,11 +244,7 @@ mria_delete_route(v2, Topic, Dest, Ctx) -> mria_delete_route(v1, Topic, Dest, Ctx) -> mria_delete_route_v1(Topic, Dest, Ctx). --spec do_batch(Batch) -> Errors when - %% Operation :: {add, ...} | {delete, ...}. - Batch :: #{Route => _Operation :: tuple()}, - Errors :: #{Route => _Error}, - Route :: {emqx_types:topic(), dest()}. +-spec do_batch(batch()) -> #{batch_route() => _Error}. do_batch(Batch) -> Nodes = batch_get_dest_nodes(Batch), ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), @@ -257,30 +256,15 @@ mria_batch(v1, Batch) -> mria_batch_v1(Batch). mria_batch_v2(Batch) -> - mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]). + mria:async_dirty(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v2, Batch]). mria_batch_v1(Batch) -> - {atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]), - Res. - -mria_batch_run(SchemaVsn, Batch) -> - maps:fold( - fun({Topic, Dest}, Op, Errors) -> - case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of - ok -> - Errors; - Error -> - Errors#{{Topic, Dest} => Error} - end - end, - #{}, - Batch - ). - -mria_batch_operation(SchemaVsn, add, Topic, Dest) -> - mria_insert_route(SchemaVsn, Topic, Dest, batch); -mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> - mria_delete_route(SchemaVsn, Topic, Dest, batch). + case mria:transaction(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v1, Batch]) of + {atomic, Result} -> + Result; + Error -> + Error + end. batch_get_dest_nodes(Batch) -> maps:fold( @@ -368,6 +352,30 @@ call(Router, Msg) -> pick(Topic) -> gproc_pool:pick_worker(router_pool, Topic). 
+%%-------------------------------------------------------------------- +%% Route batch RPC targets +%%-------------------------------------------------------------------- + +-spec mria_batch_run(schemavsn(), batch()) -> #{batch_route() => _Error}. +mria_batch_run(SchemaVsn, Batch) -> + maps:fold( + fun({Topic, Dest}, Op, Errors) -> + case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of + ok -> + Errors; + Error -> + Errors#{{Topic, Dest} => Error} + end + end, + #{}, + Batch + ). + +mria_batch_operation(SchemaVsn, add, Topic, Dest) -> + mria_insert_route(SchemaVsn, Topic, Dest, batch); +mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> + mria_delete_route(SchemaVsn, Topic, Dest, batch). + %%-------------------------------------------------------------------- %% Schema v1 %% -------------------------------------------------------------------- From 884f784c1c478e70ac32f5903cbccbcc136ad1ba Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 19:20:26 +0100 Subject: [PATCH 11/21] refactor(router): don't `emqx_broker_helper:monitor/1` in batches As per our current understanding, this doesn't change much in terms of observability, since other nodes call `ekka:monitor(membership)` anyway, so they will observe nodedowns without explicitly writing an entry into the `?ROUTING_NODE` table. --- apps/emqx/src/emqx_router.erl | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3b191ac36..e7ab37ace 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -246,8 +246,6 @@ mria_delete_route(v1, Topic, Dest, Ctx) -> -spec do_batch(batch()) -> #{batch_route() => _Error}. do_batch(Batch) -> - Nodes = batch_get_dest_nodes(Batch), - ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), mria_batch(get_schema_vsn(), Batch). mria_batch(v2, Batch) -> @@ -266,20 +264,6 @@ mria_batch_v1(Batch) -> Error end. 
-batch_get_dest_nodes(Batch) -> - maps:fold( - fun({_Topic, Dest}, Op, Acc) -> - case batch_get_action(Op) of - add -> - ordsets:add_element(get_dest_node(Dest), Acc); - delete -> - Acc - end - end, - ordsets:new(), - Batch - ). - batch_get_action(Op) -> element(1, Op). From 2dffd44985e2290a391e36225e973777938dfc1b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 20:08:40 +0100 Subject: [PATCH 12/21] feat(route-sync): allow to enable syncer pool only on cores/replicants So we would be able to roll it out and test more gradually. --- apps/emqx/src/emqx_broker.erl | 34 +++++++++++++++++++-------- apps/emqx/src/emqx_schema.erl | 10 ++++---- apps/emqx/test/emqx_routing_SUITE.erl | 8 +++++-- rel/i18n/emqx_schema.hocon | 9 +++++-- 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4851a20ea..7dd3d08be 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -604,21 +604,35 @@ do_dispatch({shard, I}, Topic, Msg) -> %% maybe_add_route(_Existed = false, Topic, ReplyTo) -> - add_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic, ReplyTo); + sync_route(add, Topic, #{reply => ReplyTo}); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. -add_route(_BatchSync = true, Topic, ReplyTo) -> - emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); -add_route(_BatchSync = false, Topic, _ReplyTo) -> - emqx_router:do_add_route(Topic, node()). - maybe_delete_route(_Exists = false, Topic) -> - delete_route(emqx_config:get([broker, routing, batch_sync, enable]), Topic); + sync_route(delete, Topic, #{}); maybe_delete_route(_Exists = true, _Topic) -> ok. 
-delete_route(_BatchSync = true, Topic) -> - emqx_router_syncer:push(delete, Topic, node(), #{}); -delete_route(_BatchSync = false, Topic) -> +sync_route(Action, Topic, ReplyTo) -> + EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), + case EnabledOn of + all -> + push_sync_route(Action, Topic, ReplyTo); + none -> + regular_sync_route(Action, Topic); + Role -> + case mria_config:whoami() of + Role -> + push_sync_route(Action, Topic, ReplyTo); + _Disabled -> + regular_sync_route(Action, Topic) + end + end. + +push_sync_route(Action, Topic, Opts) -> + emqx_router_syncer:push(Action, Topic, node(), Opts). + +regular_sync_route(add, Topic) -> + emqx_router:do_add_route(Topic, node()); +regular_sync_route(delete, Topic) -> emqx_router:do_delete_route(Topic, node()). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 14eef30d3..1dd0a55ed 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1413,12 +1413,14 @@ fields("broker_routing") -> ]; fields("broker_routing_batch_sync") -> [ - {"enable", + {"enable_on", sc( - boolean(), + hoconsc:enum([none, core, replicant, all]), #{ - default => false, - desc => ?DESC(broker_routing_batch_sync_enabled) + %% TODO + %% Make `replicant` the default value after initial release. + default => none, + desc => ?DESC(broker_routing_batch_sync_enable_on) } )} ]; diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 9753e1d9b..70b4eaf51 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -36,6 +36,7 @@ all() -> groups() -> GroupVsn = [ {group, batch_sync_on}, + {group, batch_sync_replicants}, {group, batch_sync_off} ], GroupBase = [ @@ -50,6 +51,7 @@ groups() -> {routing_schema_v1, [], GroupVsn}, {routing_schema_v2, [], GroupVsn}, {batch_sync_on, [], GroupBase}, + {batch_sync_replicants, [], GroupBase}, {batch_sync_off, [], GroupBase}, {cluster, [], ClusterTCs} ]. 
@@ -59,9 +61,11 @@ init_per_group(routing_schema_v1, Config) -> init_per_group(routing_schema_v2, Config) -> [{emqx_config, "broker.routing.storage_schema = v2"} | Config]; init_per_group(batch_sync_on, Config) -> - [{emqx_config, "broker.routing.batch_sync.enable = true"} | Config]; + [{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config]; +init_per_group(batch_sync_replicants, Config) -> + [{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config]; init_per_group(batch_sync_off, Config) -> - [{emqx_config, "broker.routing.batch_sync.enable = false"} | Config]; + [{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config]; init_per_group(cluster, Config) -> WorkDir = emqx_cth_suite:work_dir(Config), NodeSpecs = [ diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index bd8909c3d..fe315b5d7 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1541,9 +1541,14 @@ Set v1 to use the former schema. NOTE: Schema v2 is still experimental. NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" -broker_routing_batch_sync_enable.desc: +broker_routing_batch_sync_enable_on.desc: """Use separate process pool to synchronize subscriptions with the global routing table in a batched manner. -Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded.""" +Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance being overloaded. +The selected value determines which nodes in the cluster will have this feature enabled. +- all: enables it unconditionally on each node, +- replicant: enables it only on replicants (e.g. 
those where node.role = replicant), +- core: enables it only on core nodes, +- none: disables this altogether.""" broker_perf_trie_compaction.desc: """Enable trie path compaction. From a28fc7bfa86a6069aa94381ea9c3319b3911900a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 20:10:34 +0100 Subject: [PATCH 13/21] feat(route-sync): do not run empty batches --- apps/emqx/src/emqx_router_syncer.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index c1928108a..a694a7134 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -221,8 +221,10 @@ replyctx_send(Result, {MRef, Pid}) -> %% -run_batch(Batch) -> - emqx_router:do_batch(Batch). +run_batch(Batch) when map_size(Batch) > 0 -> + emqx_router:do_batch(Batch); +run_batch(_Empty) -> + #{}. %% From 2ac6cddf19f91327f0899263e7776dcb34dd78bb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 21:25:24 +0100 Subject: [PATCH 14/21] fix(route-sync): handle batch sync errors gracefully --- apps/emqx/src/emqx_router_syncer.erl | 103 ++++++++++++++++++----- apps/emqx/test/emqx_routing_SUITE.erl | 116 +++++++++++++++++++------- 2 files changed, 169 insertions(+), 50 deletions(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index a694a7134..e10ed399c 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -16,6 +16,7 @@ -module(emqx_router_syncer). +-include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -behaviour(gen_server). @@ -25,6 +26,8 @@ -export([push/4]). -export([wait/1]). +-export([stats/0]). + -export([ init/1, handle_call/3, @@ -40,6 +43,16 @@ -define(MAX_BATCH_SIZE, 1000). -define(MIN_SYNC_INTERVAL, 1). +%% How long (ms) to idle after observing a batch sync error? 
+%% Should help to avoid excessive retries in situations when errors are caused by +%% conditions that take some time to resolve (e.g. restarting an upstream core node). +-define(ERROR_DELAY, 10). + +%% How soon (ms) to retry last failed batch sync attempt? +%% Only matter in absence of new operations, otherwise batch sync is triggered as +%% soon as `?ERROR_DELAY` is over. +-define(ERROR_RETRY_INTERVAL, 500). + -define(PRIO_HI, 1). -define(PRIO_LO, 2). -define(PRIO_BG, 3). @@ -117,16 +130,36 @@ mk_push_context(_) -> %% +-type stats() :: #{ + size := non_neg_integer(), + n_add := non_neg_integer(), + n_delete := non_neg_integer(), + prio_highest := non_neg_integer() | undefined, + prio_lowest := non_neg_integer() | undefined +}. + +-spec stats() -> [stats()]. +stats() -> + Workers = gproc_pool:active_workers(?POOL), + [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers]. + +%% + init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{stash => stash_new()}}. +handle_call(stats, _From, State = #{stash := Stash}) -> + {reply, stash_stats(Stash), State}; handle_call(_Call, _From, State) -> {reply, ignored, State}. handle_cast(_Msg, State) -> {noreply, State}. +handle_info({timeout, _TRef, retry}, State) -> + NState = run_batch_loop([], maps:remove(retry_timer, State)), + {noreply, NState}; handle_info(Push = ?PUSH(_, _), State) -> %% NOTE: Wait a bit to collect potentially overlapping operations. 
ok = timer:sleep(?MIN_SYNC_INTERVAL), @@ -142,26 +175,41 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> Stash1 = stash_add(Incoming, Stash0), Stash2 = stash_drain(Stash1), {Batch, Stash3} = mk_batch(Stash2), - ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{ - size => maps:size(Batch), - stashed => maps:size(Stash3), - n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)), - n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)), - prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch), - prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch) - }), - %% TODO: retry if error? - Errors = run_batch(Batch), - ok = send_replies(Errors, Batch), - NState = State#{stash := Stash3}, - %% TODO: postpone if only ?PRIO_BG operations left? - case is_stash_empty(Stash3) of - true -> - NState; - false -> - run_batch_loop([], NState) + ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)), + case run_batch(Batch) of + Status = #{} -> + ok = send_replies(Status, Batch), + NState = cancel_retry_timer(State#{stash := Stash3}), + %% TODO: postpone if only ?PRIO_BG operations left? + case is_stash_empty(Stash3) of + true -> + NState; + false -> + run_batch_loop([], NState) + end; + BatchError -> + ?SLOG(warning, #{ + msg => "router_batch_sync_failed", + reason => BatchError, + batch => batch_stats(Batch, Stash3) + }), + NState = State#{stash := Stash2}, + ok = timer:sleep(?ERROR_DELAY), + ensure_retry_timer(NState) end. +ensure_retry_timer(State = #{retry_timer := _TRef}) -> + State; +ensure_retry_timer(State) -> + TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry), + State#{retry_timer => TRef}. + +cancel_retry_timer(State = #{retry_timer := TRef}) -> + ok = emqx_utils:cancel_timer(TRef), + maps:remove(retry_timer, State); +cancel_retry_timer(State) -> + State. 
+ %% mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE -> @@ -222,7 +270,7 @@ replyctx_send(Result, {MRef, Pid}) -> %% run_batch(Batch) when map_size(Batch) > 0 -> - emqx_router:do_batch(Batch); + catch emqx_router:do_batch(Batch); run_batch(_Empty) -> #{}. @@ -275,6 +323,23 @@ merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2 %% +batch_stats(Batch, Stash) -> + BatchStats = stash_stats(Batch), + BatchStats#{ + stashed => maps:size(Stash) + }. + +stash_stats(Stash) -> + #{ + size => maps:size(Stash), + n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Stash)), + n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Stash)), + prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Stash), + prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Stash) + }. + +%% + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 70b4eaf51..96ca0b297 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -39,21 +39,21 @@ groups() -> {group, batch_sync_replicants}, {group, batch_sync_off} ], - GroupBase = [ - {group, cluster}, - t_concurrent_routing_updates - ], ClusterTCs = [ t_cluster_routing, t_slow_rlog_routing_consistency ], + SingleTCs = [t_concurrent_routing_updates], + BatchSyncTCs = lists:duplicate(5, t_concurrent_routing_updates_with_errors), [ {routing_schema_v1, [], GroupVsn}, {routing_schema_v2, [], GroupVsn}, - {batch_sync_on, [], GroupBase}, - {batch_sync_replicants, [], GroupBase}, - {batch_sync_off, [], GroupBase}, - {cluster, [], ClusterTCs} + {batch_sync_on, [], [{group, cluster}, {group, single_batch_on}]}, + {batch_sync_replicants, [], [{group, cluster}, {group, single}]}, + {batch_sync_off, [], [{group, cluster}, {group, single}]}, + {cluster, [], ClusterTCs}, + {single_batch_on, [], SingleTCs ++ 
BatchSyncTCs}, + {single, [], SingleTCs} ]. init_per_group(routing_schema_v1, Config) -> @@ -74,10 +74,38 @@ init_per_group(cluster, Config) -> {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), - [{cluster, Nodes} | Config]. + [{cluster, Nodes} | Config]; +init_per_group(GroupName, Config) when + GroupName =:= single_batch_on; + GroupName =:= single +-> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + config => mk_config_broker(Config), + %% NOTE + %% Artificially increasing pool workers contention by forcing small pool size. + before_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([node, broker_pool_size], 2), + emqx_app:set_config_loader(?MODULE) + end + }} + ], + #{work_dir => WorkDir} + ), + [{group_apps, Apps} | Config]. end_per_group(cluster, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)); +end_per_group(GroupName, Config) when + GroupName =:= single_batch_on; + GroupName =:= single +-> + emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(_, _Config) -> ok. @@ -226,29 +254,10 @@ unsubscribe(C, Topic) -> ]). t_concurrent_routing_updates(init, Config) -> - WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), - Apps = emqx_cth_suite:start( - [ - {emqx, #{ - config => mk_config_broker(Config), - %% NOTE - %% Artificially increasing pool workers contention by forcing small pool size. - before_start => fun() -> - % NOTE - % This one is actually defined on `emqx_conf_schema` level, but used - % in `emqx_broker`. Thus we have to resort to this ugly hack. 
- emqx_config:force_put([node, broker_pool_size], 2), - emqx_app:set_config_loader(?MODULE) - end - }} - ], - #{work_dir => WorkDir} - ), ok = snabbkaffe:start_trace(), - [{tc_apps, Apps} | Config]; -t_concurrent_routing_updates('end', Config) -> - ok = snabbkaffe:stop(), - ok = emqx_cth_suite:stop(?config(tc_apps, Config)). + Config; +t_concurrent_routing_updates('end', _Config) -> + ok = snabbkaffe:stop(). t_concurrent_routing_updates(_Config) -> NClients = 400, @@ -270,6 +279,51 @@ t_concurrent_routing_updates(_Config) -> ?assertEqual([], ets:tab2list(?SUBSCRIBER)), ?assertEqual([], emqx_router:topics()). +t_concurrent_routing_updates_with_errors(init, Config) -> + ok = snabbkaffe:start_trace(), + ok = meck:new(emqx_router, [passthrough, no_history]), + Config; +t_concurrent_routing_updates_with_errors('end', _Config) -> + ok = meck:unload(emqx_router), + ok = snabbkaffe:stop(). + +t_concurrent_routing_updates_with_errors(_Config) -> + NClients = 100, + NRTopics = 80, + MCommands = 6, + PSyncError = 0.1, + Port = get_mqtt_tcp_port(node()), + %% Crash the batch sync operation with some small probability. 
+ ok = meck:expect(emqx_router, mria_batch_run, fun(Vsn, Batch) -> + case rand:uniform() < PSyncError of + false -> meck:passthrough([Vsn, Batch]); + true -> error(overload) + end + end), + Clients = [ + spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics]) + || I <- lists:seq(1, NClients) + ], + ok = lists:foreach(fun ping_concurrent_client/1, Clients), + 0 = ?retry( + _Interval = 500, + _NTimes = 10, + 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()]) + ), + Subscribers = ets:tab2list(?SUBSCRIBER), + Topics = maps:keys(maps:from_list(Subscribers)), + ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())), + ok = lists:foreach(fun stop_concurrent_client/1, Clients), + ok = timer:sleep(100), + 0 = ?retry( + 500, + 10, + 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()]) + ), + ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]), + ?assertEqual([], ets:tab2list(?SUBSCRIBER)), + ?assertEqual([], emqx_router:topics()). + run_concurrent_client(I, Port, MCommands, NRTopics) -> % _ = rand:seed(default, I), Ctx = #{ From 8f4758d9d4eb6d0d6c6a919ea2114d1c1d596a85 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jan 2024 11:50:52 +0100 Subject: [PATCH 15/21] feat(route-sync): use the smallest possible min sync delay --- apps/emqx/src/emqx_router_syncer.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index e10ed399c..741e01295 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -41,7 +41,11 @@ -define(POOL, router_syncer_pool). -define(MAX_BATCH_SIZE, 1000). --define(MIN_SYNC_INTERVAL, 1). + +%% How long to idle (ms) after receiving a new operation before triggering batch sync? +%% Zero effectively just schedules out the process, so that it has a chance to receive +%% more operations, and introduce no minimum delay. 
+-define(MIN_SYNC_INTERVAL, 0). %% How long (ms) to idle after observing a batch sync error? %% Should help to avoid excessive retries in situations when errors are caused by From e21a3497c7dc01deada2352a31884972a5457d11 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jan 2024 18:55:39 +0100 Subject: [PATCH 16/21] chore(route-sync): turn TODO into comment --- apps/emqx/src/emqx_router_syncer.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index 741e01295..ad076fc31 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -184,7 +184,12 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> Status = #{} -> ok = send_replies(Status, Batch), NState = cancel_retry_timer(State#{stash := Stash3}), - %% TODO: postpone if only ?PRIO_BG operations left? + %% NOTE + %% We could postpone batches where only `?PRIO_BG` operations left, which + %% would allow to do less work in situations when there are intermittently + %% reconnecting clients with moderately unique subscriptions. However, this + %% would also require us to forego the idempotency of batch syncs (see + %% `merge_route_op/2`). 
case is_stash_empty(Stash3) of true -> NState; From 2f98f1faafced11b5534b9196e0f7c8534e589ad Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 11 Jan 2024 19:02:17 +0100 Subject: [PATCH 17/21] fix(route-sync): ensure batch sync preserves idempotency --- apps/emqx/src/emqx_router_syncer.erl | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index ad076fc31..dccc681c8 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -312,23 +312,21 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> undefined -> Stash#{Route => {Action, Prio, Ctx}}; RouteOp -> - case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of - undefined -> - maps:remove(Route, Stash); - RouteOpMerged -> - Stash#{Route := RouteOpMerged} - end + RouteOpMerged = merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)), + Stash#{Route := RouteOpMerged} end. merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> %% NOTE: This should not happen anyway. _ = replyctx_send(ignored, Ctx1), DestOp; -merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) -> - %% NOTE: Operations cancel each other. +merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) -> + %% NOTE: The latter cancels the former. + %% Strictly speaking, in ideal conditions we could just cancel both, because they + %% essentially do not change the global state. However, we decided to stay on the + %% safe side and cancel only the former, making batch syncs idempotent. _ = replyctx_send(ok, Ctx1), - _ = replyctx_send(ok, Ctx2), - undefined. + DestOp.
%% @@ -398,13 +396,13 @@ batch_test() -> [ {2, ignored}, {1, ok}, - {4, ok}, {5, ok}, - {7, ok}, + {7, ignored}, + {4, ok}, {9, ok}, - {11, ok}, {8, ok}, - {13, ok} + {13, ignored}, + {11, ok} ], emqx_utils_stream:consume(emqx_utils_stream:mqueue(0)) ). From b15106c753de3bb42f12266efda5396c4aaed8c5 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 12 Jan 2024 18:49:55 +0800 Subject: [PATCH 18/21] fix(iotdb): robustify type verification 1. let the type is not case sensitive 2. return error if type is invalid --- .../src/emqx_bridge_iotdb_connector.erl | 101 ++++++++++-------- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 23 ++++ 2 files changed, 80 insertions(+), 44 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 75161bdc2..2f078864e 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -128,7 +128,7 @@ fields("connection_fields") -> mk( hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), #{ - desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"), + desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"), default => ?VSN_1_1_X } )}, @@ -422,25 +422,41 @@ proc_data(PreProcessedData, Msg) -> now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond), now_ns => NowNS }, - lists:map( - fun( - #{ - timestamp := TimestampTkn, - measurement := Measurement, - data_type := DataType0, - value := ValueTkn - } - ) -> - DataType = emqx_placeholder:proc_tmpl(DataType0, Msg), - #{ - timestamp => iot_timestamp(TimestampTkn, Msg, Nows), - measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), - data_type => DataType, - value => proc_value(DataType, ValueTkn, Msg) - } - end, - PreProcessedData - ). + proc_data(PreProcessedData, Msg, Nows, []). 
+ +proc_data( + [ + #{ + timestamp := TimestampTkn, + measurement := Measurement, + data_type := DataType0, + value := ValueTkn + } + | T + ], + Msg, + Nows, + Acc +) -> + DataType = list_to_binary( + string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg))) + ), + case proc_value(DataType, ValueTkn, Msg) of + {ok, Value} -> + proc_data(T, Msg, Nows, [ + #{ + timestamp => iot_timestamp(TimestampTkn, Msg, Nows), + measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), + data_type => DataType, + value => Value + } + | Acc + ]); + Error -> + Error + end; +proc_data([], _Msg, _Nows, Acc) -> + {ok, lists:reverse(Acc)}. iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> Timestamp; @@ -459,16 +475,19 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> - case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of - <<"undefined">> -> null; - Val -> Val - end; + {ok, + case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of + <<"undefined">> -> null; + Val -> Val + end}; proc_value(<<"BOOLEAN">>, ValueTkn, Msg) -> - convert_bool(replace_var(ValueTkn, Msg)); + {ok, convert_bool(replace_var(ValueTkn, Msg))}; proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> -> - convert_int(replace_var(ValueTkn, Msg)); + {ok, convert_int(replace_var(ValueTkn, Msg))}; proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> - convert_float(replace_var(ValueTkn, Msg)). + {ok, convert_float(replace_var(ValueTkn, Msg))}; +proc_value(Type, _, _) -> + {error, {invalid_type, Type}}. 
replace_var(Tokens, Data) when is_list(Tokens) -> [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), @@ -630,9 +649,9 @@ eval_response_body(Body, Resp) -> preproc_data_template(DataList) -> Atom2Bin = fun - (Atom, Converter) when is_atom(Atom) -> - Converter(Atom); - (Bin, _) -> + (Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); + (Bin) -> Bin end, lists:map( @@ -645,18 +664,9 @@ preproc_data_template(DataList) -> } ) -> #{ - timestamp => emqx_placeholder:preproc_tmpl( - Atom2Bin(Timestamp, fun erlang:atom_to_binary/1) - ), + timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)), measurement => emqx_placeholder:preproc_tmpl(Measurement), - data_type => emqx_placeholder:preproc_tmpl( - Atom2Bin( - DataType, - fun(Atom) -> - erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom))) - end - ) - ), + data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)), value => emqx_placeholder:preproc_tmpl(Value) } end, @@ -681,9 +691,12 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) [] -> {error, invalid_data}; DataTemplate -> - DataList = proc_data(DataTemplate, Message), - - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + case proc_data(DataTemplate, Message) of + {ok, DataList} -> + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn); + Error -> + Error + end end end. diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index d2d5760e5..8145faf33 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -646,6 +646,29 @@ t_template(Config) -> iotdb_reset(Config, TemplateDeviceId), ok. 
+t_sync_query_case(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) + ). + +t_sync_query_invalid_type(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), + IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query + ). + is_empty(null) -> true; is_empty([]) -> true; is_empty([[]]) -> true; From b4eac256789d12cbb4e89785dfc74ffef19fc81e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 12 Jan 2024 13:54:23 +0100 Subject: [PATCH 19/21] chore(broker): fix code style issue Co-Authored-By: Zaiming (Stone) Shi --- apps/emqx/src/emqx_broker.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 7dd3d08be..ac9116cbd 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -621,10 +621,10 @@ sync_route(Action, Topic, ReplyTo) -> none -> regular_sync_route(Action, Topic); Role -> - case mria_config:whoami() of - Role -> + case Role =:= mria_config:whoami() of + true -> push_sync_route(Action, Topic, ReplyTo); - _Disabled -> + false -> regular_sync_route(Action, Topic) end end. 
From a5e7db793bbbba21a48efecc8487b35657af9875 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 12 Jan 2024 09:37:20 -0300 Subject: [PATCH 20/21] fix(mongodb_action): make `batch_size` hidden and fixed in the API Fixes https://emqx.atlassian.net/browse/ED-1171 The old bridge schema already had this override, but it had not been ported to actions. --- .../src/schema/emqx_bridge_v2_schema.erl | 15 +++++++++++---- .../src/emqx_bridge_mongodb.app.src | 2 +- .../src/emqx_bridge_mongodb.erl | 13 ++++++++++++- changes/ee/fix-12317.en.md | 1 + 4 files changed, 25 insertions(+), 6 deletions(-) create mode 100644 changes/ee/fix-12317.en.md diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 74239ffc0..fd0fb076c 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -50,8 +50,8 @@ ]). -export([ - make_producer_action_schema/1, - make_consumer_action_schema/1, + make_producer_action_schema/1, make_producer_action_schema/2, + make_consumer_action_schema/1, make_consumer_action_schema/2, top_level_common_action_keys/0, project_to_actions_resource_opts/1 ]). @@ -282,12 +282,19 @@ top_level_common_action_keys() -> %%====================================================================================== make_producer_action_schema(ActionParametersRef) -> + make_producer_action_schema(ActionParametersRef, _Opts = #{}). + +make_producer_action_schema(ActionParametersRef, Opts) -> [ {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})} - | make_consumer_action_schema(ActionParametersRef) + | make_consumer_action_schema(ActionParametersRef, Opts) ]. make_consumer_action_schema(ActionParametersRef) -> + make_consumer_action_schema(ActionParametersRef, _Opts = #{}). 
+ +make_consumer_action_schema(ActionParametersRef, Opts) -> + ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, resource_opts)), [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {connector, @@ -297,7 +304,7 @@ make_consumer_action_schema(ActionParametersRef) -> {description, emqx_schema:description_schema()}, {parameters, ActionParametersRef}, {resource_opts, - mk(ref(?MODULE, resource_opts), #{ + mk(ResourceOptsRef, #{ default => #{}, desc => ?DESC(emqx_resource_schema, "resource_opts") })} diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index f361d5276..198c5f8e8 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index 5ba73303f..e8eb93624 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -86,7 +86,8 @@ fields(mongodb_action) -> emqx_bridge_v2_schema:make_producer_action_schema( mk(ref(?MODULE, action_parameters), #{ required => true, desc => ?DESC(action_parameters) - }) + }), + #{resource_opts_ref => ref(?MODULE, action_resource_opts)} ); fields(action_parameters) -> [ @@ -95,6 +96,14 @@ fields(action_parameters) -> ]; fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); +fields(action_resource_opts) -> + emqx_bridge_v2_schema:resource_opts_fields([ + {batch_size, #{ + importance => ?IMPORTANCE_HIDDEN, + converter => fun(_, _) -> 1 end, + desc => ?DESC("batch_size") + }} + ]); fields(resource_opts) -> fields("creation_opts"); fields(mongodb_rs) -> @@ 
-213,6 +222,8 @@ desc("creation_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(connector_resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); desc(mongodb_rs) -> diff --git a/changes/ee/fix-12317.en.md b/changes/ee/fix-12317.en.md new file mode 100644 index 000000000..c114c4a2a --- /dev/null +++ b/changes/ee/fix-12317.en.md @@ -0,0 +1 @@ +Removed the `resource_opts.batch_size` field from the MongoDB Action schema, as it's still not supported. From ac6ce3b72c9718727ec15c575b63812a6b0d5b04 Mon Sep 17 00:00:00 2001 From: yanzhiemq <125347458+yanzhiemq@users.noreply.github.com> Date: Mon, 15 Jan 2024 09:26:23 +0800 Subject: [PATCH 21/21] ci: run scheduled packages build of release-55 --- .github/workflows/build_packages_cron.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 784454c86..56d5c37f2 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -24,7 +24,7 @@ jobs: matrix: profile: - ['emqx', 'master', '5.3-2:1.15.7-26.2.1-2'] - - ['emqx-enterprise', 'release-54', '5.3-2:1.15.7-25.3.2-2'] + - ['emqx-enterprise', 'release-55', '5.3-2:1.15.7-25.3.2-2'] os: - debian10 - ubuntu22.04