diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index eb4b1fc70..f328c7a99 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -76,7 +76,7 @@ store_batch(DB, Messages, Opts) -> case shards_of_batch(DB, Messages) of [{Shard, {NMsgs, NBytes}}] -> %% Happy case: - gen_server:call( + enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ messages = Messages, @@ -84,8 +84,7 @@ store_batch(DB, Messages, Opts) -> atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes - }, - infinity + } ); [_, _ | _] when Atomic -> %% It's impossible to commit a batch to multiple shards @@ -93,7 +92,7 @@ store_batch(DB, Messages, Opts) -> {error, unrecoverable, atomic_commit_to_multiple_shards}; _Shards -> %% Use a slower implementation for the unlikely case: - repackage_messages(DB, Messages, Sync, Atomic) + repackage_messages(DB, Messages, Sync) end. %%================================================================================ @@ -127,15 +126,31 @@ init([DB, Shard]) -> handle_call( #enqueue_req{ - messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes }, From, - S + S0 = #s{pending_replies = Replies0} ) -> - {noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + S = S0#s{pending_replies = [From | Replies0]}, + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. +handle_cast( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + S +) -> + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_cast(_Cast, S) -> {noreply, S}. @@ -156,13 +171,12 @@ terminate(_Reason, _S) -> %%================================================================================ enqueue( - From, Sync, Atomic, Msgs, BatchSize, BatchBytes, - S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0} + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} ) -> %% At this point we don't split the batches, even when they aren't %% atomic. It wouldn't win us anything in terms of memory, and @@ -178,20 +192,12 @@ enqueue( %% it now, and retry: cancel_timer(S0), S1 = flush(S0), - enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); false -> %% The buffer is empty, we enqueue the atomic batch in its %% entirety: Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), - Replies = - case Sync of - true -> - [From | Replies0]; - false -> - gen_server:reply(From, ok), - Replies0 - end, - S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies}, + S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, case NMsgs >= NMax orelse NBytes >= NBytes of true -> cancel_timer(S1), @@ -295,7 +301,7 @@ shards_of_batch(DB, Messages) -> ) ). -repackage_messages(DB, Messages, Sync, Atomic) -> +repackage_messages(DB, Messages, Sync) -> Batches = lists:foldl( fun(Message, Acc) -> Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), @@ -314,16 +320,15 @@ repackage_messages(DB, Messages, Sync, Atomic) -> ), maps:fold( fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> - Err = gen_server:call( + Err = enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ messages = lists:reverse(RevMessages), sync = Sync, - atomic = Atomic, + atomic = false, n_messages = NMsgs, payload_bytes = ByteSize - }, - infinity + } ), compose_errors(ErrAcc, Err) end, @@ -331,6 +336,11 @@ repackage_messages(DB, Messages, Sync, Atomic) -> Batches ). +enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) -> + gen_server:call(To, Req, infinity); +enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) -> + gen_server:cast(To, Req). + compose_errors(ErrAcc, ok) -> ErrAcc; compose_errors(ok, Err) ->