refactor: cancel flush timer sooner

Avoids the cancellation being delayed.
This commit is contained in:
Thales Macedo Garitezi 2023-01-12 14:24:34 -03:00
parent 477c55d8ef
commit af6807e863
1 changed files with 16 additions and 18 deletions

View File

@ -445,17 +445,17 @@ handle_query_requests(Request0, Data0) ->
Data = Data0#{queue := NewQ}, Data = Data0#{queue := NewQ},
maybe_flush(Data). maybe_flush(Data).
maybe_flush(Data) -> maybe_flush(Data0) ->
#{ #{
batch_size := BatchSize, batch_size := BatchSize,
queue := Q queue := Q
} = Data, } = Data0,
QueueCount = queue_count(Q), QueueCount = queue_count(Q),
case QueueCount >= BatchSize of case QueueCount >= BatchSize of
true -> true ->
flush(Data); flush(Data0);
false -> false ->
{keep_state, ensure_flush_timer(Data)} {keep_state, ensure_flush_timer(Data0)}
end. end.
%% Called during the `running' state only. %% Called during the `running' state only.
@ -465,19 +465,19 @@ flush(Data0) ->
batch_size := BatchSize, batch_size := BatchSize,
queue := Q0 queue := Q0
} = Data0, } = Data0,
Data1 = cancel_flush_timer(Data0),
case replayq:count(Q0) of case replayq:count(Q0) of
0 -> 0 ->
Data = cancel_flush_timer(Data0), {keep_state, Data1};
{keep_state, Data};
_ -> _ ->
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1, IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't %% We *must* use the new queue, because we currently can't
%% `nack' a `pop'. %% `nack' a `pop'.
%% Maybe we could re-open the queue? %% Maybe we could re-open the queue?
Data1 = Data0#{queue := Q1}, Data2 = Data1#{queue := Q1},
Ref = make_message_ref(), Ref = make_message_ref(),
do_flush(Data1, #{ do_flush(Data2, #{
new_queue => Q1, new_queue => Q1,
is_batch => IsBatch, is_batch => IsBatch,
batch => Batch, batch => Batch,
@ -511,7 +511,6 @@ do_flush(
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
QueryOpts = #{inflight_name => InflightTID}, QueryOpts = #{inflight_name => InflightTID},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Data1 = cancel_flush_timer(Data0),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case reply_caller(Id, Reply) of case reply_caller(Id, Reply) of
%% Failed; remove the request from the queue, as we cannot pop %% Failed; remove the request from the queue, as we cannot pop
@ -525,7 +524,7 @@ do_flush(
is_not_connected_result(Result), is_not_connected_result(Result),
ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index), ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{next_state, blocked, Data1}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
false -> false ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
@ -533,9 +532,9 @@ do_flush(
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
case replayq:count(Q1) > 0 of case replayq:count(Q1) > 0 of
true -> true ->
{keep_state, Data1, [{next_event, internal, flush}]}; {keep_state, Data0, [{next_event, internal, flush}]};
false -> false ->
{keep_state, Data1} {keep_state, Data0}
end end
end; end;
do_flush(Data0, #{ do_flush(Data0, #{
@ -553,7 +552,6 @@ do_flush(Data0, #{
} = Data0, } = Data0,
QueryOpts = #{inflight_name => InflightTID}, QueryOpts = #{inflight_name => InflightTID},
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
Data1 = cancel_flush_timer(Data0),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
%% Failed; remove the request from the queue, as we cannot pop %% Failed; remove the request from the queue, as we cannot pop
%% from it again. But we must ensure it's in the inflight %% from it again. But we must ensure it's in the inflight
@ -566,7 +564,7 @@ do_flush(Data0, #{
is_not_connected_result(Result), is_not_connected_result(Result),
ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index), ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{next_state, blocked, Data1}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
false -> false ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
@ -575,12 +573,12 @@ do_flush(Data0, #{
CurrentCount = replayq:count(Q1), CurrentCount = replayq:count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} -> {false, _} ->
{keep_state, Data1}; {keep_state, Data0};
{true, true} -> {true, true} ->
{keep_state, Data1, [{next_event, internal, flush}]}; {keep_state, Data0, [{next_event, internal, flush}]};
{true, false} -> {true, false} ->
Data2 = ensure_flush_timer(Data1), Data1 = ensure_flush_timer(Data0),
{keep_state, Data2} {keep_state, Data1}
end end
end. end.