From 3d296abde97e833cebfe00f69cb05b45a9637bb9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 26 Jun 2024 18:40:30 +0200 Subject: [PATCH] fix(dsrepl): classify ra error conditions more carefully Most importantly: avoid automatic retries of `shutdown` and `nodedown` errors, as this could easily lead to duplication of Raft log entries. --- .../src/emqx_ds_replication_layer.erl | 26 +++++-- .../src/emqx_ds_replication_layer_shard.erl | 70 ++++++++++++++----- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index a2476ba66..669abdbf1 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -63,6 +63,8 @@ apply/3, tick/2, + state_enter/2, + snapshot_module/0 ]). @@ -380,7 +382,7 @@ init_buffer(_DB, _Shard, _Options) -> {ok, #bs{}}. -spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) -> - {egress_state(), ok | {error, recoverable | unrecoverable, _}}. + {egress_state(), ok | emqx_ds:error(_)}. flush_buffer(DB, Shard, Messages, State) -> case ra_store_batch(DB, Shard, Messages) of {timeout, ServerId} -> @@ -623,18 +625,20 @@ list_nodes() -> ). -spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> - ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}. 
ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, ?batch_messages => Messages }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; - Error -> - Error + {timeout, _} = Timeout -> + Timeout; + {error, Reason = servers_unreachable} -> + {error, recoverable, Reason} end. ra_add_generation(DB, Shard) -> @@ -970,7 +974,19 @@ set_ts({DB, Shard}, TS) -> %% +-spec state_enter(ra_server:ra_state() | eol, ra_state()) -> ra_machine:effects(). +state_enter(MemberState, #{db_shard := {DB, Shard}, latest := Latest}) -> + ?tp( + ds_ra_state_enter, + #{db => DB, shard => Shard, latest => Latest, state => MemberState} + ), + []. + +%% + approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) -> + %% NOTE: Overhead here is basically few empty maps + 8-byte message id. + %% TODO: Probably need to ask the storage layer about the footprint. MinOverhead = 40, MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload). diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl index b43373c43..cdd62d874 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl @@ -4,6 +4,8 @@ -module(emqx_ds_replication_layer_shard). +-include_lib("snabbkaffe/include/trace.hrl"). + %% API: -export([start_link/3]). @@ -19,6 +21,12 @@ servers/3 ]). +%% Safe Process Command API +-export([ + process_command/3, + try_servers/3 +]). + %% Membership -export([ add_local_server/2, @@ -37,6 +45,12 @@ -type server() :: ra:server_id(). +-type server_error() :: server_error(none()). 
+-type server_error(Reason) :: + {timeout, server()} + | {error, server(), Reason} + | {error, servers_unreachable}. + -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). %% @@ -146,6 +160,40 @@ local_site() -> %% +-spec process_command([server()], _Command, timeout()) -> + {ok, _Result, _Leader :: server()} | server_error(). +process_command(Servers, Command, Timeout) -> + try_servers(Servers, fun ra:process_command/3, [Command, Timeout]). + +-spec try_servers([server()], function(), [_Arg]) -> + {ok, _Result, _Leader :: server()} | server_error(_Reason). +try_servers([Server | Rest], Fun, Args) -> + case is_server_online(Server) andalso erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + _Online = false -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => offline}), + try_servers(Rest, Fun, Args); + {error, Reason = noproc} -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => Reason}), + try_servers(Rest, Fun, Args); + {error, Reason} when Reason =:= nodedown orelse Reason =:= shutdown -> + %% NOTE + %% Conceptually, those error conditions basically mean the same as a plain + %% timeout: "it's impossible to tell if operation has succeeded or not". + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => Reason}), + {timeout, Server}; + {timeout, _} = Timeout -> + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => timeout}), + Timeout; + {error, Reason} -> + {error, Server, Reason} + end; +try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + +%% + %% @doc Add a local server to the shard cluster. %% It's recommended to have the local server running before calling this function. %% This function is idempotent. 
@@ -174,10 +222,10 @@ add_local_server(DB, Shard) -> } end, Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of + case try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of {ok, _, _Leader} -> ok; - {error, already_member} -> + {error, _Server, already_member} -> ok; Error -> {error, recoverable, Error} @@ -208,10 +256,10 @@ drop_local_server(DB, Shard) -> remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of + case try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of {ok, _, _Leader} -> ok; - {error, not_member} -> + {error, _Server, not_member} -> ok; Error -> {error, recoverable, Error} @@ -261,20 +309,6 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. -%% - -ra_try_servers([Server | Rest], Fun, Args) -> - case erlang:apply(Fun, [Server | Args]) of - {ok, R, Leader} -> - {ok, R, Leader}; - {error, Reason} when Reason == noproc; Reason == nodedown -> - ra_try_servers(Rest, Fun, Args); - ErrorOrTimeout -> - ErrorOrTimeout - end; -ra_try_servers([], _Fun, _Args) -> - {error, servers_unreachable}. - ra_overview(Server) -> case ra:member_overview(Server) of {ok, Overview, _Leader} ->