fix(ds-egress): drop pending batch on failures

Before this commit, messages in the current batch were kept after a
failed flush and retried as part of the next batch. This could lead to
message duplication, which is probably not what the user wants by default.
This commit is contained in:
Andrew Mayorov 2024-03-19 17:33:20 +01:00
parent a1f5de3f5b
commit fe50a1711b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 12 additions and 13 deletions

View File

@ -159,31 +159,30 @@ do_flush(S = #s{batch = []}) ->
do_flush( do_flush(
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
) -> ) ->
%% FIXME
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
ok -> ok ->
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
true = erlang:garbage_collect(),
?tp( ?tp(
emqx_ds_replication_layer_egress_flush, emqx_ds_replication_layer_egress_flush,
#{db => DB, shard => Shard, batch => Messages} #{db => DB, shard => Shard, batch => Messages}
), );
Error ->
true = erlang:garbage_collect(), true = erlang:garbage_collect(),
S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
};
{error, Reason} ->
?tp( ?tp(
warning, warning,
emqx_ds_replication_layer_egress_flush_failed, emqx_ds_replication_layer_egress_flush_failed,
#{db => DB, shard => Shard, reason => Reason} #{db => DB, shard => Shard, reason => Error}
), ),
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
ok = timer:sleep(Cooldown), ok = timer:sleep(Cooldown)
S#s{tref = start_timer()} end,
end. S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
}.
do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),