fix(ds): Preserve errors in the egress

This commit is contained in:
ieQu1 2024-04-02 10:45:13 +02:00
parent 94ca7ad0f8
commit 4382971443
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 15 additions and 4 deletions

View File

@ -302,9 +302,9 @@ repackage_messages(DB, Messages, Sync, Atomic) ->
#{}, #{},
Messages Messages
), ),
maps:foreach( maps:fold(
fun(Shard, {NMsgs, ByteSize, RevMessages}) -> fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
gen_server:call( Err = gen_server:call(
?via(DB, Shard), ?via(DB, Shard),
#enqueue_req{ #enqueue_req{
messages = lists:reverse(RevMessages), messages = lists:reverse(RevMessages),
@ -314,11 +314,22 @@ repackage_messages(DB, Messages, Sync, Atomic) ->
payload_bytes = ByteSize payload_bytes = ByteSize
}, },
infinity infinity
) ),
compose_errors(ErrAcc, Err)
end, end,
ok,
Batches Batches
). ).
compose_errors(ErrAcc, ok) ->
ErrAcc;
compose_errors(ok, Err) ->
Err;
compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) ->
{error, unrecoverable, Err};
compose_errors(ErrAcc, _Err) ->
ErrAcc.
start_timer(S) -> start_timer(S) ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
Tref = erlang:send_after(Interval, self(), ?flush), Tref = erlang:send_after(Interval, self(), ?flush),