diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index f8478bb72..ecc6a492e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -551,6 +551,8 @@ list_nodes() -> end ). +-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index f328c7a99..4122d937d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -103,6 +103,11 @@ store_batch(DB, Messages, Opts) -> db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), + n_retries = 0 :: non_neg_integer(), + %% FIXME: Currently max_retries is always 0, because replication + %% layer doesn't guarantee idempotency. Retrying would create + %% duplicate messages. 
+ max_retries = 0 :: non_neg_integer(), n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: undefined | reference(), @@ -216,7 +221,15 @@ flush(S) -> do_flush(S0 = #s{n = 0}) -> S0; do_flush( - S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics} + S = #s{ + queue = Q, + pending_replies = Replies, + db = DB, + shard = Shard, + metrics_id = Metrics, + n_retries = Retries, + max_retries = MaxRetries + } ) -> Messages = queue:to_list(Q), T0 = erlang:monotonic_time(microsecond), @@ -240,7 +253,7 @@ do_flush( queue = queue:new(), pending_replies = [] }; - {error, recoverable, Reason} -> + {timeout, ServerId} when Retries < MaxRetries -> %% Note: this is a hot loop, so we report error messages %% with `debug' level to avoid wiping the logs. Instead, %% error the detection must rely on the metrics. Debug @@ -248,8 +261,8 @@ do_flush( %% via logger domain. ?tp( debug, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason, recoverable => true} + emqx_ds_replication_layer_egress_flush_retry, + #{db => DB, shard => Shard, reason => timeout, server_id => ServerId} ), %% Retry sending the batch: emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), @@ -257,21 +270,30 @@ do_flush( %% We block the gen_server until the next retry. 
BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), timer:sleep(BlockTime), - S; - Err = {error, unrecoverable, Reason} -> + S#s{n_retries = Retries + 1}; + Err -> ?tp( debug, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason, recoverable => false} + #{db => DB, shard => Shard, error => Err} ), emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), - lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies), + Reply = + case Err of + {error, _, _} -> Err; + {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}}; + _ -> {error, unrecoverable, Err} + end, + lists:foreach( + fun(From) -> gen_server:reply(From, Reply) end, Replies + ), erlang:garbage_collect(), S#s{ n = 0, n_bytes = 0, queue = queue:new(), - pending_replies = [] + pending_replies = [], + n_retries = 0 } end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 727f424b8..1d2daacbb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -684,7 +684,7 @@ t_store_batch_fail(_Config) -> ?check_trace( #{timetrap => 15_000}, try - meck:new(emqx_ds_replication_layer, [passthrough, no_history]), + meck:new(emqx_ds_storage_layer, [passthrough, no_history]), DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), %% Success: @@ -694,7 +694,7 @@ t_store_batch_fail(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), %% Inject unrecoverable error: - meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) -> + meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) -> {error, unrecoverable, mock} end), Batch2 = [ @@ -704,35 +704,32 @@ t_store_batch_fail(_Config) -> ?assertMatch( {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) ), - %% Inject a recoverable error: + 
meck:unload(emqx_ds_storage_layer), + %% Inject a recoverable error: + meck:new(ra, [passthrough, no_history]), + meck:expect(ra, process_command, fun(Servers, Shard, Command) -> + ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}), + {timeout, mock} + end), Batch3 = [ message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) ], - meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) -> - try - ?tp(store_batch, #{messages => Messages}), - meck:passthrough([DB, Shard, Messages]) - catch - _:_ -> - {error, recoverable, mock} - end - end), - ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)), + %% Note: due to idempotency issues the number of retries + %% is currently set to 0: + ?assertMatch( + {error, recoverable, {timeout, mock}}, + emqx_ds:store_batch(DB, Batch3, #{sync => true}) + ), + meck:unload(ra), ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) after meck:unload() end, [ - {"number of successfull flushes after retry", fun(Trace) -> - ?assertMatch([_, _], ?of_kind(store_batch, Trace)) - end}, - {"number of retries", fun(Trace) -> - ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace)) - end}, {"message ordering", fun(StoredMessages, _Trace) -> [{_, Stream1}, {_, Stream2}] = StoredMessages, ?assertMatch(