refactor: rename some variables and sum type constructors for clarity
This commit is contained in:
parent
7d798c10e9
commit
c74c93388e
|
@ -63,8 +63,8 @@
|
||||||
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
||||||
-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
|
-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
|
||||||
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
||||||
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
|
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
|
||||||
{Ref, BatchOrQuery, IsRetriable, WorkerMRef}
|
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
|
||||||
).
|
).
|
||||||
-define(ITEM_IDX, 2).
|
-define(ITEM_IDX, 2).
|
||||||
-define(RETRY_IDX, 3).
|
-define(RETRY_IDX, 3).
|
||||||
|
@ -350,8 +350,8 @@ resume_from_blocked(Data) ->
|
||||||
{next_state, running, Data}
|
{next_state, running, Data}
|
||||||
end;
|
end;
|
||||||
{expired, Ref, Batch} ->
|
{expired, Ref, Batch} ->
|
||||||
WorkerPid = self(),
|
BufferWorkerPid = self(),
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
|
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
Counters =
|
Counters =
|
||||||
case IsAcked of
|
case IsAcked of
|
||||||
true -> #{dropped_expired => length(Batch)};
|
true -> #{dropped_expired => length(Batch)};
|
||||||
|
@ -409,8 +409,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
||||||
{keep_state, Data1, {state_timeout, ResumeT, unblock}};
|
{keep_state, Data1, {state_timeout, ResumeT, unblock}};
|
||||||
%% Send ok or failed but the resource is working
|
%% Send ok or failed but the resource is working
|
||||||
ack ->
|
ack ->
|
||||||
WorkerPid = self(),
|
BufferWorkerPid = self(),
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
|
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
%% we need to defer bumping the counters after
|
%% we need to defer bumping the counters after
|
||||||
%% `inflight_drop' to avoid the race condition when an
|
%% `inflight_drop' to avoid the race condition when an
|
||||||
%% inflight request might get completed concurrently with
|
%% inflight request might get completed concurrently with
|
||||||
|
@ -587,16 +587,16 @@ do_flush(
|
||||||
%% we set it atomically just below; a limitation of having
|
%% we set it atomically just below; a limitation of having
|
||||||
%% to use tuples for atomic ets updates
|
%% to use tuples for atomic ets updates
|
||||||
IsRetriable = true,
|
IsRetriable = true,
|
||||||
WorkerMRef0 = undefined,
|
AsyncWorkerMRef0 = undefined,
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0),
|
||||||
%% we must append again to the table to ensure that the
|
%% we must append again to the table to ensure that the
|
||||||
%% request will be retried (i.e., it might not have been
|
%% request will be retried (i.e., it might not have been
|
||||||
%% inserted during `call_query' if the resource was down
|
%% inserted during `call_query' if the resource was down
|
||||||
%% and/or if it was a sync request).
|
%% and/or if it was a sync request).
|
||||||
inflight_append(InflightTID, InflightItem),
|
inflight_append(InflightTID, InflightItem),
|
||||||
mark_inflight_as_retriable(InflightTID, Ref),
|
mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
{Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
|
||||||
?tp(
|
?tp(
|
||||||
buffer_worker_flush_nack,
|
buffer_worker_flush_nack,
|
||||||
#{
|
#{
|
||||||
|
@ -615,17 +615,17 @@ do_flush(
|
||||||
%% must ensure the async worker is being monitored for
|
%% must ensure the async worker is being monitored for
|
||||||
%% such requests.
|
%% such requests.
|
||||||
IsUnrecoverableError = is_unrecoverable_error(Result),
|
IsUnrecoverableError = is_unrecoverable_error(Result),
|
||||||
WorkerPid = self(),
|
BufferWorkerPid = self(),
|
||||||
case is_async_return(Result) of
|
case is_async_return(Result) of
|
||||||
true when IsUnrecoverableError ->
|
true when IsUnrecoverableError ->
|
||||||
ack_inflight(InflightTID, Ref, WorkerPid);
|
ack_inflight(InflightTID, Ref, BufferWorkerPid);
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ack_inflight(InflightTID, Ref, WorkerPid)
|
ack_inflight(InflightTID, Ref, BufferWorkerPid)
|
||||||
end,
|
end,
|
||||||
{Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
|
||||||
?tp(
|
?tp(
|
||||||
buffer_worker_flush_ack,
|
buffer_worker_flush_ack,
|
||||||
#{
|
#{
|
||||||
|
@ -672,16 +672,16 @@ do_flush(#{queue := Q1} = Data0, #{
|
||||||
%% we set it atomically just below; a limitation of having
|
%% we set it atomically just below; a limitation of having
|
||||||
%% to use tuples for atomic ets updates
|
%% to use tuples for atomic ets updates
|
||||||
IsRetriable = true,
|
IsRetriable = true,
|
||||||
WorkerMRef0 = undefined,
|
AsyncWorkerMRef0 = undefined,
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0),
|
||||||
%% we must append again to the table to ensure that the
|
%% we must append again to the table to ensure that the
|
||||||
%% request will be retried (i.e., it might not have been
|
%% request will be retried (i.e., it might not have been
|
||||||
%% inserted during `call_query' if the resource was down
|
%% inserted during `call_query' if the resource was down
|
||||||
%% and/or if it was a sync request).
|
%% and/or if it was a sync request).
|
||||||
inflight_append(InflightTID, InflightItem),
|
inflight_append(InflightTID, InflightItem),
|
||||||
mark_inflight_as_retriable(InflightTID, Ref),
|
mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
{Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
|
||||||
?tp(
|
?tp(
|
||||||
buffer_worker_flush_nack,
|
buffer_worker_flush_nack,
|
||||||
#{
|
#{
|
||||||
|
@ -700,17 +700,17 @@ do_flush(#{queue := Q1} = Data0, #{
|
||||||
%% must ensure the async worker is being monitored for
|
%% must ensure the async worker is being monitored for
|
||||||
%% such requests.
|
%% such requests.
|
||||||
IsUnrecoverableError = is_unrecoverable_error(Result),
|
IsUnrecoverableError = is_unrecoverable_error(Result),
|
||||||
WorkerPid = self(),
|
BufferWorkerPid = self(),
|
||||||
case is_async_return(Result) of
|
case is_async_return(Result) of
|
||||||
true when IsUnrecoverableError ->
|
true when IsUnrecoverableError ->
|
||||||
ack_inflight(InflightTID, Ref, WorkerPid);
|
ack_inflight(InflightTID, Ref, BufferWorkerPid);
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ack_inflight(InflightTID, Ref, WorkerPid)
|
ack_inflight(InflightTID, Ref, BufferWorkerPid)
|
||||||
end,
|
end,
|
||||||
{Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
{Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
|
||||||
CurrentCount = queue_count(Q1),
|
CurrentCount = queue_count(Q1),
|
||||||
?tp(
|
?tp(
|
||||||
buffer_worker_flush_ack,
|
buffer_worker_flush_ack,
|
||||||
|
@ -966,9 +966,9 @@ set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := Infli
|
||||||
|
|
||||||
handle_async_worker_down(Data0, Pid) ->
|
handle_async_worker_down(Data0, Pid) ->
|
||||||
#{async_workers := AsyncWorkers0} = Data0,
|
#{async_workers := AsyncWorkers0} = Data0,
|
||||||
{WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
|
{AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
|
||||||
Data = Data0#{async_workers := AsyncWorkers},
|
Data = Data0#{async_workers := AsyncWorkers},
|
||||||
mark_inflight_items_as_retriable(Data, WorkerMRef),
|
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
|
||||||
{keep_state, Data}.
|
{keep_state, Data}.
|
||||||
|
|
||||||
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
|
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
|
||||||
|
@ -1046,8 +1046,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
|
||||||
min_query => minimize(Query)
|
min_query => minimize(Query)
|
||||||
},
|
},
|
||||||
IsRetriable = false,
|
IsRetriable = false,
|
||||||
WorkerMRef = undefined,
|
AsyncWorkerMRef = undefined,
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
|
||||||
ok = inflight_append(InflightTID, InflightItem),
|
ok = inflight_append(InflightTID, InflightItem),
|
||||||
Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
|
Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
|
||||||
{async_return, Result}
|
{async_return, Result}
|
||||||
|
@ -1082,8 +1082,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
|
||||||
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
|
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
|
||||||
),
|
),
|
||||||
IsRetriable = false,
|
IsRetriable = false,
|
||||||
WorkerMRef = undefined,
|
AsyncWorkerMRef = undefined,
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
|
||||||
ok = inflight_append(InflightTID, InflightItem),
|
ok = inflight_append(InflightTID, InflightItem),
|
||||||
Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
|
Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
|
||||||
{async_return, Result}
|
{async_return, Result}
|
||||||
|
@ -1111,7 +1111,7 @@ handle_async_reply1(
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
buffer_worker := WorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
||||||
} = ReplyContext,
|
} = ReplyContext,
|
||||||
Result
|
Result
|
||||||
|
@ -1123,7 +1123,7 @@ handle_async_reply1(
|
||||||
Now = now_(),
|
Now = now_(),
|
||||||
case is_expired(ExpireAt, Now) of
|
case is_expired(ExpireAt, Now) of
|
||||||
true ->
|
true ->
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
|
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
%% evalutate metrics call here since we're not inside
|
%% evalutate metrics call here since we're not inside
|
||||||
%% buffer worker
|
%% buffer worker
|
||||||
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
||||||
|
@ -1138,7 +1138,7 @@ do_handle_async_reply(
|
||||||
query_opts := QueryOpts,
|
query_opts := QueryOpts,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
buffer_worker := WorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
|
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
|
||||||
},
|
},
|
||||||
|
@ -1161,10 +1161,12 @@ do_handle_async_reply(
|
||||||
nack ->
|
nack ->
|
||||||
%% Keep retrying.
|
%% Keep retrying.
|
||||||
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
ok = ?MODULE:block(WorkerPid),
|
ok = ?MODULE:block(BufferWorkerPid),
|
||||||
blocked;
|
blocked;
|
||||||
ack ->
|
ack ->
|
||||||
ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts)
|
ok = do_async_ack(
|
||||||
|
InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
|
||||||
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_async_batch_reply(
|
handle_async_batch_reply(
|
||||||
|
@ -1213,10 +1215,10 @@ handle_async_batch_reply2([], _, _, _) ->
|
||||||
%% this usually should never happen unless the async callback is being evaluated concurrently
|
%% this usually should never happen unless the async callback is being evaluated concurrently
|
||||||
ok;
|
ok;
|
||||||
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
|
?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
|
||||||
#{
|
#{
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
buffer_worker := WorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
min_batch := Batch
|
min_batch := Batch
|
||||||
|
@ -1241,7 +1243,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
case RealNotExpired of
|
case RealNotExpired of
|
||||||
[] ->
|
[] ->
|
||||||
%% all expired, no need to update back the inflight batch
|
%% all expired, no need to update back the inflight batch
|
||||||
_ = ack_inflight(InflightTID, Ref, WorkerPid),
|
_ = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
%% some queries are not expired, put them back to the inflight batch
|
%% some queries are not expired, put them back to the inflight batch
|
||||||
|
@ -1252,7 +1254,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
|
|
||||||
do_handle_async_batch_reply(
|
do_handle_async_batch_reply(
|
||||||
#{
|
#{
|
||||||
buffer_worker := WorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
|
@ -1274,14 +1276,16 @@ do_handle_async_batch_reply(
|
||||||
nack ->
|
nack ->
|
||||||
%% Keep retrying.
|
%% Keep retrying.
|
||||||
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
ok = ?MODULE:block(WorkerPid),
|
ok = ?MODULE:block(BufferWorkerPid),
|
||||||
blocked;
|
blocked;
|
||||||
ack ->
|
ack ->
|
||||||
ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts)
|
ok = do_async_ack(
|
||||||
|
InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
|
||||||
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) ->
|
do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) ->
|
||||||
IsKnownRef = ack_inflight(InflightTID, Ref, WorkerPid),
|
IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
case maps:get(simple_query, QueryOpts, false) of
|
case maps:get(simple_query, QueryOpts, false) of
|
||||||
true ->
|
true ->
|
||||||
PostFn(),
|
PostFn(),
|
||||||
|
@ -1397,7 +1401,7 @@ inflight_new(InfltWinSZ) ->
|
||||||
inflight_get_first_retriable(InflightTID, Now) ->
|
inflight_get_first_retriable(InflightTID, Now) ->
|
||||||
MatchSpec =
|
MatchSpec =
|
||||||
ets:fun2ms(
|
ets:fun2ms(
|
||||||
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
|
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when
|
||||||
IsRetriable =:= true
|
IsRetriable =:= true
|
||||||
->
|
->
|
||||||
{Ref, BatchOrQuery}
|
{Ref, BatchOrQuery}
|
||||||
|
@ -1442,10 +1446,10 @@ inflight_append(undefined, _InflightItem) ->
|
||||||
ok;
|
ok;
|
||||||
inflight_append(
|
inflight_append(
|
||||||
InflightTID,
|
InflightTID,
|
||||||
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef)
|
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
|
||||||
) ->
|
) ->
|
||||||
Batch = mark_as_sent(Batch0),
|
Batch = mark_as_sent(Batch0),
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
|
||||||
IsNew = ets:insert_new(InflightTID, InflightItem),
|
IsNew = ets:insert_new(InflightTID, InflightItem),
|
||||||
BatchSize = length(Batch),
|
BatchSize = length(Batch),
|
||||||
IsNew andalso inc_inflight(InflightTID, BatchSize),
|
IsNew andalso inc_inflight(InflightTID, BatchSize),
|
||||||
|
@ -1454,11 +1458,11 @@ inflight_append(
|
||||||
inflight_append(
|
inflight_append(
|
||||||
InflightTID,
|
InflightTID,
|
||||||
?INFLIGHT_ITEM(
|
?INFLIGHT_ITEM(
|
||||||
Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
|
Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
|
||||||
)
|
)
|
||||||
) ->
|
) ->
|
||||||
Query = mark_as_sent(Query0),
|
Query = mark_as_sent(Query0),
|
||||||
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
|
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
|
||||||
IsNew = ets:insert_new(InflightTID, InflightItem),
|
IsNew = ets:insert_new(InflightTID, InflightItem),
|
||||||
IsNew andalso inc_inflight(InflightTID, 1),
|
IsNew andalso inc_inflight(InflightTID, 1),
|
||||||
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
|
||||||
|
@ -1481,67 +1485,67 @@ mark_inflight_as_retriable(InflightTID, Ref) ->
|
||||||
|
|
||||||
%% Track each worker pid only once.
|
%% Track each worker pid only once.
|
||||||
ensure_async_worker_monitored(
|
ensure_async_worker_monitored(
|
||||||
Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result
|
Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, AsyncWorkerPid}} = _Result
|
||||||
) when
|
) when
|
||||||
is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers)
|
is_pid(AsyncWorkerPid), is_map_key(AsyncWorkerPid, AsyncWorkers)
|
||||||
->
|
->
|
||||||
WorkerMRef = maps:get(WorkerPid, AsyncWorkers),
|
AsyncWorkerMRef = maps:get(AsyncWorkerPid, AsyncWorkers),
|
||||||
{Data0, WorkerMRef};
|
{Data0, AsyncWorkerMRef};
|
||||||
ensure_async_worker_monitored(
|
ensure_async_worker_monitored(
|
||||||
Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}}
|
Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, AsyncWorkerPid}}
|
||||||
) when
|
) when
|
||||||
is_pid(WorkerPid)
|
is_pid(AsyncWorkerPid)
|
||||||
->
|
->
|
||||||
WorkerMRef = monitor(process, WorkerPid),
|
AsyncWorkerMRef = monitor(process, AsyncWorkerPid),
|
||||||
AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef},
|
AsyncWorkers = AsyncWorkers0#{AsyncWorkerPid => AsyncWorkerMRef},
|
||||||
Data = Data0#{async_workers := AsyncWorkers},
|
Data = Data0#{async_workers := AsyncWorkers},
|
||||||
{Data, WorkerMRef};
|
{Data, AsyncWorkerMRef};
|
||||||
ensure_async_worker_monitored(Data0, _Result) ->
|
ensure_async_worker_monitored(Data0, _Result) ->
|
||||||
{Data0, undefined}.
|
{Data0, undefined}.
|
||||||
|
|
||||||
-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
|
-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
|
||||||
ok.
|
ok.
|
||||||
store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) ->
|
store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) ->
|
||||||
ok;
|
ok;
|
||||||
store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
|
store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
|
||||||
ok;
|
ok;
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
|
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when
|
||||||
is_reference(WorkerMRef)
|
is_reference(AsyncWorkerMRef)
|
||||||
->
|
->
|
||||||
_ = ets:update_element(
|
_ = ets:update_element(
|
||||||
InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef}
|
InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef}
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ack_inflight(undefined, _Ref, _WorkerPid) ->
|
ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
|
||||||
false;
|
false;
|
||||||
ack_inflight(InflightTID, Ref, WorkerPid) ->
|
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
|
||||||
{Count, Removed} =
|
{Count, Removed} =
|
||||||
case ets:take(InflightTID, Ref) of
|
case ets:take(InflightTID, Ref) of
|
||||||
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
|
||||||
{1, true};
|
{1, true};
|
||||||
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
|
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
|
||||||
{length(Batch), true};
|
{length(Batch), true};
|
||||||
[] ->
|
[] ->
|
||||||
{0, false}
|
{0, false}
|
||||||
end,
|
end,
|
||||||
FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
|
FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
|
||||||
case FlushCheck of
|
case FlushCheck of
|
||||||
continue -> ok;
|
no_flush -> ok;
|
||||||
flush -> ?MODULE:flush_worker(WorkerPid)
|
flush -> ?MODULE:flush_worker(BufferWorkerPid)
|
||||||
end,
|
end,
|
||||||
IsKnownRef = (Count > 0),
|
IsKnownRef = (Count > 0),
|
||||||
IsKnownRef.
|
IsKnownRef.
|
||||||
|
|
||||||
mark_inflight_items_as_retriable(Data, WorkerMRef) ->
|
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
|
||||||
#{inflight_tid := InflightTID} = Data,
|
#{inflight_tid := InflightTID} = Data,
|
||||||
IsRetriable = true,
|
IsRetriable = true,
|
||||||
MatchSpec =
|
MatchSpec =
|
||||||
ets:fun2ms(
|
ets:fun2ms(
|
||||||
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when
|
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when
|
||||||
WorkerMRef =:= WorkerMRef0
|
AsyncWorkerMRef =:= AsyncWorkerMRef0
|
||||||
->
|
->
|
||||||
?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0)
|
?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0)
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
||||||
|
@ -1559,9 +1563,9 @@ inc_inflight(InflightTID, Count) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
|
-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
|
||||||
continue | flush.
|
no_flush | flush.
|
||||||
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
||||||
continue;
|
no_flush;
|
||||||
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
||||||
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
||||||
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
|
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
|
||||||
|
@ -1570,7 +1574,7 @@ dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
||||||
%% make it continue flushing.
|
%% make it continue flushing.
|
||||||
case NewValue =:= MaxValue - 1 of
|
case NewValue =:= MaxValue - 1 of
|
||||||
true -> flush;
|
true -> flush;
|
||||||
false -> continue
|
false -> no_flush
|
||||||
end;
|
end;
|
||||||
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
||||||
%% If Count > 0, it must have been removed
|
%% If Count > 0, it must have been removed
|
||||||
|
@ -1582,7 +1586,7 @@ dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
||||||
%% make it continue flushing.
|
%% make it continue flushing.
|
||||||
case NewValue =:= MaxValue - 1 of
|
case NewValue =:= MaxValue - 1 of
|
||||||
true -> flush;
|
true -> flush;
|
||||||
false -> continue
|
false -> no_flush
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dec_inflight_update(_InflightTID, _Count = 0) ->
|
dec_inflight_update(_InflightTID, _Count = 0) ->
|
||||||
|
|
Loading…
Reference in New Issue