fix(ds): Fix egress flush condition

This commit is contained in:
ieQu1 2024-04-21 14:20:59 +02:00
parent 307cd79be2
commit 4c76a2574d
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 12 additions and 12 deletions

View File

@ -127,7 +127,7 @@ init([DB, Shard]) ->
metrics_id = MetricsId,
queue = queue:new()
},
{ok, start_timer(S)}.
{ok, S}.
handle_call(
#enqueue_req{
@ -195,7 +195,6 @@ enqueue(
true ->
%% Adding this batch would cause buffer to overflow. Flush
%% it now, and retry:
cancel_timer(S0),
S1 = flush(S0),
enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
false ->
@ -203,12 +202,11 @@ enqueue(
%% entirety:
Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
case NMsgs >= NMax orelse NBytes >= NBytes of
case NMsgs >= NMax orelse NBytes >= NBytesMax of
true ->
cancel_timer(S1),
flush(S1);
false ->
S1
ensure_timer(S1)
end
end.
@ -216,7 +214,7 @@ enqueue(
-define(COOLDOWN_MAX, 5000).
flush(S) ->
start_timer(do_flush(S)).
do_flush(cancel_timer(S)).
do_flush(S0 = #s{n = 0}) ->
S0;
@ -372,16 +370,18 @@ compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) ->
compose_errors(ErrAcc, _Err) ->
ErrAcc.
start_timer(S) ->
ensure_timer(S = #s{tref = undefined}) ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
Tref = erlang:send_after(Interval, self(), ?flush),
S#s{tref = Tref}.
S#s{tref = Tref};
ensure_timer(S) ->
S.
cancel_timer(#s{tref = undefined}) ->
ok;
cancel_timer(#s{tref = TRef}) ->
cancel_timer(S = #s{tref = undefined}) ->
S;
cancel_timer(S = #s{tref = TRef}) ->
_ = erlang:cancel_timer(TRef),
ok.
S#s{tref = undefined}.
%% @doc Return approximate size of the MQTT message (it doesn't take
%% all things into account, for example headers and extras)