diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 993e69749..47769418b 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -63,8 +63,8 @@ -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). --define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), - {Ref, BatchOrQuery, IsRetriable, WorkerMRef} +-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), + {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} ). -define(ITEM_IDX, 2). -define(RETRY_IDX, 3). @@ -350,8 +350,8 @@ resume_from_blocked(Data) -> {next_state, running, Data} end; {expired, Ref, Batch} -> - WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + BufferWorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), Counters = case IsAcked of true -> #{dropped_expired => length(Batch)}; @@ -409,8 +409,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {keep_state, Data1, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working ack -> - WorkerPid = self(), - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + BufferWorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with @@ -587,16 +587,16 @@ do_flush( %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates IsRetriable = true, - WorkerMRef0 = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0), + AsyncWorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0), %% we must append again to the table to ensure that the %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_nack, #{ @@ -615,17 +615,17 @@ do_flush( %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), - WorkerPid = self(), + BufferWorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, WorkerPid); + ack_inflight(InflightTID, Ref, BufferWorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, WorkerPid) + ack_inflight(InflightTID, Ref, BufferWorkerPid) end, - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_ack, #{ @@ -672,16 +672,16 @@ do_flush(#{queue := Q1} = Data0, #{ %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates IsRetriable = true, - WorkerMRef0 = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0), + AsyncWorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0), %% we must append again to the table to ensure that the %% request will be retried (i.e., it might not have been %% inserted during `call_query' if the resource was down %% and/or if it was a sync request). inflight_append(InflightTID, InflightItem), mark_inflight_as_retriable(InflightTID, Ref), - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), ?tp( buffer_worker_flush_nack, #{ @@ -700,17 +700,17 @@ do_flush(#{queue := Q1} = Data0, #{ %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), - WorkerPid = self(), + BufferWorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, WorkerPid); + ack_inflight(InflightTID, Ref, BufferWorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, WorkerPid) + ack_inflight(InflightTID, Ref, BufferWorkerPid) end, - {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result), - store_async_worker_reference(InflightTID, Ref, WorkerMRef), + {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result), + store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef), CurrentCount = queue_count(Q1), ?tp( buffer_worker_flush_ack, @@ -966,9 +966,9 @@ set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := Infli handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, - {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), + {AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), Data = Data0#{async_workers := AsyncWorkers}, - mark_inflight_items_as_retriable(Data, WorkerMRef), + mark_inflight_items_as_retriable(Data, AsyncWorkerMRef), {keep_state, Data}. -spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. @@ -1046,8 +1046,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re min_query => minimize(Query) }, IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + AsyncWorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} @@ -1082,8 +1082,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch ), IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + AsyncWorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} @@ -1111,7 +1111,7 @@ handle_async_reply1( request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1123,7 +1123,7 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsAcked = ack_inflight(InflightTID, Ref, WorkerPid), + IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), %% evalutate metrics call here since we're not inside %% buffer worker IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), @@ -1138,7 +1138,7 @@ do_handle_async_reply( query_opts := QueryOpts, resource_id := Id, request_ref := Ref, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, @@ -1161,10 +1161,12 @@ do_handle_async_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(WorkerPid), + ok = ?MODULE:block(BufferWorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) + ok = do_async_ack( + InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts + ) end. handle_async_batch_reply( @@ -1213,10 +1215,10 @@ handle_async_batch_reply2([], _, _, _) -> %% this usually should never happen unless the async callback is being evaluated concurrently ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> - ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, + ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight, #{ resource_id := Id, - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, request_ref := Ref, min_batch := Batch @@ -1241,7 +1243,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - _ = ack_inflight(InflightTID, Ref, WorkerPid), + _ = ack_inflight(InflightTID, Ref, BufferWorkerPid), ok; _ -> %% some queries are not expired, put them back to the inflight batch @@ -1252,7 +1254,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> do_handle_async_batch_reply( #{ - buffer_worker := WorkerPid, + buffer_worker := BufferWorkerPid, resource_id := Id, inflight_tid := InflightTID, request_ref := Ref, @@ -1274,14 +1276,16 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(WorkerPid), + ok = ?MODULE:block(BufferWorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) + ok = do_async_ack( + InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts + ) end. -do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) -> - IsKnownRef = ack_inflight(InflightTID, Ref, WorkerPid), +do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) -> + IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid), case maps:get(simple_query, QueryOpts, false) of true -> PostFn(), @@ -1397,7 +1401,7 @@ inflight_new(InfltWinSZ) -> inflight_get_first_retriable(InflightTID, Now) -> MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when IsRetriable =:= true -> {Ref, BatchOrQuery} @@ -1442,10 +1446,10 @@ inflight_append(undefined, _InflightItem) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef) + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) ) -> Batch = mark_as_sent(Batch0), - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso inc_inflight(InflightTID, BatchSize), @@ -1454,11 +1458,11 @@ inflight_append( inflight_append( InflightTID, ?INFLIGHT_ITEM( - Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef + Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef ) ) -> Query = mark_as_sent(Query0), - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso inc_inflight(InflightTID, 1), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), @@ -1481,67 +1485,67 @@ mark_inflight_as_retriable(InflightTID, Ref) -> %% Track each worker pid only once. ensure_async_worker_monitored( - Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result + Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, AsyncWorkerPid}} = _Result ) when - is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers) + is_pid(AsyncWorkerPid), is_map_key(AsyncWorkerPid, AsyncWorkers) -> - WorkerMRef = maps:get(WorkerPid, AsyncWorkers), - {Data0, WorkerMRef}; + AsyncWorkerMRef = maps:get(AsyncWorkerPid, AsyncWorkers), + {Data0, AsyncWorkerMRef}; ensure_async_worker_monitored( - Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}} + Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, AsyncWorkerPid}} ) when - is_pid(WorkerPid) + is_pid(AsyncWorkerPid) -> - WorkerMRef = monitor(process, WorkerPid), - AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef}, + AsyncWorkerMRef = monitor(process, AsyncWorkerPid), + AsyncWorkers = AsyncWorkers0#{AsyncWorkerPid => AsyncWorkerMRef}, Data = Data0#{async_workers := AsyncWorkers}, - {Data, WorkerMRef}; + {Data, AsyncWorkerMRef}; ensure_async_worker_monitored(Data0, _Result) -> {Data0, undefined}. -spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) -> ok. -store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) -> +store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) -> ok; store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) -> ok; -store_async_worker_reference(InflightTID, Ref, WorkerMRef) when - is_reference(WorkerMRef) +store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when + is_reference(AsyncWorkerMRef) -> _ = ets:update_element( - InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef} + InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef} ), ok. -ack_inflight(undefined, _Ref, _WorkerPid) -> +ack_inflight(undefined, _Ref, _BufferWorkerPid) -> false; -ack_inflight(InflightTID, Ref, WorkerPid) -> +ack_inflight(InflightTID, Ref, BufferWorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> {1, true}; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] -> {length(Batch), true}; [] -> {0, false} end, FlushCheck = dec_inflight_remove(InflightTID, Count, Removed), case FlushCheck of - continue -> ok; - flush -> ?MODULE:flush_worker(WorkerPid) + no_flush -> ok; + flush -> ?MODULE:flush_worker(BufferWorkerPid) end, IsKnownRef = (Count > 0), IsKnownRef. -mark_inflight_items_as_retriable(Data, WorkerMRef) -> +mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) -> #{inflight_tid := InflightTID} = Data, IsRetriable = true, MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when - WorkerMRef =:= WorkerMRef0 + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when + AsyncWorkerMRef =:= AsyncWorkerMRef0 -> - ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0) + ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0) end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), @@ -1559,9 +1563,9 @@ inc_inflight(InflightTID, Count) -> ok. -spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) -> - continue | flush. + no_flush | flush. dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) -> - continue; + no_flush; dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), @@ -1570,7 +1574,7 @@ dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> %% make it continue flushing. case NewValue =:= MaxValue - 1 of true -> flush; - false -> continue + false -> no_flush end; dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% If Count > 0, it must have been removed @@ -1582,7 +1586,7 @@ dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% make it continue flushing. case NewValue =:= MaxValue - 1 of true -> flush; - false -> continue + false -> no_flush end. dec_inflight_update(_InflightTID, _Count = 0) ->