feat: poke the buffer workers when inflight is no longer full

if max inflight = 1, then we only make progress based on the state
timer, since the callbacks were not poking the buffer workers.
This commit is contained in:
Thales Macedo Garitezi 2023-01-16 18:12:30 -03:00
parent b5aaef084c
commit 3ba65c4377
1 changed files with 16 additions and 1 deletions

View File

@ -33,7 +33,8 @@
sync_query/3, sync_query/3,
async_query/3, async_query/3,
block/1, block/1,
resume/1 resume/1,
flush_worker/1
]). ]).
-export([ -export([
@ -152,6 +153,10 @@ block(ServerRef) ->
resume(ServerRef) -> resume(ServerRef) ->
gen_statem:cast(ServerRef, resume). gen_statem:cast(ServerRef, resume).
-spec flush_worker(pid()) -> ok.
flush_worker(ServerRef) ->
gen_statem:cast(ServerRef, flush).
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()). -spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
init({Id, Index, Opts}) -> init({Id, Index, Opts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -196,6 +201,8 @@ running(enter, _, St) ->
maybe_flush(St); maybe_flush(St);
running(cast, resume, _St) -> running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, flush, Data) ->
flush(Data);
running(cast, block, St) -> running(cast, block, St) ->
{next_state, blocked, St}; {next_state, blocked, St};
running(info, ?SEND_REQ(_From, _Req) = Request0, Data) -> running(info, ?SEND_REQ(_From, _Req) = Request0, Data) ->
@ -222,6 +229,8 @@ blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
blocked(cast, resume, St) -> blocked(cast, resume, St) ->
resume_from_blocked(St); resume_from_blocked(St);
blocked(cast, flush, Data) ->
resume_from_blocked(Data);
blocked(state_timeout, unblock, St) -> blocked(state_timeout, unblock, St) ->
resume_from_blocked(St); resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
@ -834,6 +843,7 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
%% returned, otherwise the request will get retried. The %% returned, otherwise the request will get retried. The
%% caller has just been notified of the failure and should %% caller has just been notified of the failure and should
%% decide if it wants to retry or not. %% decide if it wants to retry or not.
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
case Action of case Action of
@ -850,6 +860,7 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
batch_or_query => ?QUERY(From, Request, HasBeenSent), batch_or_query => ?QUERY(From, Request, HasBeenSent),
result => Result result => Result
}), }),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok ok
end. end.
@ -862,6 +873,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
%% returned, otherwise the request will get retried. The %% returned, otherwise the request will get retried. The
%% caller has just been notified of the failure and should %% caller has just been notified of the failure and should
%% decide if it wants to retry or not. %% decide if it wants to retry or not.
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
case Action of case Action of
@ -874,6 +886,7 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
?tp(resource_worker_reply_after_query, #{ ?tp(resource_worker_reply_after_query, #{
action => ack, batch_or_query => Batch, result => Result action => ack, batch_or_query => Batch, result => Result
}), }),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok ok
end. end.
@ -942,6 +955,8 @@ inflight_get_first_retriable(InflightTID) ->
{Ref, BatchOrQuery} {Ref, BatchOrQuery}
end. end.
is_inflight_full(undefined) ->
false;
is_inflight_full(InflightTID) -> is_inflight_full(InflightTID) ->
[{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
%% we consider number of batches rather than number of messages %% we consider number of batches rather than number of messages