wip: prefer local replica in read path

This commit is contained in:
Andrew Mayorov 2024-02-12 18:48:04 +01:00
parent 3a6b4b57d7
commit 58bd42bfc1
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 16 additions and 19 deletions

View File

@ -425,23 +425,23 @@ ra_drop_generation(DB, Shard, GenId) ->
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
ra_drop_shard(DB, Shard) ->

View File

@ -73,10 +73,8 @@ servers(DB, Shard, _Order = leader_preferred) ->
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = random_follower) ->
pick_random_replica(DB, Shard);
server(DB, Shard, _Which = local) ->
get_local_server(DB, Shard).
server(DB, Shard, _Which = local_preferred) ->
get_server_local_preferred(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
@ -90,31 +88,30 @@ get_servers_leader_preferred(DB, Shard) ->
get_shard_servers(DB, Shard)
end.
pick_random_replica(DB, Shard) ->
get_server_local_preferred(DB, Shard) ->
%% NOTE: Contact random replica that is not a known leader.
%% TODO: Replica may be down, so we may need to retry.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
Leader = ra_leaderboard:lookup_leader(ClusterName),
pick_replica(Servers, Leader);
pick_local(Servers);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
pick_server(get_shard_servers(DB, Shard))
pick_random(get_shard_servers(DB, Shard))
end.
pick_replica(Servers, Leader) ->
case lists:delete(Leader, Servers) of
pick_local(Servers) ->
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
[Local | _] ->
Local;
[] ->
Leader;
Followers ->
pick_server(Followers)
pick_random(Servers)
end.
pick_server(Servers) ->
pick_random(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->