feat(dsrepl): prefer local replica in read path

To optimize out any unnecessary RPCs. Given the load should be
smoothed evenly across the cluster, choosing non-leader node is
not a priority.
This commit is contained in:
Andrew Mayorov 2024-02-12 18:48:04 +01:00
parent 19305c223c
commit 00d509f27b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 19 additions and 22 deletions

View File

@ -536,35 +536,35 @@ ra_drop_generation(DB, Shard, GenId) ->
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = ra_random_replica(DB, Shard),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = ra_random_replica(DB, Shard),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
{_Name, Node} = ra_random_replica(DB, Shard),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
ra_drop_shard(DB, Shard) ->

View File

@ -73,10 +73,8 @@ servers(DB, Shard, _Order = leader_preferred) ->
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = random_follower) ->
pick_random_replica(DB, Shard);
server(DB, Shard, _Which = local) ->
get_local_server(DB, Shard).
server(DB, Shard, _Which = local_preferred) ->
get_server_local_preferred(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
@ -90,31 +88,30 @@ get_servers_leader_preferred(DB, Shard) ->
get_shard_servers(DB, Shard)
end.
pick_random_replica(DB, Shard) ->
get_server_local_preferred(DB, Shard) ->
%% NOTE: Contact random replica that is not a known leader.
%% TODO: Replica may be down, so we may need to retry.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
Leader = ra_leaderboard:lookup_leader(ClusterName),
pick_replica(Servers, Leader);
pick_local(Servers);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
pick_server(get_shard_servers(DB, Shard))
pick_random(get_shard_servers(DB, Shard))
end.
pick_replica(Servers, Leader) ->
case lists:delete(Leader, Servers) of
pick_local(Servers) ->
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
[Local | _] ->
Local;
[] ->
Leader;
Followers ->
pick_server(Followers)
pick_random(Servers)
end.
pick_server(Servers) ->
pick_random(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->