From 8cfb22f0b89d74442b0c3a669f618e912b1b81e6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 16 Feb 2024 12:42:48 +0100 Subject: [PATCH] fix(ds): Retry getting the shard leader --- .../src/emqx_ds_replication_layer_egress.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 8b37b29cb..6c1499620 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -90,7 +90,7 @@ init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), %% TODO: adjust leader dynamically - {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard), + Leader = shard_leader(DB, Shard), S = #s{ db = DB, shard = Shard, @@ -173,3 +173,13 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). + +shard_leader(DB, Shard) -> + %% TODO: use optvar + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + timer:sleep(500), + shard_leader(DB, Shard) + end.