fix(ds): Make async batches truly async

This commit is contained in:
ieQu1 2024-04-02 23:02:47 +02:00
parent 92ca90c0ca
commit 2bbfada7af
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed file with 34 additions and 24 deletions

View File

@ -76,7 +76,7 @@ store_batch(DB, Messages, Opts) ->
case shards_of_batch(DB, Messages) of case shards_of_batch(DB, Messages) of
[{Shard, {NMsgs, NBytes}}] -> [{Shard, {NMsgs, NBytes}}] ->
%% Happy case: %% Happy case:
gen_server:call( enqueue_call_or_cast(
?via(DB, Shard), ?via(DB, Shard),
#enqueue_req{ #enqueue_req{
messages = Messages, messages = Messages,
@ -84,8 +84,7 @@ store_batch(DB, Messages, Opts) ->
atomic = Atomic, atomic = Atomic,
n_messages = NMsgs, n_messages = NMsgs,
payload_bytes = NBytes payload_bytes = NBytes
}, }
infinity
); );
[_, _ | _] when Atomic -> [_, _ | _] when Atomic ->
%% It's impossible to commit a batch to multiple shards %% It's impossible to commit a batch to multiple shards
@ -93,7 +92,7 @@ store_batch(DB, Messages, Opts) ->
{error, unrecoverable, atomic_commit_to_multiple_shards}; {error, unrecoverable, atomic_commit_to_multiple_shards};
_Shards -> _Shards ->
%% Use a slower implementation for the unlikely case: %% Use a slower implementation for the unlikely case:
repackage_messages(DB, Messages, Sync, Atomic) repackage_messages(DB, Messages, Sync)
end. end.
%%================================================================================ %%================================================================================
@ -127,15 +126,31 @@ init([DB, Shard]) ->
handle_call( handle_call(
#enqueue_req{ #enqueue_req{
messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes messages = Msgs,
sync = Sync,
atomic = Atomic,
n_messages = NMsgs,
payload_bytes = NBytes
}, },
From, From,
S S0 = #s{pending_replies = Replies0}
) -> ) ->
{noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)}; S = S0#s{pending_replies = [From | Replies0]},
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
handle_cast(
#enqueue_req{
messages = Msgs,
sync = Sync,
atomic = Atomic,
n_messages = NMsgs,
payload_bytes = NBytes
},
S
) ->
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
{noreply, S}. {noreply, S}.
@ -156,13 +171,12 @@ terminate(_Reason, _S) ->
%%================================================================================ %%================================================================================
enqueue( enqueue(
From,
Sync, Sync,
Atomic, Atomic,
Msgs, Msgs,
BatchSize, BatchSize,
BatchBytes, BatchBytes,
S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0} S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
) -> ) ->
%% At this point we don't split the batches, even when they aren't %% At this point we don't split the batches, even when they aren't
%% atomic. It wouldn't win us anything in terms of memory, and %% atomic. It wouldn't win us anything in terms of memory, and
@ -178,20 +192,12 @@ enqueue(
%% it now, and retry: %% it now, and retry:
cancel_timer(S0), cancel_timer(S0),
S1 = flush(S0), S1 = flush(S0),
enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
false -> false ->
%% The buffer is empty, we enqueue the atomic batch in its %% The buffer is empty, we enqueue the atomic batch in its
%% entirety: %% entirety:
Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
Replies = S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
case Sync of
true ->
[From | Replies0];
false ->
gen_server:reply(From, ok),
Replies0
end,
S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies},
case NMsgs >= NMax orelse NBytes >= NBytes of case NMsgs >= NMax orelse NBytes >= NBytes of
true -> true ->
cancel_timer(S1), cancel_timer(S1),
@ -295,7 +301,7 @@ shards_of_batch(DB, Messages) ->
) )
). ).
repackage_messages(DB, Messages, Sync, Atomic) -> repackage_messages(DB, Messages, Sync) ->
Batches = lists:foldl( Batches = lists:foldl(
fun(Message, Acc) -> fun(Message, Acc) ->
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
@ -314,16 +320,15 @@ repackage_messages(DB, Messages, Sync, Atomic) ->
), ),
maps:fold( maps:fold(
fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
Err = gen_server:call( Err = enqueue_call_or_cast(
?via(DB, Shard), ?via(DB, Shard),
#enqueue_req{ #enqueue_req{
messages = lists:reverse(RevMessages), messages = lists:reverse(RevMessages),
sync = Sync, sync = Sync,
atomic = Atomic, atomic = false,
n_messages = NMsgs, n_messages = NMsgs,
payload_bytes = ByteSize payload_bytes = ByteSize
}, }
infinity
), ),
compose_errors(ErrAcc, Err) compose_errors(ErrAcc, Err)
end, end,
@ -331,6 +336,11 @@ repackage_messages(DB, Messages, Sync, Atomic) ->
Batches Batches
). ).
enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) ->
gen_server:call(To, Req, infinity);
enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) ->
gen_server:cast(To, Req).
compose_errors(ErrAcc, ok) -> compose_errors(ErrAcc, ok) ->
ErrAcc; ErrAcc;
compose_errors(ok, Err) -> compose_errors(ok, Err) ->