From fe50a1711b99418f5252fdc170cf9041d8c5a829 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Mar 2024 17:33:20 +0100 Subject: [PATCH] fix(ds-egress): drop pending batch on failures Before this commit, messages in the current batch were retried as part of the next batch. This could have led to message duplication, which is probably not what the user wants by default. --- .../src/emqx_ds_replication_layer_egress.erl | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index d3dcd887d..6d879dbb6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -159,31 +159,30 @@ do_flush(S = #s{batch = []}) -> do_flush( S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} ) -> - %% FIXME case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} - ), + ); + Error -> true = erlang:garbage_collect(), - S#s{ - n = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }; - {error, Reason} -> ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason} + #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - S#s{tref = start_timer()} - end. + ok = timer:sleep(Cooldown) + end, + S#s{ + n = 0, + batch = [], + pending_replies = [], + tref = start_timer() + }. 
do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),