fix(routesync): ensure causal relationships are preserved

This comes at the cost of strict FIFO semantics, though.
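
For illustration, a minimal sketch of the merge semantics involved, using the
?PUSH/?OP macros and stash helpers introduced in emqx_router_syncer below (with
empty reply contexts, so no replies are sent): an `add` followed by a `delete`
for the same route cancel each other out in the stash, so per-route causal order
is kept even though operations are no longer flushed in strict FIFO order across
routes.

    Stash0 = stash_new(),
    Stash1 = stash_add([?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, node(), []))], Stash0),
    %% The later delete meets the pending add for {<<"t/1">>, node()};
    %% merge_route_op/2 cancels both, leaving the stash empty.
    Stash2 = stash_add([?PUSH(?PRIO_BG, ?OP(delete, <<"t/1">>, node(), []))], Stash1),
    true = stash_empty(Stash2).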
Andrew Mayorov 2024-01-05 18:34:39 +01:00
parent f92b5b3f32
commit 5aeff20f8b
3 changed files with 211 additions and 103 deletions


@@ -42,7 +42,7 @@ init([]) ->
SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [
router_syncer_pool,
hash,
emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2),
PoolSize,
{emqx_router_syncer, start_link, []}
]),


@@ -45,6 +45,11 @@
do_delete_route/2
]).
%% Mria Activity RPC targets
% -export([
% mria_insert_route/2
% ]).
-export([do_batch/1]).
-export([cleanup_routes/1]).
@@ -236,9 +241,14 @@ mria_delete_route(v2, Topic, Dest, Ctx) ->
mria_delete_route(v1, Topic, Dest, Ctx) ->
mria_delete_route_v1(Topic, Dest, Ctx).
-spec do_batch(Batch) -> Errors when
%% Operation :: {add, ...} | {delete, ...}.
Batch :: #{Route => _Operation :: tuple()},
Errors :: #{Route => _Error},
Route :: {emqx_types:topic(), dest()}.
do_batch(Batch) ->
Nodes = batch_get_dest_nodes(Batch),
ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes),
ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)),
mria_batch(get_schema_vsn(), Batch).
mria_batch(v2, Batch) ->
@@ -256,7 +266,7 @@ mria_batch_v1(Batch) ->
mria_batch_run(SchemaVsn, Batch) ->
maps:fold(
fun({Topic, Dest}, Op, Errors) ->
case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of
case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of
ok ->
Errors;
Error ->
@@ -274,16 +284,21 @@ mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
batch_get_dest_nodes(Batch) ->
maps:fold(
fun
({_Topic, Dest}, add, Acc) ->
ordsets:add_element(get_dest_node(Dest), Acc);
(_, delete, Acc) ->
Acc
fun({_Topic, Dest}, Op, Acc) ->
case batch_get_action(Op) of
add ->
ordsets:add_element(get_dest_node(Dest), Acc);
delete ->
Acc
end
end,
ordsets:new(),
Batch
).
batch_get_action(Op) ->
element(1, Op).
-spec select(Spec, _Limit :: pos_integer(), Continuation) ->
{[emqx_types:route()], Continuation} | '$end_of_table'
when


@@ -16,6 +16,8 @@
-module(emqx_router_syncer).
-include_lib("snabbkaffe/include/trace.hrl").
-behaviour(gen_server).
-export([start_link/2]).
@@ -35,16 +37,27 @@
-define(POOL, router_syncer_pool).
-define(MAX_BATCH_SIZE, 4000).
-define(MAX_BATCH_SIZE, 1000).
-define(MIN_SYNC_INTERVAL, 1).
-define(HIGHEST_PRIO, 1).
-define(LOWEST_PRIO, 4).
-define(PRIO_HI, 1).
-define(PRIO_LO, 2).
-define(PRIO_BG, 3).
-define(PUSH(PRIO, OP), {PRIO, OP}).
-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).
-define(ROUTEOP(ACT), {ACT, _, _}).
-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}).
-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}).
-ifdef(TEST).
-undef(MAX_BATCH_SIZE).
-undef(MIN_SYNC_INTERVAL).
-define(MAX_BATCH_SIZE, 40).
-define(MIN_SYNC_INTERVAL, 10).
-endif.
%%
-spec start_link(atom(), pos_integer()) ->
@@ -82,12 +95,12 @@ wait(MRef) ->
Result
end.
designate_prio(_, #{reply := true}) ->
?HIGHEST_PRIO;
designate_prio(_, #{reply := _To}) ->
?PRIO_HI;
designate_prio(add, #{}) ->
2;
?PRIO_LO;
designate_prio(delete, #{}) ->
3.
?PRIO_BG.
mk_push_context(#{reply := To}) ->
MRef = erlang:make_ref(),
@@ -99,7 +112,7 @@ mk_push_context(_) ->
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{queue => []}}.
{ok, #{stash => stash_new()}}.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
@@ -118,15 +131,24 @@ terminate(_Reason, _State) ->
%%
run_batch_loop(Incoming, State = #{queue := Queue}) ->
NQueue = queue_join(Queue, gather_operations(Incoming)),
{Batch, N, FQueue} = mk_batch(NQueue),
run_batch_loop(Incoming, State = #{stash := Stash0}) ->
Stash1 = stash_add(Incoming, Stash0),
Stash2 = stash_drain(Stash1),
{Batch, Stash3} = mk_batch(Stash2),
?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{
size => maps:size(Batch),
stashed => maps:size(Stash3),
n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)),
n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)),
prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch),
prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch)
}),
%% TODO: retry if error?
Errors = run_batch(Batch),
0 = send_replies(Errors, N, NQueue),
%% TODO: squash queue
NState = State#{queue := queue_fix(FQueue)},
case queue_empty(FQueue) of
ok = send_replies(Errors, Batch),
NState = State#{stash := Stash3},
%% TODO: postpone if only ?PRIO_BG operations left?
case stash_empty(Stash3) of
true ->
NState;
false ->
@@ -135,56 +157,59 @@ run_batch_loop(Incoming, State = #{queue := Queue}) ->
%%
mk_batch(Queue) ->
mk_batch(Queue, 0, #{}).
mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
%% This is the ideal situation: we just use the stash as the batch, without extra reallocations.
{Stash, stash_new()};
mk_batch(Stash) ->
%% Take a subset of stashed operations to form a batch.
%% Note that the stash is an unordered map, not a queue. The order of operations is
%% preserved only loosely, because we start with high-priority operations and work
%% down to low-priority ones. This may cause some operations to stay in the stash for
%% an unfairly long time when there are many high-priority operations. However, it's
%% unclear how likely this is to happen in practice.
mk_batch(Stash, ?MAX_BATCH_SIZE).
mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE ->
{Batch, N, Queue};
mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) ->
NBatch = batch_add_operation(Op, Batch),
mk_batch(Queue, N + 1, NBatch);
mk_batch([Run | Queue], N, Batch) when is_list(Run) ->
case mk_batch(Run, N, Batch) of
{NBatch, N1, []} ->
mk_batch(Queue, N1, NBatch);
{NBatch, N1, Left} ->
{NBatch, N1, [Left | Queue]}
mk_batch(Stash, BatchSize) ->
mk_batch(?PRIO_HI, #{}, BatchSize, Stash).
mk_batch(Prio, Batch, SizeLeft, Stash) ->
mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)).
mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 ->
%% Iterate over the stash, taking only operations with priority equal to `Prio`.
case maps:next(It) of
{Route, Op = ?ROUTEOP(_Action, Prio), NIt} ->
NBatch = Batch#{Route => Op},
NStash = maps:remove(Route, Stash),
mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt);
{_Route, _Op, NIt} ->
%% This is a lower-priority operation; skip it.
mk_batch(Prio, Batch, SizeLeft, Stash, NIt);
none ->
%% No more operations with priority `Prio`, go to the next priority level.
true = Prio < ?PRIO_BG,
mk_batch(Prio + 1, Batch, SizeLeft, Stash)
end;
mk_batch([], N, Batch) ->
{Batch, N, []}.
mk_batch(_Prio, Batch, _, Stash, _It) ->
{Batch, Stash}.
batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) ->
case Batch of
#{{Topic, Dest} := Action} ->
Batch;
#{{Topic, Dest} := delete} when Action == add ->
Batch#{{Topic, Dest} := add};
#{{Topic, Dest} := add} when Action == delete ->
maps:remove({Topic, Dest}, Batch);
#{} ->
maps:put({Topic, Dest}, Action, Batch)
end.
send_replies(Errors, Batch) ->
maps:foreach(
fun(Route, {_Action, _Prio, Ctx}) ->
case Ctx of
[] ->
ok;
_ ->
replyctx_send(maps:get(Route, Errors, ok), Ctx)
end
end,
Batch
).
send_replies(_Result, 0, _Queue) ->
0;
send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) ->
_ = replyctx_send(Result, Op),
send_replies(Result, N - 1, Queue);
send_replies(Result, N, [Run | Queue]) when is_list(Run) ->
N1 = send_replies(Result, N, Run),
send_replies(Result, N1, Queue);
send_replies(_Result, N, []) ->
N.
replyctx_send(_Result, ?OP(_, _, _, [])) ->
replyctx_send(_Result, []) ->
noreply;
replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) ->
case Result of
#{{Topic, Dest} := Error} ->
Pid ! {MRef, Error};
#{} ->
Pid ! {MRef, ok}
end.
replyctx_send(Result, {MRef, Pid}) ->
Pid ! {MRef, Result}.
%%
@@ -193,43 +218,111 @@ run_batch(Batch) ->
%%
queue_fix([]) ->
[];
queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO ->
queue_fix([[] | Queue]);
queue_fix(Queue) ->
Queue.
stash_new() ->
#{}.
queue_join(Q1, []) ->
Q1;
queue_join([], Q2) ->
Q2;
queue_join(Q1, Q2) ->
lists:zipwith(fun join_list/2, Q1, Q2).
stash_empty(Stash) ->
maps:size(Stash) =:= 0.
join_list(L1, []) ->
L1;
join_list([], L2) ->
L2;
join_list(L1, L2) ->
[L1, L2].
queue_empty(Queue) ->
lists:all(fun(L) -> L == [] end, Queue).
gather_operations(Incoming) ->
[
pick_operations(Prio, Incoming) ++ drain_operations(Prio)
|| Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO)
].
drain_operations(Prio) ->
stash_drain(Stash) ->
receive
{Prio, Op} ->
[Op | drain_operations(Prio)]
?PUSH(Prio, Op) ->
stash_drain(stash_add(Prio, Op, Stash))
after 0 ->
[]
Stash
end.
pick_operations(Prio, Incoming) ->
[Op || {P, Op} <- Incoming, P =:= Prio].
stash_add(Pushes, Stash) ->
lists:foldl(
fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end,
Stash,
Pushes
).
stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
Route = {Topic, Dest},
case maps:get(Route, Stash, undefined) of
undefined ->
Stash#{Route => {Action, Prio, Ctx}};
RouteOp ->
case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of
undefined ->
maps:remove(Route, Stash);
RouteOpMerged ->
Stash#{Route := RouteOpMerged}
end
end.
merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
%% NOTE: Duplicate of an already-pending operation; this should not happen anyway.
_ = replyctx_send(ignored, Ctx1),
DestOp;
merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) ->
%% NOTE: Operations cancel each other.
_ = replyctx_send(ok, Ctx1),
_ = replyctx_send(ok, Ctx2),
undefined.
%%
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
batch_test() ->
Dest = node(),
Ctx = fun(N) -> {N, self()} end,
Stash = stash_add(
[
?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))),
?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))),
?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))),
?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))),
?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))),
?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))),
?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))),
?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))),
?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))),
?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16)))
],
stash_new()
),
{Batch, StashLeft} = mk_batch(Stash, 5),
?assertMatch(
#{
{<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
{<<"t/3">>, Dest} := {add, ?PRIO_HI, _},
{<<"t/2">>, Dest} := {delete, ?PRIO_LO, _},
{<<"t/4">>, Dest} := {add, ?PRIO_HI, _},
{<<"old/3">>, Dest} := {delete, ?PRIO_LO, _}
},
Batch
),
?assertMatch(
#{
{<<"old/1">>, Dest} := {delete, ?PRIO_BG, _},
{<<"old/2">>, Dest} := {delete, ?PRIO_BG, _}
},
StashLeft
),
?assertEqual(
[
{2, ignored},
{1, ok},
{4, ok},
{5, ok},
{7, ok},
{9, ok},
{11, ok},
{8, ok},
{13, ok}
],
emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
).
-endif.