diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl
index d05cb7718..e6d93d92f 100644
--- a/apps/emqx/src/emqx_broker_sup.erl
+++ b/apps/emqx/src/emqx_broker_sup.erl
@@ -42,7 +42,7 @@ init([]) ->
     SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [
         router_syncer_pool,
         hash,
-        emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2),
+        PoolSize, % NOTE(review): bound outside this hunk; replaces the [node, syncer_pool_size] lookup -- confirm
         {emqx_router_syncer, start_link, []}
     ]),
 
diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl
index a10fde1cc..54667065a 100644
--- a/apps/emqx/src/emqx_router.erl
+++ b/apps/emqx/src/emqx_router.erl
@@ -45,6 +45,11 @@
     do_delete_route/2
 ]).
 
+%% Mria Activity RPC targets (NOTE(review): the export below is commented out -- enable or drop it)
+% -export([
+%     mria_insert_route/2
+% ]).
+
 -export([do_batch/1]).
 
 -export([cleanup_routes/1]).
@@ -236,9 +241,14 @@ mria_delete_route(v2, Topic, Dest, Ctx) ->
 mria_delete_route(v1, Topic, Dest, Ctx) ->
     mria_delete_route_v1(Topic, Dest, Ctx).
 
+-spec do_batch(Batch) -> Errors when
+    %% Operation is a tuple tagged by its action in the first element: {add, ...} | {delete, ...}.
+    Batch :: #{Route => _Operation :: tuple()},
+    Errors :: #{Route => _Error},
+    Route :: {emqx_types:topic(), dest()}.
 do_batch(Batch) ->
     Nodes = batch_get_dest_nodes(Batch),
-    ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes),
+    ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), % Nodes is an ordset, see batch_get_dest_nodes/1
     mria_batch(get_schema_vsn(), Batch).
 
 mria_batch(v2, Batch) ->
@@ -256,7 +266,7 @@ mria_batch_v1(Batch) ->
 mria_batch_run(SchemaVsn, Batch) ->
     maps:fold(
         fun({Topic, Dest}, Op, Errors) ->
-            case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of
+            case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of
                 ok ->
                     Errors;
                 Error ->
@@ -274,16 +284,21 @@ mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
 
 batch_get_dest_nodes(Batch) ->
     maps:fold(
-        fun
-            ({_Topic, Dest}, add, Acc) ->
-                ordsets:add_element(get_dest_node(Dest), Acc);
-            (_, delete, Acc) ->
-                Acc
+        fun({_Topic, Dest}, Op, Acc) ->
+            case batch_get_action(Op) of
+                add -> % only additions contribute a destination node
+                    ordsets:add_element(get_dest_node(Dest), Acc);
+                delete ->
+                    Acc
+            end
         end,
         ordsets:new(),
         Batch
     ).
 
+batch_get_action(Op) -> % the action is the first element of the operation tuple
+    element(1, Op).
+
 -spec select(Spec, _Limit :: pos_integer(), Continuation) ->
     {[emqx_types:route()], Continuation} | '$end_of_table'
 when
diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl
index 4ffb724df..bdbeffb4b 100644
--- a/apps/emqx/src/emqx_router_syncer.erl
+++ b/apps/emqx/src/emqx_router_syncer.erl
@@ -16,6 +16,8 @@
 
 -module(emqx_router_syncer).
 
+-include_lib("snabbkaffe/include/trace.hrl").
+
 -behaviour(gen_server).
 
 -export([start_link/2]).
@@ -35,16 +37,27 @@
 
 -define(POOL, router_syncer_pool).
 
--define(MAX_BATCH_SIZE, 4000).
+-define(MAX_BATCH_SIZE, 1000).
 -define(MIN_SYNC_INTERVAL, 1).
 
--define(HIGHEST_PRIO, 1).
--define(LOWEST_PRIO, 4).
+-define(PRIO_HI, 1). % operations whose pusher asked for a reply, see designate_prio/2
+-define(PRIO_LO, 2). % additions without a reply request
+-define(PRIO_BG, 3). % deletions without a reply request
 
 -define(PUSH(PRIO, OP), {PRIO, OP}).
-
 -define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).
 
+-define(ROUTEOP(ACT), {ACT, _, _}). % stashed operation: {Action, Prio, ReplyCtx}
+-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}).
+-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}).
+
+-ifdef(TEST). % test builds use a smaller batch size and a larger sync interval
+-undef(MAX_BATCH_SIZE).
+-undef(MIN_SYNC_INTERVAL).
+-define(MAX_BATCH_SIZE, 40).
+-define(MIN_SYNC_INTERVAL, 10).
+-endif.
+
 %%
 
 -spec start_link(atom(), pos_integer()) ->
@@ -82,12 +95,12 @@ wait(MRef) ->
             Result
     end.
 
-designate_prio(_, #{reply := true}) ->
-    ?HIGHEST_PRIO;
+designate_prio(_, #{reply := _To}) -> % any requested reply makes the operation high priority
+    ?PRIO_HI;
 designate_prio(add, #{}) ->
-    2;
+    ?PRIO_LO;
 designate_prio(delete, #{}) ->
-    3.
+    ?PRIO_BG.
 
 mk_push_context(#{reply := To}) ->
     MRef = erlang:make_ref(),
@@ -99,7 +112,7 @@ mk_push_context(_) ->
 
 init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    {ok, #{queue => []}}.
+    {ok, #{stash => stash_new()}}. % the stash starts out as an empty map
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -118,15 +131,24 @@ terminate(_Reason, _State) ->
 
 %%
 
-run_batch_loop(Incoming, State = #{queue := Queue}) ->
-    NQueue = queue_join(Queue, gather_operations(Incoming)),
-    {Batch, N, FQueue} = mk_batch(NQueue),
+run_batch_loop(Incoming, State = #{stash := Stash0}) ->
+    Stash1 = stash_add(Incoming, Stash0), % merge the pushes handed in by the caller
+    Stash2 = stash_drain(Stash1), % then every push already waiting in the mailbox (non-blocking)
+    {Batch, Stash3} = mk_batch(Stash2),
+    ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{
+        size => maps:size(Batch),
+        stashed => maps:size(Stash3),
+        n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)),
+        n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)),
+        prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch), % numbers sort before atoms, so 'none' only survives for an empty batch
+        prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch)
+    }),
     %% TODO: retry if error?
     Errors = run_batch(Batch),
-    0 = send_replies(Errors, N, NQueue),
-    %% TODO: squash queue
-    NState = State#{queue := queue_fix(FQueue)},
-    case queue_empty(FQueue) of
+    ok = send_replies(Errors, Batch),
+    NState = State#{stash := Stash3},
+    %% TODO: postpone if only ?PRIO_BG operations are left?
+    case stash_empty(Stash3) of
         true ->
             NState;
         false ->
@@ -135,56 +157,59 @@ run_batch_loop(Incoming, State = #{queue := Queue}) ->
 
 %%
 
-mk_batch(Queue) ->
-    mk_batch(Queue, 0, #{}).
+mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
+    %% This is the perfect situation: we just use the stash as the batch w/o extra reallocations.
+    {Stash, stash_new()};
+mk_batch(Stash) ->
+    %% Take a subset of the stashed operations to form a batch.
+    %% Note that the stash is an unordered map, it's not a queue. The order of operations is
+    %% not preserved strictly, only loosely, because of how we start from high priority
+    %% operations and go down to low priority ones. This might cause some operations to
+    %% stay in the stash for an unfairly long time when there are many high priority operations.
+    %% However, it's unclear how likely this is to happen in practice.
+    mk_batch(Stash, ?MAX_BATCH_SIZE).
 
-mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE ->
-    {Batch, N, Queue};
-mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) ->
-    NBatch = batch_add_operation(Op, Batch),
-    mk_batch(Queue, N + 1, NBatch);
-mk_batch([Run | Queue], N, Batch) when is_list(Run) ->
-    case mk_batch(Run, N, Batch) of
-        {NBatch, N1, []} ->
-            mk_batch(Queue, N1, NBatch);
-        {NBatch, N1, Left} ->
-            {NBatch, N1, [Left | Queue]}
+mk_batch(Stash, BatchSize) ->
+    mk_batch(?PRIO_HI, #{}, BatchSize, Stash).
+
+mk_batch(Prio, Batch, SizeLeft, Stash) ->
+    mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)). % fresh iterator for each priority level
+
+mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 ->
+    %% Iterating over the stash, only taking operations with priority equal to `Prio`.
+    case maps:next(It) of
+        {Route, Op = ?ROUTEOP(_Action, Prio), NIt} ->
+            NBatch = Batch#{Route => Op},
+            NStash = maps:remove(Route, Stash),
+            mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt);
+        {_Route, _Op, NIt} ->
+            %% This is a lower priority operation, skip it at this level.
+            mk_batch(Prio, Batch, SizeLeft, Stash, NIt);
+        none ->
+            %% No more operations with priority `Prio`, go to the next priority level.
+            true = Prio < ?PRIO_BG, % badmatch if the stash runs out before the batch is full
+            mk_batch(Prio + 1, Batch, SizeLeft, Stash)
     end;
-mk_batch([], N, Batch) ->
-    {Batch, N, []}.
+mk_batch(_Prio, Batch, _, Stash, _It) -> % batch is full (SizeLeft =< 0)
+    {Batch, Stash}.
 
-batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) ->
-    case Batch of
-        #{{Topic, Dest} := Action} ->
-            Batch;
-        #{{Topic, Dest} := delete} when Action == add ->
-            Batch#{{Topic, Dest} := add};
-        #{{Topic, Dest} := add} when Action == delete ->
-            maps:remove({Topic, Dest}, Batch);
-        #{} ->
-            maps:put({Topic, Dest}, Action, Batch)
-    end.
+send_replies(Errors, Batch) -> % answer every batched operation that carries a reply context
+    maps:foreach(
+        fun(Route, {_Action, _Prio, Ctx}) ->
+            case Ctx of
+                [] ->
+                    ok;
+                _ ->
+                    replyctx_send(maps:get(Route, Errors, ok), Ctx) % 'ok' unless Errors has an entry for the route
+            end
+        end,
+        Batch
+    ).
 
-send_replies(_Result, 0, _Queue) ->
-    0;
-send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) ->
-    _ = replyctx_send(Result, Op),
-    send_replies(Result, N - 1, Queue);
-send_replies(Result, N, [Run | Queue]) when is_list(Run) ->
-    N1 = send_replies(Result, N, Run),
-    send_replies(Result, N1, Queue);
-send_replies(_Result, N, []) ->
-    N.
-
-replyctx_send(_Result, ?OP(_, _, _, [])) ->
+replyctx_send(_Result, []) -> % an empty context means nobody is waiting for a reply
     noreply;
-replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) ->
-    case Result of
-        #{{Topic, Dest} := Error} ->
-            Pid ! {MRef, Error};
-        #{} ->
-            Pid ! {MRef, ok}
-    end.
+replyctx_send(Result, {MRef, Pid}) -> % the waiting process receives {MRef, Result}
+    Pid ! {MRef, Result}.
 
 %%
 
@@ -193,43 +218,111 @@ run_batch(Batch) ->
 
 %%
 
-queue_fix([]) ->
-    [];
-queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO ->
-    queue_fix([[] | Queue]);
-queue_fix(Queue) ->
-    Queue.
+stash_new() -> % the stash is a map: {Topic, Dest} => {Action, Prio, Ctx}, see stash_add/3
+    #{}.
 
-queue_join(Q1, []) ->
-    Q1;
-queue_join([], Q2) ->
-    Q2;
-queue_join(Q1, Q2) ->
-    lists:zipwith(fun join_list/2, Q1, Q2).
+stash_empty(Stash) ->
+    maps:size(Stash) =:= 0.
 
-join_list(L1, []) ->
-    L1;
-join_list([], L2) ->
-    L2;
-join_list(L1, L2) ->
-    [L1, L2].
-
-queue_empty(Queue) ->
-    lists:all(fun(L) -> L == [] end, Queue).
-
-gather_operations(Incoming) ->
-    [
-        pick_operations(Prio, Incoming) ++ drain_operations(Prio)
-     || Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO)
-    ].
-
-drain_operations(Prio) ->
+stash_drain(Stash) -> % non-blocking: stashes every ?PUSH message already in the mailbox
     receive
-        {Prio, Op} ->
-            [Op | drain_operations(Prio)]
+        ?PUSH(Prio, Op) ->
+            stash_drain(stash_add(Prio, Op, Stash))
     after 0 ->
-        []
+        Stash
     end.
 
-pick_operations(Prio, Incoming) ->
-    [Op || {P, Op} <- Incoming, P =:= Prio].
+stash_add(Pushes, Stash) -> % folds a list of ?PUSH(Prio, Op) terms into the stash, in list order
+    lists:foldl(
+        fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end,
+        Stash,
+        Pushes
+    ).
+
+stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
+    Route = {Topic, Dest}, % at most one pending operation is kept per route
+    case maps:get(Route, Stash, undefined) of
+        undefined ->
+            Stash#{Route => {Action, Prio, Ctx}};
+        RouteOp ->
+            case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of
+                undefined -> % the two operations cancelled each other
+                    maps:remove(Route, Stash);
+                RouteOpMerged ->
+                    Stash#{Route := RouteOpMerged}
+            end
+    end.
+
+merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
+    %% NOTE: Same action pushed twice for one route; this should not happen anyway. The earlier pusher gets `ignored`, the later operation (with its priority) is kept.
+    _ = replyctx_send(ignored, Ctx1),
+    DestOp;
+merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) ->
+    %% NOTE: Different actions cancel each other: both pushers get `ok` and nothing stays stashed.
+    _ = replyctx_send(ok, Ctx1),
+    _ = replyctx_send(ok, Ctx2),
+    undefined.
+
+%%
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+batch_test() ->
+    Dest = node(),
+    Ctx = fun(N) -> {N, self()} end, % reply context {Tag, Pid}: replies come back to this process as {Tag, Result}
+    Stash = stash_add(
+        [
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))),
+            ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16)))
+        ],
+        stash_new()
+    ),
+    {Batch, StashLeft} = mk_batch(Stash, 5), % 7 routes stay stashed after merging; 5 go into the batch
+    ?assertMatch(
+        #{
+            {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
+            {<<"t/3">>, Dest} := {add, ?PRIO_HI, _},
+            {<<"t/2">>, Dest} := {delete, ?PRIO_LO, _},
+            {<<"t/4">>, Dest} := {add, ?PRIO_HI, _},
+            {<<"old/3">>, Dest} := {delete, ?PRIO_LO, _}
+        },
+        Batch
+    ),
+    ?assertMatch(
+        #{
+            {<<"old/1">>, Dest} := {delete, ?PRIO_BG, _},
+            {<<"old/2">>, Dest} := {delete, ?PRIO_BG, _}
+        },
+        StashLeft
+    ),
+    ?assertEqual(
+        [
+            {2, ignored}, % t/1 was added twice: the earlier push is superseded by the later one
+            {1, ok}, % t/2 delete and add cancel each other (tags 1 and 4)
+            {4, ok},
+            {5, ok}, % t/3 add and delete cancel each other (tags 5 and 7)
+            {7, ok},
+            {9, ok}, % t/2 delete and add cancel again (tags 9 and 11)
+            {11, ok},
+            {8, ok}, % t/3 delete and add cancel again (tags 8 and 13)
+            {13, ok}
+        ],
+        emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
+    ).
+
+-endif.