fix(ds): Perform read operations on the leader.

This commit is contained in:
ieQu1 2024-05-18 15:53:21 +02:00
parent 4580906405
commit c6fc76e335
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
4 changed files with 104 additions and 59 deletions

View File

@ -124,6 +124,8 @@ The following application environment variables are available:
- `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage.
- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`.
Runtime settings for the durable storages can be modified via CLI as well as the REST API.
The following CLI commands are available:

View File

@ -561,12 +561,27 @@ list_nodes() ->
%% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(EXPR),
-define(SAFE_ERPC(EXPR),
try
EXPR
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
error:RPCError__ = {erpc, _} ->
{error, recoverable, RPCError__}
end
).
-define(SHARD_RPC(DB, SHARD, NODE, BODY),
case
emqx_ds_replication_layer_shard:servers(
DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred)
)
of
[{_, NODE} | _] ->
begin
BODY
end;
[] ->
{error, recoverable, replica_offline}
end
).
@ -623,44 +638,79 @@ ra_drop_generation(DB, Shard, GenId) ->
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(Time),
?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs))
).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time))
).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs))
).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)
)
).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey))
).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
RPCError = {badrpc, _} ->
{error, recoverable, RPCError};
Other ->
Other
end.
?SHARD_RPC(
DB,
Shard,
Node,
case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
Err = {badrpc, _} ->
{error, recoverable, Err};
Ret ->
Ret
end
).
ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize))
).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
Reply = ?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard))
),
case Reply of
Gens = #{} ->
maps:map(
fun(_GenId, Data = #{since := Since, until := Until}) ->

View File

@ -28,8 +28,7 @@
%% Dynamic server location API
-export([
servers/3,
server/3
servers/3
]).
%% Membership
@ -83,16 +82,15 @@ server_name(DB, Shard, Site) ->
%%
-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server(), ...] when
Order :: leader_preferred | undefined.
servers(DB, Shard, _Order = leader_preferred) ->
-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server()] when
Order :: leader_preferred | local_preferred | undefined.
servers(DB, Shard, leader_preferred) ->
get_servers_leader_preferred(DB, Shard);
servers(DB, Shard, local_preferred) ->
get_servers_local_preferred(DB, Shard);
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = local_preferred) ->
get_server_local_preferred(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
ClusterName = get_cluster_name(DB, Shard),
@ -104,17 +102,24 @@ get_servers_leader_preferred(DB, Shard) ->
get_online_servers(DB, Shard)
end.
get_server_local_preferred(DB, Shard) ->
%% NOTE: Contact either local server or a random replica.
get_servers_local_preferred(DB, Shard) ->
%% Return list of servers, where the local replica (if exists) is
%% the first element. Note: result is _NOT_ shuffled. This can be
%% bad for the load balancing, but it makes results more
%% deterministic. Caller that doesn't care about that can shuffle
%% the results by itself.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
pick_local(Servers);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
pick_random(get_online_servers(DB, Shard))
Servers = get_online_servers(DB, Shard);
Servers when is_list(Servers) ->
ok
end,
case lists:keyfind(node(), 2, Servers) of
false ->
Servers;
Local when is_tuple(Local) ->
[Local | lists:delete(Local, Servers)]
end.
lookup_leader(DB, Shard) ->
@ -139,17 +144,6 @@ filter_online(Servers) ->
is_server_online({_Name, Node}) ->
Node == node() orelse lists:member(Node, nodes()).
pick_local(Servers) ->
case lists:keyfind(node(), 2, Servers) of
Local when is_tuple(Local) ->
Local;
false ->
pick_random(Servers)
end.
pick_random(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->
memoize(fun cluster_name/2, [DB, Shard]).

View File

@ -479,11 +479,13 @@ t_rebalance_offline_restarts(Config) ->
%%
shard_server_info(Node, DB, Shard, Site, Info) ->
Server = shard_server(Node, DB, Shard, Site),
{Server, ds_repl_shard(Node, server_info, [Info, Server])}.
shard_server(Node, DB, Shard, Site) ->
ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
?ON(
Node,
begin
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
{Server, emqx_ds_replication_layer_shard:server_info(Info, Server)}
end
).
ds_repl_meta(Node, Fun) ->
ds_repl_meta(Node, Fun, []).
@ -499,9 +501,6 @@ ds_repl_meta(Node, Fun, Args) ->
error(meta_op_failed)
end.
ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).