diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 6d879dbb6..128aeb380 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -161,12 +161,13 @@ do_flush( ) -> case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> - lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} - ); + ), + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), + ok; Error -> true = erlang:garbage_collect(), ?tp( @@ -175,7 +176,11 @@ do_flush( #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown) + ok = timer:sleep(Cooldown), + %% Since we drop the entire batch here, we at least reply callers with an + %% error so they don't hang indefinitely in the `gen_server' call with + %% `infinity' timeout. + lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies) end, S#s{ n = 0, @@ -193,14 +198,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies Msg -> S0#s{n = N + 1, batch = [Msg | Batch]} end, - S2 = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S0#s.tref), - do_flush(S1); - false -> - S1 - end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to %% allow for async replies. For now, we ack when the message is @@ -208,12 +205,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies %% %% Otherwise, the client would freeze for at least flush interval, %% or until the buffer is filled. - S = + S2 = case Sync of true -> - S2#s{pending_replies = [From | Replies]}; + S1#s{pending_replies = [From | Replies]}; false -> gen_server:reply(From, ok), + S1 + end, + S = + case N >= NMax of + true -> + _ = erlang:cancel_timer(S2#s.tref), + do_flush(S2); + false -> S2 end, %% TODO: add a backpressure mechanism for the server to avoid