From 2ebc8dcc55d99198c132db556157ac660ac575ab Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 14 Mar 2024 10:17:18 -0300 Subject: [PATCH] fix(ds): use `infinity` timeout when storing batches --- .../src/emqx_ds_replication_layer_egress.erl | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 25064ad60..8b1a9a835 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -70,20 +70,28 @@ store_batch(DB, Messages, Opts) -> lists:foreach( fun(Message) -> Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - gen_server:call(?via(DB, Shard), #enqueue_req{ - message = Message, - sync = Sync - }) + gen_server:call( + ?via(DB, Shard), + #enqueue_req{ + message = Message, + sync = Sync + }, + infinity + ) end, Messages ); true -> maps:foreach( fun(Shard, Batch) -> - gen_server:call(?via(DB, Shard), #enqueue_atomic_req{ - batch = Batch, - sync = Sync - }) + gen_server:call( + ?via(DB, Shard), + #enqueue_atomic_req{ + batch = Batch, + sync = Sync + }, + infinity + ) end, maps:groups_from_list( fun(Message) ->