refactor: remove redundant function
`retry_queue` does basically what the running state does, now that we refactored the buffer workers to always use the queue.
This commit is contained in:
parent
d6a9d0aa48
commit
d4724d6ce9
|
@ -298,99 +298,13 @@ resume_from_blocked(Data) ->
|
||||||
#{inflight_tid := InflightTID} = Data,
|
#{inflight_tid := InflightTID} = Data,
|
||||||
case inflight_get_first(InflightTID) of
|
case inflight_get_first(InflightTID) of
|
||||||
empty ->
|
empty ->
|
||||||
retry_queue(Data);
|
{next_state, running, Data};
|
||||||
{Ref, FirstQuery} ->
|
{Ref, FirstQuery} ->
|
||||||
%% We retry msgs in inflight window sync, as if we send them
|
%% We retry msgs in inflight window sync, as if we send them
|
||||||
%% async, they will be appended to the end of inflight window again.
|
%% async, they will be appended to the end of inflight window again.
|
||||||
retry_inflight_sync(Ref, FirstQuery, Data)
|
retry_inflight_sync(Ref, FirstQuery, Data)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_queue(
|
|
||||||
#{
|
|
||||||
queue := Q0,
|
|
||||||
id := Id,
|
|
||||||
index := Index,
|
|
||||||
batch_size := 1,
|
|
||||||
inflight_tid := InflightTID,
|
|
||||||
resume_interval := ResumeT
|
|
||||||
} = Data0
|
|
||||||
) ->
|
|
||||||
%% no batching
|
|
||||||
case get_first_n_from_queue(Q0, 1) of
|
|
||||||
empty ->
|
|
||||||
{next_state, running, Data0};
|
|
||||||
{Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
|
|
||||||
Data = Data0#{queue := Q1},
|
|
||||||
QueryOpts = #{inflight_name => InflightTID},
|
|
||||||
Ref = make_message_ref(),
|
|
||||||
Result = call_query(configured, Id, Index, Ref, Query, QueryOpts),
|
|
||||||
Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
|
|
||||||
case reply_caller(Id, Reply) of
|
|
||||||
true ->
|
|
||||||
%% Still failed, but now it's in the inflight
|
|
||||||
%% table and marked as sent, except if the result
|
|
||||||
%% says inflight is full. In this case, we must
|
|
||||||
%% ensure it's indeed in the inflight table or
|
|
||||||
%% risk lose it.
|
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
|
||||||
is_inflight_full_result(Result) andalso
|
|
||||||
inflight_append(InflightTID, Ref, Query, Id, Index),
|
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
|
||||||
{keep_state, Data, {state_timeout, ResumeT, resume}};
|
|
||||||
false ->
|
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
|
||||||
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
|
||||||
retry_queue(Data)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
retry_queue(
|
|
||||||
#{
|
|
||||||
queue := Q,
|
|
||||||
id := Id,
|
|
||||||
index := Index,
|
|
||||||
batch_size := BatchSize,
|
|
||||||
inflight_tid := InflightTID,
|
|
||||||
resume_interval := ResumeT
|
|
||||||
} = Data0
|
|
||||||
) ->
|
|
||||||
%% batching
|
|
||||||
case get_first_n_from_queue(Q, BatchSize) of
|
|
||||||
empty ->
|
|
||||||
{next_state, running, Data0};
|
|
||||||
{Q1, QAckRef, Batch0} ->
|
|
||||||
Data = Data0#{queue := Q1},
|
|
||||||
QueryOpts = #{inflight_name => InflightTID},
|
|
||||||
Ref = make_message_ref(),
|
|
||||||
Result = call_query(configured, Id, Index, Ref, Batch0, QueryOpts),
|
|
||||||
%% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
|
|
||||||
%% we now change the 'from' field to 'undefined' so it will not reply the caller again.
|
|
||||||
Batch = [
|
|
||||||
?QUERY(undefined, Request, HasBeenSent0)
|
|
||||||
|| ?QUERY(_, Request, HasBeenSent0) <- Batch0
|
|
||||||
],
|
|
||||||
case batch_reply_caller(Id, Result, Batch) of
|
|
||||||
true ->
|
|
||||||
?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}),
|
|
||||||
%% Still failed, but now it's in the inflight
|
|
||||||
%% table and marked as sent, except if the result
|
|
||||||
%% says inflight is full. In this case, we must
|
|
||||||
%% ensure it's indeed in the inflight table or
|
|
||||||
%% risk lose it.
|
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
|
||||||
is_inflight_full_result(Result) andalso
|
|
||||||
inflight_append(InflightTID, Ref, Batch, Id, Index),
|
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
|
||||||
{keep_state, Data, {state_timeout, ResumeT, resume}};
|
|
||||||
false ->
|
|
||||||
?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}),
|
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
|
||||||
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
|
||||||
retry_queue(Data)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
||||||
#{
|
#{
|
||||||
id := Id,
|
id := Id,
|
||||||
|
@ -891,16 +805,6 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
||||||
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
||||||
Q2.
|
Q2.
|
||||||
|
|
||||||
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
|
|
||||||
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
|
|
||||||
get_first_n_from_queue(Q, N) ->
|
|
||||||
case queue_count(Q) of
|
|
||||||
0 ->
|
|
||||||
empty;
|
|
||||||
_ ->
|
|
||||||
replayq:pop(Q, #{count_limit => N})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%==============================================================================
|
%%==============================================================================
|
||||||
%% the inflight queue for async query
|
%% the inflight queue for async query
|
||||||
-define(MAX_SIZE_REF, -1).
|
-define(MAX_SIZE_REF, -1).
|
||||||
|
|
Loading…
Reference in New Issue