From 4382971443548ed5f92bfb77eb644bf2805eb25c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 10:45:13 +0200 Subject: [PATCH] fix(ds): Preserve errors in the egress --- .../src/emqx_ds_replication_layer_egress.erl | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 667e1daa4..72b0a468b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -302,9 +302,9 @@ repackage_messages(DB, Messages, Sync, Atomic) -> #{}, Messages ), - maps:foreach( - fun(Shard, {NMsgs, ByteSize, RevMessages}) -> - gen_server:call( + maps:fold( + fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + Err = gen_server:call( ?via(DB, Shard), #enqueue_req{ messages = lists:reverse(RevMessages), @@ -314,11 +314,22 @@ repackage_messages(DB, Messages, Sync, Atomic) -> payload_bytes = ByteSize }, infinity - ) + ), + compose_errors(ErrAcc, Err) end, + ok, Batches ). +compose_errors(ErrAcc, ok) -> + ErrAcc; +compose_errors(ok, Err) -> + Err; +compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> + {error, unrecoverable, Err}; +compose_errors(ErrAcc, _Err) -> + ErrAcc. + start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Tref = erlang:send_after(Interval, self(), ?flush),