From 887e151be52bb9301fc730e1b383dc937760c200 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 19 Feb 2024 19:09:22 +0100 Subject: [PATCH] fix(dsrepl): handle errors gracefully in shard egress process Also add cooldown on timeout / unavailability. --- .../src/emqx_ds_replication_layer.erl | 14 +++--- .../src/emqx_ds_replication_layer_egress.erl | 45 ++++++++++++------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 003e5799f..a19f376fe 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -492,23 +492,27 @@ list_nodes() -> %% +%% TODO +%% Too large for normal operation, need better backpressure mechanism. +-define(RA_TIMEOUT, 60 * 1000). + ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, ?batch_messages => Messages }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> - error(Error, [DB, Shard]) + Error end. ra_add_generation(DB, Shard) -> Command = #{?tag => add_generation}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> @@ -518,7 +522,7 @@ ra_add_generation(DB, Shard) -> ra_update_config(DB, Shard, Opts) -> Command = #{?tag => update_config, ?config => Opts}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> @@ -528,7 +532,7 @@ ra_update_config(DB, Shard, Opts) -> ra_drop_generation(DB, Shard, GenId) -> Command = #{?tag => drop_generation, ?generation => GenId}, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command) of + case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; Error -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index c130b8b2f..080eda937 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -83,15 +83,13 @@ store_batch(DB, Messages, Opts) -> ); true -> maps:foreach( - fun(Shard, Batch) -> + fun(Shard, BatchIn) -> Timestamp = emqx_ds:timestamp_us(), + Batch = [emqx_message:set_timestamp(Timestamp, Message) || Message <- BatchIn], gen_server:call( ?via(DB, Shard), #enqueue_atomic_req{ - batch = [ - emqx_message:set_timestamp(Timestamp, Message) - || Message <- Batch - ], + batch = Batch, sync = Sync }, infinity @@ -156,22 +154,39 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ +-define(COOLDOWN_MIN, 1000). +-define(COOLDOWN_MAX, 5000). + do_flush(S = #s{batch = []}) -> S#s{tref = start_timer()}; do_flush( S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} ) -> %% FIXME - ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)), - [gen_server:reply(From, ok) || From <- lists:reverse(Replies)], - ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}), - erlang:garbage_collect(), - S#s{ - n = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }. + case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + ok -> + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + ?tp( + emqx_ds_replication_layer_egress_flush, + #{db => DB, shard => Shard, batch => Messages} + ), + true = erlang:garbage_collect(), + S#s{ + n = 0, + batch = [], + pending_replies = [], + tref = start_timer() + }; + {error, Reason} -> + ?tp( + warning, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, reason => Reason} + ), + Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + ok = timer:sleep(Cooldown), + S#s{tref = start_timer()} + end. do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),