diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 667e1daa4..72b0a468b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -302,9 +302,9 @@ repackage_messages(DB, Messages, Sync, Atomic) -> #{}, Messages ), - maps:foreach( - fun(Shard, {NMsgs, ByteSize, RevMessages}) -> - gen_server:call( + maps:fold( + fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + Err = gen_server:call( ?via(DB, Shard), #enqueue_req{ messages = lists:reverse(RevMessages), @@ -314,11 +314,22 @@ repackage_messages(DB, Messages, Sync, Atomic) -> payload_bytes = ByteSize }, infinity - ) + ), + compose_errors(ErrAcc, Err) end, + ok, Batches ). +compose_errors(ErrAcc, ok) -> + ErrAcc; +compose_errors(ok, Err) -> + Err; +compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> + {error, unrecoverable, Err}; +compose_errors(ErrAcc, _Err) -> + ErrAcc. + start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Tref = erlang:send_after(Interval, self(), ?flush),