fix(dsrepl): classify ra error conditions more carefully

Most importantly: avoid automatic retries of `shutdown` and `nodedown`
errors, since retrying them could easily lead to duplication of Raft log
entries.
This commit is contained in:
Andrew Mayorov 2024-06-26 18:40:30 +02:00
parent 733751fadd
commit 3d296abde9
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 73 additions and 23 deletions

View File

@ -63,6 +63,8 @@
apply/3,
tick/2,
state_enter/2,
snapshot_module/0
]).
@ -380,7 +382,7 @@ init_buffer(_DB, _Shard, _Options) ->
{ok, #bs{}}.
-spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) ->
{egress_state(), ok | {error, recoverable | unrecoverable, _}}.
{egress_state(), ok | emqx_ds:error(_)}.
flush_buffer(DB, Shard, Messages, State) ->
case ra_store_batch(DB, Shard, Messages) of
{timeout, ServerId} ->
@ -623,18 +625,20 @@ list_nodes() ->
).
-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err.
ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}.
ra_store_batch(DB, Shard, Messages) ->
Command = #{
?tag => ?BATCH,
?batch_messages => Messages
},
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
Error
{timeout, _} = Timeout ->
Timeout;
{error, Reason = servers_unreachable} ->
{error, recoverable, Reason}
end.
ra_add_generation(DB, Shard) ->
@ -970,7 +974,19 @@ set_ts({DB, Shard}, TS) ->
%%
%% @doc Ra machine callback, invoked whenever this member transitions
%% between Raft server states (e.g. follower / candidate / leader) or
%% reaches `eol'.
-spec state_enter(ra_server:ra_state() | eol, ra_state()) -> ra_machine:effects().
state_enter(MemberState, #{db_shard := {DB, Shard}, latest := Latest}) ->
%% Only emits a trace point (presumably snabbkaffe's `?tp' — confirm the
%% include) for observability/tests; returns no Ra effects.
?tp(
ds_ra_state_enter,
#{db => DB, shard => Shard, latest => Latest, state => MemberState}
),
[].
%%
%% @doc Rough estimate of a message's storage footprint, in bytes.
%% NOTE: Overhead here is basically few empty maps + 8-byte message id.
%% TODO: Probably need to ask the storage layer about the footprint.
approx_message_size(#message{from = From, topic = Topic, payload = Payload}) ->
    Envelope = 40,
    Envelope + lists:sum([clientid_size(From), byte_size(Topic), byte_size(Payload)]).

View File

@ -4,6 +4,8 @@
-module(emqx_ds_replication_layer_shard).
-include_lib("snabbkaffe/include/trace.hrl").
%% API:
-export([start_link/3]).
@ -19,6 +21,12 @@
servers/3
]).
%% Safe Process Command API
-export([
process_command/3,
try_servers/3
]).
%% Membership
-export([
add_local_server/2,
@ -37,6 +45,12 @@
-type server() :: ra:server_id().
-type server_error() :: server_error(none()).
-type server_error(Reason) ::
{timeout, server()}
| {error, server(), Reason}
| {error, servers_unreachable}.
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
%%
@ -146,6 +160,40 @@ local_site() ->
%%
%% @doc Issue a Ra command against the shard, transparently moving on to
%% the next known server when the current one is unreachable.
-spec process_command([server()], _Command, timeout()) ->
    {ok, _Result, _Leader :: server()} | server_error().
process_command(Servers, Command, Timeout) ->
    RaCall = fun ra:process_command/3,
    try_servers(Servers, RaCall, [Command, Timeout]).
%% @doc Apply `Fun(Server, Arg1, ..)' to each server in order until one
%% yields a result. Servers that look offline or answer `noproc' are
%% skipped; `nodedown' / `shutdown' are deliberately NOT retried on the
%% next server (see NOTE below), since re-submitting a command whose fate
%% is unknown could duplicate Raft log entries.
-spec try_servers([server()], function(), [_Arg]) ->
{ok, _Result, _Leader :: server()} | server_error(_Reason).
try_servers([Server | Rest], Fun, Args) ->
%% `andalso' short-circuits: when the server is offline, `Fun' is never
%% called and the `case' matches the bare atom `false' below.
case is_server_online(Server) andalso erlang:apply(Fun, [Server | Args]) of
{ok, R, Leader} ->
{ok, R, Leader};
%% Server considered offline: move on to the next candidate.
_Online = false ->
?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => offline}),
try_servers(Rest, Fun, Args);
%% No Ra server process there: the command cannot have been applied,
%% so it is safe to retry on the next server.
{error, Reason = noproc} ->
?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => Reason}),
try_servers(Rest, Fun, Args);
{error, Reason} when Reason =:= nodedown orelse Reason =:= shutdown ->
%% NOTE
%% Conceptually, those error conditions basically mean the same as a plain
%% timeout: "it's impossible to tell if operation has succeeded or not".
?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => Reason}),
{timeout, Server};
{timeout, _} = Timeout ->
?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => timeout}),
Timeout;
%% Any other error is attributed to this specific server.
{error, Reason} ->
{error, Server, Reason}
end;
%% Exhausted every candidate without a definitive answer.
try_servers([], _Fun, _Args) ->
{error, servers_unreachable}.
%%
%% @doc Add a local server to the shard cluster.
%% It's recommended to have the local server running before calling this function.
%% This function is idempotent.
@ -174,10 +222,10 @@ add_local_server(DB, Shard) ->
}
end,
Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
case try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
{ok, _, _Leader} ->
ok;
{error, already_member} ->
{error, _Server, already_member} ->
ok;
Error ->
{error, recoverable, Error}
@ -208,10 +256,10 @@ drop_local_server(DB, Shard) ->
remove_server(DB, Shard, Server) ->
ShardServers = shard_servers(DB, Shard),
Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of
case try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of
{ok, _, _Leader} ->
ok;
{error, not_member} ->
{error, _Server, not_member} ->
ok;
Error ->
{error, recoverable, Error}
@ -261,20 +309,6 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership}
member_readiness(#{}) ->
unknown.
%%
%% Legacy helper: apply `Fun(Server, Arg1, ..)' to each server in turn,
%% falling through to the next one only on `noproc' / `nodedown'.
ra_try_servers([Server | Rest], Fun, Args) ->
case erlang:apply(Fun, [Server | Args]) of
{ok, R, Leader} ->
{ok, R, Leader};
%% NOTE(review): retrying on `nodedown' can re-submit a command whose
%% outcome is unknown; `try_servers/3' classifies this case as a timeout
%% instead.
{error, Reason} when Reason == noproc; Reason == nodedown ->
ra_try_servers(Rest, Fun, Args);
%% Any other error or `{timeout, _}' is passed through untouched.
ErrorOrTimeout ->
ErrorOrTimeout
end;
%% No server produced a result.
ra_try_servers([], _Fun, _Args) ->
{error, servers_unreachable}.
ra_overview(Server) ->
case ra:member_overview(Server) of
{ok, Overview, _Leader} ->