From 58bd42bfc1847cd46c2abad85885109711a846f2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 12 Feb 2024 18:48:04 +0100 Subject: [PATCH] wip: prefer local replica in read path --- .../src/emqx_ds_replication_layer.erl | 10 ++++---- .../src/emqx_ds_replication_layer_shard.erl | 25 ++++++++----------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 507754ed0..cd1dd3cc1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -425,23 +425,23 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). ra_update_iterator(DB, Shard, Iter, DSKey) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). ra_next(DB, Shard, Iter, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). ra_list_generations_with_lifetimes(DB, Shard) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). ra_drop_shard(DB, Shard) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index dde96fbc2..223c5782c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -73,10 +73,8 @@ servers(DB, Shard, _Order = leader_preferred) -> servers(DB, Shard, _Order = undefined) -> get_shard_servers(DB, Shard). -server(DB, Shard, _Which = random_follower) -> - pick_random_replica(DB, Shard); -server(DB, Shard, _Which = local) -> - get_local_server(DB, Shard). +server(DB, Shard, _Which = local_preferred) -> + get_server_local_preferred(DB, Shard). get_servers_leader_preferred(DB, Shard) -> %% NOTE: Contact last known leader first, then rest of shard servers. @@ -90,31 +88,30 @@ get_servers_leader_preferred(DB, Shard) -> get_shard_servers(DB, Shard) end. -pick_random_replica(DB, Shard) -> +get_server_local_preferred(DB, Shard) -> %% NOTE: Contact random replica that is not a known leader. %% TODO: Replica may be down, so we may need to retry. ClusterName = get_cluster_name(DB, Shard), case ra_leaderboard:lookup_members(ClusterName) of Servers when is_list(Servers) -> - Leader = ra_leaderboard:lookup_leader(ClusterName), - pick_replica(Servers, Leader); + pick_local(Servers); undefined -> %% TODO %% Leader is unkonwn if there are no servers of this group on the %% local node. We want to pick a replica in that case as well. %% TODO: Dynamic membership. - pick_server(get_shard_servers(DB, Shard)) + pick_random(get_shard_servers(DB, Shard)) end. -pick_replica(Servers, Leader) -> - case lists:delete(Leader, Servers) of +pick_local(Servers) -> + case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of + [Local | _] -> + Local; [] -> - Leader; - Followers -> - pick_server(Followers) + pick_random(Servers) end. -pick_server(Servers) -> +pick_random(Servers) -> lists:nth(rand:uniform(length(Servers)), Servers). get_cluster_name(DB, Shard) ->