From 00d509f27b4d31e2fbe14d4438d23674c6143736 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 12 Feb 2024 18:48:04 +0100 Subject: [PATCH] feat(dsrepl): prefer local replica in read path This avoids any unnecessary RPCs. Given that the load should be spread evenly across the cluster, choosing a non-leader node is not a priority. --- .../src/emqx_ds_replication_layer.erl | 16 ++++++------ .../src/emqx_ds_replication_layer_shard.erl | 25 ++++++++----------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c091392b1..003e5799f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -536,35 +536,35 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). 
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). ra_update_iterator(DB, Shard, Iter, DSKey) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). ra_next(DB, Shard, Iter, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). ra_list_generations_with_lifetimes(DB, Shard) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). ra_drop_shard(DB, Shard) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index dde96fbc2..223c5782c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -73,10 +73,8 @@ servers(DB, Shard, _Order = leader_preferred) -> servers(DB, Shard, _Order = undefined) -> get_shard_servers(DB, Shard). 
-server(DB, Shard, _Which = random_follower) -> - pick_random_replica(DB, Shard); -server(DB, Shard, _Which = local) -> - get_local_server(DB, Shard). +server(DB, Shard, _Which = local_preferred) -> + get_server_local_preferred(DB, Shard). get_servers_leader_preferred(DB, Shard) -> %% NOTE: Contact last known leader first, then rest of shard servers. @@ -90,31 +88,30 @@ get_servers_leader_preferred(DB, Shard) -> get_shard_servers(DB, Shard) end. -pick_random_replica(DB, Shard) -> +get_server_local_preferred(DB, Shard) -> %% NOTE: Contact random replica that is not a known leader. %% TODO: Replica may be down, so we may need to retry. ClusterName = get_cluster_name(DB, Shard), case ra_leaderboard:lookup_members(ClusterName) of Servers when is_list(Servers) -> - Leader = ra_leaderboard:lookup_leader(ClusterName), - pick_replica(Servers, Leader); + pick_local(Servers); undefined -> %% TODO %% Leader is unkonwn if there are no servers of this group on the %% local node. We want to pick a replica in that case as well. %% TODO: Dynamic membership. - pick_server(get_shard_servers(DB, Shard)) + pick_random(get_shard_servers(DB, Shard)) end. -pick_replica(Servers, Leader) -> - case lists:delete(Leader, Servers) of +pick_local(Servers) -> + case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of + [Local | _] -> + Local; [] -> - Leader; - Followers -> - pick_server(Followers) + pick_random(Servers) end. -pick_server(Servers) -> +pick_random(Servers) -> lists:nth(rand:uniform(length(Servers)), Servers). get_cluster_name(DB, Shard) ->