diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 4e9e75085..2d646f4b8 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -117,8 +117,7 @@ simple_sync_query(Id, Request) -> Ref = make_message_ref(), Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), HasBeenSent = false, - BlockWorker = false, - _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker), + _ = handle_query_result(Id, Result, HasBeenSent), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). @@ -135,8 +134,7 @@ simple_async_query(Id, Request, ReplyFun) -> Ref = make_message_ref(), Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts), HasBeenSent = false, - BlockWorker = false, - _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker), + _ = handle_query_result(Id, Result, HasBeenSent), Result. -spec block(pid()) -> ok. @@ -304,15 +302,14 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% if we are retrying an inflight query, it has been sent HasBeenSent = true, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), - BlockWorker = false, - case handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) of + case handle_query_result_pure(Id, Result, HasBeenSent) of %% Send failed because resource is down - {true, PostFn} -> + {nack, PostFn} -> PostFn(), ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), {keep_state, Data0, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working - {false, PostFn} -> + {ack, PostFn} -> IsAcked = ack_inflight(InflightTID, Ref, Id, Index), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an @@ -427,7 +424,7 @@ do_flush( %% from it again. But we must ensure it's in the inflight %% table, even if it's full, so we don't lose the request. %% And only in that case. - true -> + nack -> ok = replayq:ack(Q1, QAckRef), ShouldPreserveInInflight = is_inflight_full_result(Result) orelse @@ -436,7 +433,7 @@ do_flush( emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), {next_state, blocked, Data0}; %% Success; just ack. - false -> + ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -467,7 +464,7 @@ do_flush(Data0, #{ %% from it again. But we must ensure it's in the inflight %% table, even if it's full, so we don't lose the request. %% And only in that case. - true -> + nack -> ok = replayq:ack(Q1, QAckRef), ShouldPreserveInInflight = is_inflight_full_result(Result) orelse @@ -476,7 +473,7 @@ do_flush(Data0, #{ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), {next_state, blocked, Data0}; %% Success; just ack. - false -> + ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -499,11 +496,11 @@ batch_reply_caller(Id, BatchResult, Batch) -> batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> lists:foldl( - fun(Reply, {BlockWorker, PostFns}) -> - {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply, BlockWorker), + fun(Reply, {_ShouldBlock, PostFns}) -> + {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply), {ShouldBlock, [PostFn | PostFns]} end, - {false, []}, + {ack, []}, %% the `Mod:on_batch_query/3` returns a single result for a batch, %% so we need to expand ?EXPAND(BatchResult, Batch) @@ -514,13 +511,9 @@ reply_caller(Id, Reply) -> PostFn(), ShouldBlock. -reply_caller_defer_metrics(Id, Reply) -> - BlockWorker = false, - reply_caller_defer_metrics(Id, Reply, BlockWorker). - -reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) -> - handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker); -reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when +reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) -> + handle_query_result_pure(Id, Result, HasBeenSent); +reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when is_function(ReplyFun) -> _ = @@ -528,49 +521,49 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, - handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker); -reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> + handle_query_result_pure(Id, Result, HasBeenSent); +reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) -> gen_statem:reply(From, Result), - handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker). + handle_query_result_pure(Id, Result, HasBeenSent). -handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> - {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker), +handle_query_result(Id, Result, HasBeenSent) -> + {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), PostFn(), ShouldBlock. -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{msg => resource_exception, info => Msg}), inc_sent_failed(Id, HasBeenSent), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when + {ack, PostFn}; +handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when NotWorking == not_connected; NotWorking == blocked -> - {true, fun() -> ok end}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> + {nack, fun() -> ok end}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), emqx_resource_metrics:dropped_resource_not_found_inc(Id), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> + {ack, PostFn}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), emqx_resource_metrics:dropped_resource_stopped_inc(Id), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> + {ack, PostFn}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), emqx_resource_metrics:dropped_other_inc(Id), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> + {ack, PostFn}; +handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% sent this message. @@ -578,32 +571,32 @@ handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), ok end, - {true, PostFn}; -handle_query_result_pure(Id, {error, Reason}, HasBeenSent, BlockWorker) -> + {nack, PostFn}; +handle_query_result_pure(Id, {error, Reason}, HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), inc_sent_failed(Id, HasBeenSent), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> - {true, fun() -> ok end}; -handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> + {ack, PostFn}; +handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent) -> + {nack, fun() -> ok end}; +handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), inc_sent_failed(Id, HasBeenSent), ok end, - {BlockWorker, PostFn}; -handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> - {BlockWorker, fun() -> ok end}; -handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) -> + {ack, PostFn}; +handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) -> + {ack, fun() -> ok end}; +handle_query_result_pure(Id, Result, HasBeenSent) -> PostFn = fun() -> assert_ok_result(Result), inc_sent_success(Id, HasBeenSent), ok end, - {BlockWorker, PostFn}. + {ack, PostFn}. is_inflight_full_result({async_return, inflight_full}) -> true; @@ -732,10 +725,10 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee %% but received no ACK, NOT the number of messages queued in the %% inflight window. case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of - {true, PostFn} -> + {nack, PostFn} -> PostFn(), ?MODULE:block(Pid); - {false, PostFn} -> + {ack, PostFn} -> IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked andalso PostFn(), ok @@ -746,10 +739,10 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> %% but received no ACK, NOT the number of messages queued in the %% inflight window. case batch_reply_caller_defer_metrics(Id, Result, Batch) of - {true, PostFns} -> + {nack, PostFns} -> lists:foreach(fun(F) -> F() end, PostFns), ?MODULE:block(Pid); - {false, PostFns} -> + {ack, PostFns} -> IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), ok