diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 8b37b29cb..6c1499620 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -90,7 +90,7 @@ init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), %% TODO: adjust leader dynamically - {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard), + Leader = shard_leader(DB, Shard), S = #s{ db = DB, shard = Shard, @@ -173,3 +173,13 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). + +shard_leader(DB, Shard) -> + %% TODO: use optvar + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + timer:sleep(500), + shard_leader(DB, Shard) + end.