refactor: rename inflight_name field to inflight_tid

This commit is contained in:
Thales Macedo Garitezi 2023-01-16 16:56:07 -03:00
parent 006b4bda97
commit bd0e2a74ba
1 changed files with 52 additions and 64 deletions

View File

@ -301,12 +301,22 @@ pick_cast(Id, Key, Query) ->
resume_from_blocked(Data) ->
#{inflight_tid := InflightTID} = Data,
case inflight_get_first_retriable(InflightTID) of
empty ->
{next_state, running, Data};
none ->
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
{next_state, running, Data}
end;
{Ref, FirstQuery} ->
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
retry_inflight_sync(Ref, FirstQuery, Data)
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, FirstQuery, Data)
end
end.
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
@ -452,7 +462,7 @@ do_flush(
} = Data0,
%% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
QueryOpts = #{inflight_name => InflightTID},
QueryOpts = #{inflight_tid => InflightTID},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case reply_caller(Id, Reply) of
@ -519,7 +529,7 @@ do_flush(Data0, #{
batch_size := BatchSize,
inflight_tid := InflightTID
} = Data0,
QueryOpts = #{inflight_name => InflightTID},
QueryOpts = #{inflight_tid => InflightTID},
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
case batch_reply_caller(Id, Result, Batch) of
%% Failed; remove the request from the queue, as we cannot pop
@ -746,21 +756,15 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query,
case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of
true ->
%% should be kept in the inflight table and retried
%% when resuming.
{async_return, inflight_full};
false ->
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_query(Id, Request, ResSt)
begin
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_query(Id, Request, ResSt)
end,
Request
);
@ -768,22 +772,18 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query_async,
case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of
true ->
{async_return, inflight_full};
false ->
ReplyFun = fun ?MODULE:reply_after_query/7,
Args = [self(), Id, Index, InflightTID, Ref, Query],
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
{async_return, Result}
begin
ReplyFun = fun ?MODULE:reply_after_query/7,
Args = [self(), Id, Index, InflightTID, Ref, Query],
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
{async_return, Result}
end,
Request
);
@ -791,22 +791,16 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt,
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(
call_batch_query,
case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of
true ->
%% should be kept in the inflight table and retried
%% when resuming.
{async_return, inflight_full};
false ->
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_batch_query(Id, Requests, ResSt)
begin
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_batch_query(Id, Requests, ResSt)
end,
Batch
);
@ -814,23 +808,19 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_batch_query_async,
case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of
true ->
{async_return, inflight_full};
false ->
ReplyFun = fun ?MODULE:batch_reply_after_query/7,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
{async_return, Result}
begin
ReplyFun = fun ?MODULE:batch_reply_after_query/7,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
{async_return, Result}
end,
Batch
).
@ -935,7 +925,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
TableId.
-spec inflight_get_first_retriable(ets:tid()) ->
empty | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
inflight_get_first_retriable(InflightTID) ->
MatchSpec =
ets:fun2ms(
@ -947,13 +937,11 @@ inflight_get_first_retriable(InflightTID) ->
),
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' ->
empty;
none;
{[{Ref, BatchOrQuery}], _Continuation} ->
{Ref, BatchOrQuery}
end.
is_inflight_full(undefined) ->
false;
is_inflight_full(InflightTID) ->
[{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
%% we consider number of batches rather than number of messages