From bd0e2a74baf0eaf45e574b41af7291160db78807 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 16 Jan 2023 16:56:07 -0300 Subject: [PATCH] refactor: rename inflight_name field to inflight_tid --- .../src/emqx_resource_worker.erl | 116 ++++++++---------- 1 file changed, 52 insertions(+), 64 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 4e0ae9352..ca8e244ae 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -301,12 +301,22 @@ pick_cast(Id, Key, Query) -> resume_from_blocked(Data) -> #{inflight_tid := InflightTID} = Data, case inflight_get_first_retriable(InflightTID) of - empty -> - {next_state, running, Data}; + none -> + case is_inflight_full(InflightTID) of + true -> + {keep_state, Data}; + false -> + {next_state, running, Data} + end; {Ref, FirstQuery} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. - retry_inflight_sync(Ref, FirstQuery, Data) + case is_inflight_full(InflightTID) of + true -> + {keep_state, Data}; + false -> + retry_inflight_sync(Ref, FirstQuery, Data) + end end. retry_inflight_sync(Ref, QueryOrBatch, Data0) -> @@ -452,7 +462,7 @@ do_flush( } = Data0, %% unwrap when not batching (i.e., batch size == 1) [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, - QueryOpts = #{inflight_name => InflightTID}, + QueryOpts = #{inflight_tid => InflightTID}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), case reply_caller(Id, Reply) of @@ -519,7 +529,7 @@ do_flush(Data0, #{ batch_size := BatchSize, inflight_tid := InflightTID } = Data0, - QueryOpts = #{inflight_name => InflightTID}, + QueryOpts = #{inflight_tid => InflightTID}, Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), case batch_reply_caller(Id, Result, Batch) of %% Failed; remove the request from the queue, as we cannot pop @@ -746,21 +756,15 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}), - InflightTID = maps:get(inflight_name, QueryOpts, undefined), - PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), + InflightTID = maps:get(inflight_tid, QueryOpts, undefined), ?APPLY_RESOURCE( call_query, - case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of - true -> - %% should be kept in the inflight table and retried - %% when resuming. - {async_return, inflight_full}; - false -> - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Mod:on_query(Id, Request, ResSt) + begin + IsRetriable = false, + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + ok = inflight_append(InflightTID, InflightItem, Id, Index), + Mod:on_query(Id, Request, ResSt) end, Request ); @@ -768,22 +772,18 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async }), - InflightTID = maps:get(inflight_name, QueryOpts, undefined), - PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), + InflightTID = maps:get(inflight_tid, QueryOpts, undefined), ?APPLY_RESOURCE( call_query_async, - case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of - true -> - {async_return, inflight_full}; - false -> - ReplyFun = fun ?MODULE:reply_after_query/7, - Args = [self(), Id, Index, InflightTID, Ref, Query], - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), - {async_return, Result} + begin + ReplyFun = fun ?MODULE:reply_after_query/7, + Args = [self(), Id, Index, InflightTID, Ref, Query], + IsRetriable = false, + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), + ok = inflight_append(InflightTID, InflightItem, Id, Index), + Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), + {async_return, Result} end, Request ); @@ -791,22 +791,16 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - InflightTID = maps:get(inflight_name, QueryOpts, undefined), - PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), + InflightTID = maps:get(inflight_tid, QueryOpts, undefined), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ?APPLY_RESOURCE( call_batch_query, - case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of - true -> - %% should be kept in the inflight table and retried - %% when resuming. - {async_return, inflight_full}; - false -> - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Mod:on_batch_query(Id, Requests, ResSt) + begin + IsRetriable = false, + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + ok = inflight_append(InflightTID, InflightItem, Id, Index), + Mod:on_batch_query(Id, Requests, ResSt) end, Batch ); @@ -814,23 +808,19 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async }), - InflightTID = maps:get(inflight_name, QueryOpts, undefined), - PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), + InflightTID = maps:get(inflight_tid, QueryOpts, undefined), ?APPLY_RESOURCE( call_batch_query_async, - case PerformInflightCapacityCheck andalso is_inflight_full(InflightTID) of - true -> - {async_return, inflight_full}; - false -> - ReplyFun = fun ?MODULE:batch_reply_after_query/7, - ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, - Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), - {async_return, Result} + begin + ReplyFun = fun ?MODULE:batch_reply_after_query/7, + ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, + Requests = [Request || ?QUERY(_From, Request, _) <- Batch], + IsRetriable = false, + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), + ok = inflight_append(InflightTID, InflightItem, Id, Index), + Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), + {async_return, Result} end, Batch ). @@ -935,7 +925,7 @@ inflight_new(InfltWinSZ, Id, Index) -> TableId. -spec inflight_get_first_retriable(ets:tid()) -> - empty | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. + none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. inflight_get_first_retriable(InflightTID) -> MatchSpec = ets:fun2ms( @@ -947,13 +937,11 @@ inflight_get_first_retriable(InflightTID) -> ), case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> - empty; + none; {[{Ref, BatchOrQuery}], _Continuation} -> {Ref, BatchOrQuery} end. -is_inflight_full(undefined) -> - false; is_inflight_full(InflightTID) -> [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages