diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 856a3f7ae..3ca152749 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -167,7 +167,13 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% `unsubscribe` codepath. So we have to pick a worker according to the topic, %% but not shard. If there are topics with high number of shards, then the %% load across the pool will be unbalanced. - call(pick(Topic), {subscribe, Topic, SubPid, I}); + Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}), + case Sync of + ok -> + ok; + Ref when is_reference(Ref) -> + emqx_router_syncer:wait(Ref) + end; do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> @@ -491,8 +497,8 @@ safe_update_stats(Tab, Stat, MaxStat) -> call(Broker, Req) -> gen_server:call(Broker, Req, infinity). -cast(Broker, Msg) -> - gen_server:cast(Broker, Msg). +cast(Broker, Req) -> + gen_server:cast(Broker, Req). %% Pick a broker pick(Topic) -> @@ -506,18 +512,18 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({subscribe, Topic, SubPid, 0}, _From, State) -> +handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - Result = maybe_add_route(Existed, Topic), + Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; -handle_call({subscribe, Topic, SubPid, I}, _From, State) -> +handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), true = ets:insert(?SUBSCRIBER, [ {Topic, {shard, I}}, {{shard, Topic, I}, SubPid} ]), - Result = maybe_add_route(Existed, Topic), + Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -597,12 +603,12 @@ do_dispatch({shard, I}, Topic, Msg) -> %% -maybe_add_route(_Existed = false, Topic) -> - emqx_router:do_add_route(Topic); -maybe_add_route(_Existed = true, _Topic) -> +maybe_add_route(_Existed = false, Topic, ReplyTo) -> + emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo}); +maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. maybe_delete_route(_Exists = false, Topic) -> - emqx_router:do_delete_route(Topic); + emqx_router_syncer:push(delete, Topic, node(), #{}); maybe_delete_route(_Exists = true, _Topic) -> ok. diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index 74baf5674..d05cb7718 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -32,13 +32,20 @@ start_link() -> init([]) -> %% Broker pool PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2), - BrokerPool = emqx_pool_sup:spec([ + BrokerPool = emqx_pool_sup:spec(broker_pool_sup, [ broker_pool, hash, PoolSize, {emqx_broker, start_link, []} ]), + SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [ + router_syncer_pool, + hash, + emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2), + {emqx_router_syncer, start_link, []} + ]), + %% Shared subscription SharedSub = #{ id => shared_sub, @@ -59,4 +66,4 @@ init([]) -> modules => [emqx_broker_helper] }, - {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}. + {ok, {{one_for_all, 0, 1}, [BrokerPool, SyncerPool, SharedSub, Helper]}}. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 13efbe4ea..a10fde1cc 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -45,6 +45,8 @@ do_delete_route/2 ]). +-export([do_batch/1]). + -export([cleanup_routes/1]). -export([ @@ -86,6 +88,8 @@ deinit_schema/0 ]). +-export_type([dest/0]). + -type group() :: binary(). -type dest() :: node() | {group(), node()}. @@ -173,12 +177,12 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_schema_vsn(), Topic, Dest). + mria_insert_route(get_schema_vsn(), Topic, Dest, single). -mria_insert_route(v2, Topic, Dest) -> - mria_insert_route_v2(Topic, Dest); -mria_insert_route(v1, Topic, Dest) -> - mria_insert_route_v1(Topic, Dest). +mria_insert_route(v2, Topic, Dest, Ctx) -> + mria_insert_route_v2(Topic, Dest, Ctx); +mria_insert_route(v1, Topic, Dest, Ctx) -> + mria_insert_route_v1(Topic, Dest, Ctx). %% @doc Take a real topic (not filter) as input, return the matching topics and topic %% filters associated with route destination. @@ -225,12 +229,60 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> - mria_delete_route(get_schema_vsn(), Topic, Dest). + mria_delete_route(get_schema_vsn(), Topic, Dest, single). -mria_delete_route(v2, Topic, Dest) -> - mria_delete_route_v2(Topic, Dest); -mria_delete_route(v1, Topic, Dest) -> - mria_delete_route_v1(Topic, Dest). +mria_delete_route(v2, Topic, Dest, Ctx) -> + mria_delete_route_v2(Topic, Dest, Ctx); +mria_delete_route(v1, Topic, Dest, Ctx) -> + mria_delete_route_v1(Topic, Dest, Ctx). + +do_batch(Batch) -> + Nodes = batch_get_dest_nodes(Batch), + ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes), + mria_batch(get_schema_vsn(), Batch). + +mria_batch(v2, Batch) -> + mria_batch_v2(Batch); +mria_batch(v1, Batch) -> + mria_batch_v1(Batch). + +mria_batch_v2(Batch) -> + mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]). + +mria_batch_v1(Batch) -> + {atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]), + Res. + +mria_batch_run(SchemaVsn, Batch) -> + maps:fold( + fun({Topic, Dest}, Op, Errors) -> + case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of + ok -> + Errors; + Error -> + Errors#{{Topic, Dest} => Error} + end + end, + #{}, + Batch + ). + +mria_batch_operation(SchemaVsn, add, Topic, Dest) -> + mria_insert_route(SchemaVsn, Topic, Dest, batch); +mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> + mria_delete_route(SchemaVsn, Topic, Dest, batch). + +batch_get_dest_nodes(Batch) -> + maps:fold( + fun + ({_Topic, Dest}, add, Acc) -> + ordsets:add_element(get_dest_node(Dest), Acc); + (_, delete, Acc) -> + Acc + end, + ordsets:new(), + Batch + ). -spec select(Spec, _Limit :: pos_integer(), Continuation) -> {[emqx_types:route()], Continuation} | '$end_of_table' @@ -305,43 +357,51 @@ pick(Topic) -> %% Schema v1 %% -------------------------------------------------------------------- -mria_insert_route_v1(Topic, Dest) -> +mria_insert_route_v1(Topic, Dest, Ctx) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - mria_route_tab_insert_update_trie(Route); + mria_route_tab_insert_update_trie(Route, Ctx); false -> - mria_route_tab_insert(Route) + mria_route_tab_insert(Route, Ctx) end. -mria_route_tab_insert_update_trie(Route) -> +mria_route_tab_insert_update_trie(Route, single) -> emqx_router_utils:maybe_trans( fun emqx_router_utils:insert_trie_route/2, [?ROUTE_TAB, Route], ?ROUTE_SHARD - ). + ); +mria_route_tab_insert_update_trie(Route, batch) -> + emqx_router_utils:insert_trie_route(?ROUTE_TAB, Route). -mria_route_tab_insert(Route) -> - mria:dirty_write(?ROUTE_TAB, Route). +mria_route_tab_insert(Route, single) -> + mria:dirty_write(?ROUTE_TAB, Route); +mria_route_tab_insert(Route, batch) -> + mnesia:write(?ROUTE_TAB, Route, write). -mria_delete_route_v1(Topic, Dest) -> +mria_delete_route_v1(Topic, Dest, Ctx) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - mria_route_tab_delete_update_trie(Route); + mria_route_tab_delete_update_trie(Route, Ctx); false -> - mria_route_tab_delete(Route) + mria_route_tab_delete(Route, Ctx) end. -mria_route_tab_delete_update_trie(Route) -> +mria_route_tab_delete_update_trie(Route, single) -> emqx_router_utils:maybe_trans( fun emqx_router_utils:delete_trie_route/2, [?ROUTE_TAB, Route], ?ROUTE_SHARD - ). + ); +mria_route_tab_delete_update_trie(Route, batch) -> + emqx_router_utils:delete_trie_route(?ROUTE_TAB, Route). -mria_route_tab_delete(Route) -> - mria:dirty_delete_object(?ROUTE_TAB, Route). +mria_route_tab_delete(Route, single) -> + mria:dirty_delete_object(?ROUTE_TAB, Route); +mria_route_tab_delete(Route, batch) -> + mnesia:delete_object(?ROUTE_TAB, Route, write). match_routes_v1(Topic) -> lookup_route_tab(Topic) ++ @@ -410,24 +470,34 @@ fold_routes_v1(FunName, FoldFun, AccIn) -> %% topics. Writes go to only one of the two tables at a time. %% -------------------------------------------------------------------- -mria_insert_route_v2(Topic, Dest) -> +mria_insert_route_v2(Topic, Dest, Ctx) -> case emqx_trie_search:filter(Topic) of Words when is_list(Words) -> K = emqx_topic_index:make_key(Words, Dest), - mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}); + mria_filter_tab_insert(K, Ctx); false -> - mria_route_tab_insert(#route{topic = Topic, dest = Dest}) + mria_route_tab_insert(#route{topic = Topic, dest = Dest}, Ctx) end. -mria_delete_route_v2(Topic, Dest) -> +mria_filter_tab_insert(K, single) -> + mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}); +mria_filter_tab_insert(K, batch) -> + mnesia:write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}, write). + +mria_delete_route_v2(Topic, Dest, Ctx) -> case emqx_trie_search:filter(Topic) of Words when is_list(Words) -> K = emqx_topic_index:make_key(Words, Dest), - mria:dirty_delete(?ROUTE_TAB_FILTERS, K); + mria_filter_tab_delete(K, Ctx); false -> - mria_route_tab_delete(#route{topic = Topic, dest = Dest}) + mria_route_tab_delete(#route{topic = Topic, dest = Dest}, Ctx) end. +mria_filter_tab_delete(K, single) -> + mria:dirty_delete(?ROUTE_TAB_FILTERS, K); +mria_filter_tab_delete(K, batch) -> + mnesia:delete(?ROUTE_TAB_FILTERS, K, write). + match_routes_v2(Topic) -> lookup_route_tab(Topic) ++ [match_to_route(M) || M <- match_filters(Topic)]. diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl new file mode 100644 index 000000000..4ffb724df --- /dev/null +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -0,0 +1,235 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_syncer). + +-behaviour(gen_server). + +-export([start_link/2]). + +-export([push/4]). +-export([wait/1]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-type action() :: add | delete. + +-define(POOL, router_syncer_pool). + +-define(MAX_BATCH_SIZE, 4000). +-define(MIN_SYNC_INTERVAL, 1). + +-define(HIGHEST_PRIO, 1). +-define(LOWEST_PRIO, 4). + +-define(PUSH(PRIO, OP), {PRIO, OP}). + +-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}). + +%% + +-spec start_link(atom(), pos_integer()) -> + {ok, pid()}. +start_link(Pool, Id) -> + gen_server:start_link( + {local, emqx_utils:proc_name(?MODULE, Id)}, + ?MODULE, + [Pool, Id], + [] + ). + +-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) -> + ok | _WaitRef :: reference() +when + Opts :: #{reply => pid()}. +push(Action, Topic, Dest, Opts) -> + Worker = gproc_pool:pick_worker(?POOL, Topic), + Prio = designate_prio(Action, Opts), + Context = mk_push_context(Opts), + Worker ! ?PUSH(Prio, {Action, Topic, Dest, Context}), + case Context of + {MRef, _} -> + MRef; + [] -> + ok + end. + +-spec wait(_WaitRef :: reference()) -> + ok | {error, _Reason}. +wait(MRef) -> + %% FIXME: timeouts + receive + {MRef, Result} -> + Result + end. + +designate_prio(_, #{reply := true}) -> + ?HIGHEST_PRIO; +designate_prio(add, #{}) -> + 2; +designate_prio(delete, #{}) -> + 3. + +mk_push_context(#{reply := To}) -> + MRef = erlang:make_ref(), + {MRef, To}; +mk_push_context(_) -> + []. + +%% + +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{queue => []}}. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(Push = ?PUSH(_, _), State) -> + %% NOTE: Wait a bit to collect potentially overlapping operations. + ok = timer:sleep(?MIN_SYNC_INTERVAL), + NState = run_batch_loop([Push], State), + {noreply, NState}. + +terminate(_Reason, _State) -> + ok. + +%% + +run_batch_loop(Incoming, State = #{queue := Queue}) -> + NQueue = queue_join(Queue, gather_operations(Incoming)), + {Batch, N, FQueue} = mk_batch(NQueue), + %% TODO: retry if error? + Errors = run_batch(Batch), + 0 = send_replies(Errors, N, NQueue), + %% TODO: squash queue + NState = State#{queue := queue_fix(FQueue)}, + case queue_empty(FQueue) of + true -> + NState; + false -> + run_batch_loop([], NState) + end. + +%% + +mk_batch(Queue) -> + mk_batch(Queue, 0, #{}). + +mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE -> + {Batch, N, Queue}; +mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) -> + NBatch = batch_add_operation(Op, Batch), + mk_batch(Queue, N + 1, NBatch); +mk_batch([Run | Queue], N, Batch) when is_list(Run) -> + case mk_batch(Run, N, Batch) of + {NBatch, N1, []} -> + mk_batch(Queue, N1, NBatch); + {NBatch, N1, Left} -> + {NBatch, N1, [Left | Queue]} + end; +mk_batch([], N, Batch) -> + {Batch, N, []}. + +batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) -> + case Batch of + #{{Topic, Dest} := Action} -> + Batch; + #{{Topic, Dest} := delete} when Action == add -> + Batch#{{Topic, Dest} := add}; + #{{Topic, Dest} := add} when Action == delete -> + maps:remove({Topic, Dest}, Batch); + #{} -> + maps:put({Topic, Dest}, Action, Batch) + end. + +send_replies(_Result, 0, _Queue) -> + 0; +send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) -> + _ = replyctx_send(Result, Op), + send_replies(Result, N - 1, Queue); +send_replies(Result, N, [Run | Queue]) when is_list(Run) -> + N1 = send_replies(Result, N, Run), + send_replies(Result, N1, Queue); +send_replies(_Result, N, []) -> + N. + +replyctx_send(_Result, ?OP(_, _, _, [])) -> + noreply; +replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) -> + case Result of + #{{Topic, Dest} := Error} -> + Pid ! {MRef, Error}; + #{} -> + Pid ! {MRef, ok} + end. + +%% + +run_batch(Batch) -> + emqx_router:do_batch(Batch). + +%% + +queue_fix([]) -> + []; +queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO -> + queue_fix([[] | Queue]); +queue_fix(Queue) -> + Queue. + +queue_join(Q1, []) -> + Q1; +queue_join([], Q2) -> + Q2; +queue_join(Q1, Q2) -> + lists:zipwith(fun join_list/2, Q1, Q2). + +join_list(L1, []) -> + L1; +join_list([], L2) -> + L2; +join_list(L1, L2) -> + [L1, L2]. + +queue_empty(Queue) -> + lists:all(fun(L) -> L == [] end, Queue). + +gather_operations(Incoming) -> + [ + pick_operations(Prio, Incoming) ++ drain_operations(Prio) + || Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO) + ]. + +drain_operations(Prio) -> + receive + {Prio, Op} -> + [Op | drain_operations(Prio)] + after 0 -> + [] + end. + +pick_operations(Prio, Incoming) -> + [Op || {P, Op} <- Incoming, P =:= Prio].