fix(ds): Retry getting the shard leader

This commit is contained in:
ieQu1 2024-02-16 12:42:48 +01:00
parent 59e4db98f7
commit 8cfb22f0b8
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 11 additions and 1 deletions

View File

@ -90,7 +90,7 @@ init([DB, Shard]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap), process_flag(message_queue_data, off_heap),
%% TODO: adjust leader dynamically %% TODO: adjust leader dynamically
{ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard), Leader = shard_leader(DB, Shard),
S = #s{ S = #s{
db = DB, db = DB,
shard = Shard, shard = Shard,
@ -173,3 +173,13 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl
start_timer() -> start_timer() ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
erlang:send_after(Interval, self(), ?flush). erlang:send_after(Interval, self(), ?flush).
shard_leader(DB, Shard) ->
%% TODO: use optvar
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
{ok, Leader} ->
Leader;
{error, no_leader_for_shard} ->
timer:sleep(500),
shard_leader(DB, Shard)
end.