diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index d3dcd887d..6d879dbb6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -159,31 +159,30 @@ do_flush(S = #s{batch = []}) -> do_flush( S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} ) -> - %% FIXME case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} - ), + ); + Error -> true = erlang:garbage_collect(), - S#s{ - n = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }; - {error, Reason} -> ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason} + #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - S#s{tref = start_timer()} - end. + ok = timer:sleep(Cooldown) + end, + S#s{ + n = 0, + batch = [], + pending_replies = [], + tref = start_timer() + }. do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),