From 70737a437ac473f1c9f6146848c80d0487cd411f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Mar 2024 14:39:21 -0300 Subject: [PATCH 1/3] fix(ds): add caller to pending replies before flushing --- .../src/emqx_ds_replication_layer_egress.erl | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 6d879dbb6..b1a1c0cb4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -193,14 +193,6 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies Msg -> S0#s{n = N + 1, batch = [Msg | Batch]} end, - S2 = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S0#s.tref), - do_flush(S1); - false -> - S1 - end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to %% allow for async replies. For now, we ack when the message is @@ -208,12 +200,20 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies %% %% Otherwise, the client would freeze for at least flush interval, %% or until the buffer is filled. 
- S = + S2 = case Sync of true -> - S2#s{pending_replies = [From | Replies]}; + S1#s{pending_replies = [From | Replies]}; false -> gen_server:reply(From, ok), + S1 + end, + S = + case N >= NMax of + true -> + _ = erlang:cancel_timer(S2#s.tref), + do_flush(S2); + false -> S2 end, %% TODO: add a backpressure mechanism for the server to avoid From 68af21113084f8269c877ccf04b9a9ee61fa8fe4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Mar 2024 15:33:18 -0300 Subject: [PATCH 2/3] fix(ds): reply sync callers after raft store failure --- .../src/emqx_ds_replication_layer_egress.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index b1a1c0cb4..0b0618a80 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -175,7 +175,11 @@ do_flush( #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown) + ok = timer:sleep(Cooldown), + %% Since we drop the entire batch here, we at least reply to callers with an + %% error so they don't hang indefinitely in the `gen_server' call with + %% `infinity' timeout. + lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies) end, S#s{ n = 0, From 796c04e7a8530d97dde5e9e15dcd62e5e05c4f84 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Mar 2024 15:47:29 -0300 Subject: [PATCH 3/3] test: fix flaky test We should emit the trace event before replying to callers. Example failure: https://github.com/emqx/emqx/actions/runs/8378977952/job/22946318696#step:6:182 ``` =CRITICAL REPORT==== 21-Mar-2024::17:45:37.676024 === "check stage" failed: error {assertMatch,[{module,emqx_ds_storage_bitfield_lts_SUITE}, {line,270}, {expression,"? 
of_kind ( emqx_ds_replication_layer_egress_flush , Trace )"}, {pattern,"[ # { batch := [ _ , _ , _ ] } ]"}, {value,[]}]} Stacktrace: [{emqx_ds_storage_bitfield_lts_SUITE, '-t_atomic_store_batch/1-fun-1-',1, [{file, "/__w/emqx/emqx/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl"}, {line,270}]}, {emqx_ds_storage_bitfield_lts_SUITE,t_atomic_store_batch,1, [{file, "/__w/emqx/emqx/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl"}, {line,249}]}] ``` --- .../src/emqx_ds_replication_layer_egress.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 0b0618a80..128aeb380 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -161,12 +161,13 @@ do_flush( ) -> case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> - lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} - ); + ), + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), + ok; Error -> true = erlang:garbage_collect(), ?tp(