diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 6d879dbb6..b1a1c0cb4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -193,14 +193,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies Msg -> S0#s{n = N + 1, batch = [Msg | Batch]} end, - S2 = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S0#s.tref), - do_flush(S1); - false -> - S1 - end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to %% allow for async replies. For now, we ack when the message is @@ -208,12 +200,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies %% %% Otherwise, the client would freeze for at least flush interval, %% or until the buffer is filled. - S = + S2 = case Sync of true -> - S2#s{pending_replies = [From | Replies]}; + S1#s{pending_replies = [From | Replies]}; false -> gen_server:reply(From, ok), + S1 + end, + S = + case N >= NMax of + true -> + _ = erlang:cancel_timer(S2#s.tref), + do_flush(S2); + false -> S2 end, %% TODO: add a backpressure mechanism for the server to avoid