From 4c76a2574d636c7167616fa93ae9ebea04db1a12 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 21 Apr 2024 14:20:59 +0200 Subject: [PATCH] fix(ds): Fix egress flush condition --- .../src/emqx_ds_replication_layer_egress.erl | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 4122d937d..9201ccf04 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -127,7 +127,7 @@ init([DB, Shard]) -> metrics_id = MetricsId, queue = queue:new() }, - {ok, start_timer(S)}. + {ok, S}. handle_call( #enqueue_req{ @@ -195,7 +195,6 @@ enqueue( true -> %% Adding this batch would cause buffer to overflow. Flush %% it now, and retry: - cancel_timer(S0), S1 = flush(S0), enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); false -> @@ -203,12 +202,11 @@ enqueue( %% entirety: Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, - case NMsgs >= NMax orelse NBytes >= NBytes of + case NMsgs >= NMax orelse NBytes >= NBytesMax of true -> - cancel_timer(S1), flush(S1); false -> - S1 + ensure_timer(S1) end end. @@ -216,7 +214,7 @@ enqueue( -define(COOLDOWN_MAX, 5000). flush(S) -> - start_timer(do_flush(S)). + do_flush(cancel_timer(S)). do_flush(S0 = #s{n = 0}) -> S0; @@ -372,16 +370,18 @@ compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> compose_errors(ErrAcc, _Err) -> ErrAcc. -start_timer(S) -> +ensure_timer(S = #s{tref = undefined}) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Tref = erlang:send_after(Interval, self(), ?flush), - S#s{tref = Tref}. + S#s{tref = Tref}; +ensure_timer(S) -> + S. -cancel_timer(#s{tref = undefined}) -> - ok; -cancel_timer(#s{tref = TRef}) -> +cancel_timer(S = #s{tref = undefined}) -> + S; +cancel_timer(S = #s{tref = TRef}) -> _ = erlang:cancel_timer(TRef), - ok. + S#s{tref = undefined}. %% @doc Return approximate size of the MQTT message (it doesn't take %% all things into account, for example headers and extras)