refactor: rename a few functions
This commit is contained in:
parent
cdd8de11b0
commit
fecdbac9a8
|
@ -210,9 +210,9 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
|
||||||
blocked(cast, block, _St) ->
|
blocked(cast, block, _St) ->
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
blocked(cast, resume, St) ->
|
blocked(cast, resume, St) ->
|
||||||
do_resume(St);
|
resume_from_blocked(St);
|
||||||
blocked(state_timeout, resume, St) ->
|
blocked(state_timeout, resume, St) ->
|
||||||
do_resume(St);
|
resume_from_blocked(St);
|
||||||
blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) ->
|
blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) ->
|
||||||
#{
|
#{
|
||||||
id := Id,
|
id := Id,
|
||||||
|
@ -227,6 +227,7 @@ blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) ->
|
||||||
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
|
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
|
||||||
HasBeenSent = false,
|
HasBeenSent = false,
|
||||||
_ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)),
|
_ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)),
|
||||||
|
%% TODO collect requests
|
||||||
NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]),
|
NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]),
|
||||||
Data = Data0#{queue := NewQ},
|
Data = Data0#{queue := NewQ},
|
||||||
{keep_state, Data};
|
{keep_state, Data};
|
||||||
|
@ -291,14 +292,14 @@ pick_cast(Id, Key, Query) ->
|
||||||
ok
|
ok
|
||||||
end).
|
end).
|
||||||
|
|
||||||
do_resume(#{id := Id, inflight_tid := InflightTID} = Data) ->
|
resume_from_blocked(Data) ->
|
||||||
case inflight_get_first(InflightTID) of
|
case inflight_get_first(InflightTID) of
|
||||||
empty ->
|
empty ->
|
||||||
retry_queue(Data);
|
retry_queue(Data);
|
||||||
{Ref, FirstQuery} ->
|
{Ref, FirstQuery} ->
|
||||||
%% We retry msgs in inflight window sync, as if we send them
|
%% We retry msgs in inflight window sync, as if we send them
|
||||||
%% async, they will be appended to the end of inflight window again.
|
%% async, they will be appended to the end of inflight window again.
|
||||||
retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data)
|
retry_inflight_sync(Ref, FirstQuery, Data)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_queue(
|
retry_queue(
|
||||||
|
@ -387,13 +388,9 @@ retry_queue(
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_inflight_sync(
|
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
||||||
Id,
|
#{id := Id, inflight_tid := InflightTID, index := Index, resume_interval := ResumeT} =
|
||||||
Ref,
|
Data0
|
||||||
QueryOrBatch,
|
|
||||||
InflightTID,
|
|
||||||
#{index := Index, resume_interval := ResumeT} = Data0
|
|
||||||
) ->
|
|
||||||
QueryOpts = #{},
|
QueryOpts = #{},
|
||||||
%% if we are retrying an inflight query, it has been sent
|
%% if we are retrying an inflight query, it has been sent
|
||||||
HasBeenSent = true,
|
HasBeenSent = true,
|
||||||
|
@ -416,7 +413,7 @@ retry_inflight_sync(
|
||||||
%% we bump the counter when removing it from the table.
|
%% we bump the counter when removing it from the table.
|
||||||
IsDropped andalso PostFn(),
|
IsDropped andalso PostFn(),
|
||||||
?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}),
|
?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}),
|
||||||
do_resume(Data0)
|
resume_from_blocked(Data0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Called during the `running' state only.
|
%% Called during the `running' state only.
|
||||||
|
|
Loading…
Reference in New Issue