From 70737a437ac473f1c9f6146848c80d0487cd411f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Mar 2024 14:39:21 -0300 Subject: [PATCH] fix(ds): add caller to pending replies before flushing --- .../src/emqx_ds_replication_layer_egress.erl | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 6d879dbb6..b1a1c0cb4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -193,14 +193,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies Msg -> S0#s{n = N + 1, batch = [Msg | Batch]} end, - S2 = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S0#s.tref), - do_flush(S1); - false -> - S1 - end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to %% allow for async replies. For now, we ack when the message is @@ -208,12 +200,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies %% %% Otherwise, the client would freeze for at least flush interval, %% or until the buffer is filled. - S = + S2 = case Sync of true -> - S2#s{pending_replies = [From | Replies]}; + S1#s{pending_replies = [From | Replies]}; false -> gen_server:reply(From, ok), + S1 + end, + S = + case N >= NMax of + true -> + _ = erlang:cancel_timer(S2#s.tref), + do_flush(S2); + false -> S2 end, %% TODO: add a backpressure mechanism for the server to avoid