diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 6dbf7062c..bc4a7bc41 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -114,10 +114,13 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts = #{}, + QueryOpts = #{perform_inflight_capacity_check => false}, emqx_resource_metrics:matched_inc(Id), - Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts), - _ = handle_query_result(Id, Result, false, false), + Ref = make_message_ref(), + Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), + HasBeenSent = false, + BlockWorker = false, + _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). @@ -129,10 +132,13 @@ simple_async_query(Id, Request, ReplyFun) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts = #{}, + QueryOpts = #{perform_inflight_capacity_check => false}, emqx_resource_metrics:matched_inc(Id), - Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts), - _ = handle_query_result(Id, Result, false, false), + Ref = make_message_ref(), + Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts), + HasBeenSent = false, + BlockWorker = false, + _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker), Result. -spec block(pid() | atom()) -> ok. @@ -313,16 +319,27 @@ retry_queue( empty -> {next_state, running, Data0}; {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} -> + Data = Data0#{queue := Q1}, QueryOpts = #{inflight_name => Name}, - Result = call_query(configured, Id, Index, Query, QueryOpts), + Ref = make_message_ref(), + Result = call_query(configured, Id, Index, Ref, Query, QueryOpts), Reply = ?REPLY(undefined, Request, HasBeenSent, Result), case reply_caller(Id, Reply) of true -> - {keep_state, Data0, {state_timeout, ResumeT, resume}}; + %% Still failed, but now it's in the inflight + %% table and marked as sent, except if the result + %% says inflight is full. In this case, we must + %% ensure it's indeed in the inflight table or + %% risk losing it. + ok = replayq:ack(Q1, QAckRef), + is_inflight_full_result(Result) andalso + inflight_append(Name, Ref, Query, Id, Index), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + {keep_state, Data, {state_timeout, ResumeT, resume}}; false -> ok = replayq:ack(Q1, QAckRef), + is_async(Id) orelse inflight_drop(Name, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - Data = Data0#{queue := Q1}, retry_queue(Data) end end; @@ -341,8 +358,10 @@ retry_queue( empty -> {next_state, running, Data0}; {Q1, QAckRef, Batch0} -> + Data = Data0#{queue := Q1}, QueryOpts = #{inflight_name => Name}, - Result = call_query(configured, Id, Index, Batch0, QueryOpts), + Ref = make_message_ref(), + Result = call_query(configured, Id, Index, Ref, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
Batch = [ @@ -352,12 +371,21 @@ retry_queue( case batch_reply_caller(Id, Result, Batch) of true -> ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}), - {keep_state, Data0, {state_timeout, ResumeT, resume}}; + %% Still failed, but now it's in the inflight + %% table and marked as sent, except if the result + %% says inflight is full. In this case, we must + %% ensure it's indeed in the inflight table or + %% risk losing it. + ok = replayq:ack(Q1, QAckRef), + is_inflight_full_result(Result) andalso + inflight_append(Name, Ref, Batch, Id, Index), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + {keep_state, Data, {state_timeout, ResumeT, resume}}; false -> ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}), ok = replayq:ack(Q1, QAckRef), + is_async(Id) orelse inflight_drop(Name, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - Data = Data0#{queue := Q1}, retry_queue(Data) end end. @@ -372,15 +400,25 @@ retry_inflight_sync( QueryOpts = #{}, %% if we are retrying an inflight query, it has been sent HasBeenSent = true, - Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts), + Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), BlockWorker = false, - case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of + case handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) of %% Send failed because resource is down - true -> + {true, PostFn} -> + PostFn(), + ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), {keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working - false -> - inflight_drop(Name, Ref, Id, Index), + {false, PostFn} -> + IsDropped = inflight_drop(Name, Ref, Id, Index), + %% we need to defer bumping the counters until after + %% `inflight_drop' to avoid a race where an inflight + %% request completes concurrently with the retry and the + %% counters get bumped twice. Since both inflight + %% requests (repeated and original) have the same `Ref', + %% we bump the counters only when the entry is removed. + IsDropped andalso PostFn(), + ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), do_resume(Data0) end. @@ -442,10 +480,12 @@ flush(Data0) -> %% `nack' a `pop'. %% Maybe we could re-open the queue? Data1 = Data0#{queue := Q1}, + Ref = make_message_ref(), do_flush(Data1, #{ new_queue => Q1, is_batch => IsBatch, batch => Batch, + ref => Ref, ack_ref => QAckRef }) end. @@ -456,7 +496,16 @@ flush(Data0) -> ack_ref := replayq:ack_ref() }) -> gen_statem:event_handler_result(state(), data()).
-do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> +do_flush( + Data0, + #{ + is_batch := false, + batch := Batch, + ref := Ref, + ack_ref := QAckRef, + new_queue := Q1 + } +) -> #{ id := Id, index := Index, @@ -465,37 +514,23 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que %% unwrap when not batching (i.e., batch size == 1) [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, QueryOpts = #{inflight_name => Name}, - Result = call_query(configured, Id, Index, Request, QueryOpts), - IsAsync = is_async(Id), + Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Data1 = cancel_flush_timer(Data0), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), - case {reply_caller(Id, Reply), IsAsync} of - %% failed and is not async; keep the request in the queue to - %% be retried - {true, false} -> - %% Note: currently, we cannot safely pop an item from - %% `replayq', keep the old reference to the queue and - %% later try to append new items to the old ref: by - %% popping an item, we may cause the side effect of - %% closing an open segment and opening a new one, and the - %% later `append' with the old file descriptor will fail - %% with `einval' because it has been closed. So we are - %% forced to re-append the item, changing the order of - %% requests... - ok = replayq:ack(Q1, QAckRef), - SentBatch = mark_as_sent(Batch), - Q2 = append_queue(Id, Index, Q1, SentBatch), - Data2 = Data1#{queue := Q2}, - {next_state, blocked, Data2}; - %% failed and is async; remove the request from the queue, as - %% it is already in inflight table - {true, true} -> + case reply_caller(Id, Reply) of + %% Failed; remove the request from the queue, as we cannot pop + %% from it again. If the result says the inflight window is + %% full, the request is not yet in the inflight table, so we + %% must append it now (and only in that case) to avoid losing it. + true -> ok = replayq:ack(Q1, QAckRef), + is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Request, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), {next_state, blocked, Data1}; - %% success; just ack - {false, _} -> + %% Success; just ack.
+ false -> ok = replayq:ack(Q1, QAckRef), + is_async(Id) orelse inflight_drop(Name, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), case replayq:count(Q1) > 0 of true -> @@ -504,7 +539,13 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que {keep_state, Data1} end end; -do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> +do_flush(Data0, #{ + is_batch := true, + batch := Batch, + ref := Ref, + ack_ref := QAckRef, + new_queue := Q1 +}) -> #{ id := Id, index := Index, @@ -512,36 +553,22 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu name := Name } = Data0, QueryOpts = #{inflight_name => Name}, - Result = call_query(configured, Id, Index, Batch, QueryOpts), - IsAsync = is_async(Id), + Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), Data1 = cancel_flush_timer(Data0), - case {batch_reply_caller(Id, Result, Batch), IsAsync} of - %% failed and is not async; keep the request in the queue to - %% be retried - {true, false} -> - %% Note: currently, we cannot safely pop an item from - %% `replayq', keep the old reference to the queue and - %% later try to append new items to the old ref: by - %% popping an item, we may cause the side effect of - %% closing an open segment and opening a new one, and the - %% later `append' with the old file descriptor will fail - %% with `einval' because it has been closed. So we are - %% forced to re-append the item, changing the order of - %% requests... - ok = replayq:ack(Q1, QAckRef), - SentBatch = mark_as_sent(Batch), - Q2 = append_queue(Id, Index, Q1, SentBatch), - Data2 = Data1#{queue := Q2}, - {next_state, blocked, Data2}; - %% failed and is async; remove the request from the queue, as - %% it is already in inflight table - {true, true} -> + case batch_reply_caller(Id, Result, Batch) of + %% Failed; remove the request from the queue, as we cannot pop + %% from it again. If the result says the inflight window is + %% full, the request is not yet in the inflight table, so we + %% must append it now (and only in that case) to avoid losing it. + true -> ok = replayq:ack(Q1, QAckRef), + is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Batch, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), {next_state, blocked, Data1}; - %% success; just ack - {false, _} -> + %% Success; just ack. + false -> ok = replayq:ack(Q1, QAckRef), + is_async(Id) orelse inflight_drop(Name, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), CurrentCount = replayq:count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of @@ -556,23 +583,34 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu end. batch_reply_caller(Id, BatchResult, Batch) -> + {ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch), + lists:foreach(fun(F) -> F() end, PostFns), + ShouldBlock. + +batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> lists:foldl( - fun(Reply, BlockWorker) -> - reply_caller(Id, Reply, BlockWorker) + fun(Reply, {BlockWorker, PostFns}) -> + {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply, BlockWorker), + {ShouldBlock, [PostFn | PostFns]} end, - false, + {false, []}, %% the `Mod:on_batch_query/3` returns a single result for a batch, %% so we need to expand ?EXPAND(BatchResult, Batch) ). reply_caller(Id, Reply) -> - BlockWorker = false, - reply_caller(Id, Reply, BlockWorker).
+ {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply), + PostFn(), + ShouldBlock. -reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) -> - handle_query_result(Id, Result, HasBeenSent, BlockWorker); -reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when +reply_caller_defer_metrics(Id, Reply) -> + BlockWorker = false, + reply_caller_defer_metrics(Id, Reply, BlockWorker). + +reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) -> + handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker); +reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when is_function(ReplyFun) -> _ = @@ -580,55 +618,89 @@ reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, - handle_query_result(Id, Result, HasBeenSent, BlockWorker); -reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> + handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker); +reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> gen_statem:reply(From, Result), - handle_query_result(Id, Result, HasBeenSent, BlockWorker). + handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker). -handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> - ?SLOG(error, #{msg => resource_exception, info => Msg}), - inc_sent_failed(Id, HasBeenSent), - BlockWorker; -handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when +handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> + {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker), + PostFn(), + ShouldBlock. 
+ +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{msg => resource_exception, info => Msg}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when NotWorking == not_connected; NotWorking == blocked -> - true; -handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> - ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_resource_metrics:dropped_resource_not_found_inc(Id), - BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> - ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_resource_metrics:dropped_resource_stopped_inc(Id), - BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> - ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_resource_metrics:dropped_other_inc(Id), - BlockWorker; -handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> + {true, fun() -> ok end}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), + emqx_resource_metrics:dropped_resource_not_found_inc(Id), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), + emqx_resource_metrics:dropped_resource_stopped_inc(Id), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), + emqx_resource_metrics:dropped_other_inc(Id), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% sent this message. - ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), - true; -handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) -> - ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), - BlockWorker; -handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> - true; -handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> - ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - inc_sent_failed(Id, HasBeenSent), - BlockWorker; -handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> - BlockWorker; -handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> - assert_ok_result(Result), - inc_sent_success(Id, HasBeenSent), - BlockWorker. 
+ PostFn = fun() -> + ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), + ok + end, + {true, PostFn}; +handle_query_result_pure(Id, {error, Reason}, HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> + {true, fun() -> ok end}; +handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {BlockWorker, PostFn}; +handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> + {BlockWorker, fun() -> ok end}; +handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) -> + PostFn = fun() -> + assert_ok_result(Result), + inc_sent_success(Id, HasBeenSent), + ok + end, + {BlockWorker, PostFn}. -call_query(QM0, Id, Index, Query, QueryOpts) -> +is_inflight_full_result({async_return, inflight_full}) -> + true; +is_inflight_full_result(_) -> + false. + +call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> @@ -638,7 +710,7 @@ call_query(QM0, Id, Index, Query, QueryOpts) -> _ -> QM0 end, CM = maps:get(callback_mode, Data), - apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); + apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> @@ -665,20 +737,34 @@ call_query(QM0, Id, Index, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> - ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> +apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> + ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), + Name = maps:get(inflight_name, QueryOpts, undefined), + PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), + ?APPLY_RESOURCE( + call_query, + case PerformInflightCapacityCheck andalso is_inflight_full(Name) of + true -> + %% should be kept in the inflight table and retried + %% when resuming. 
+ {async_return, inflight_full}; + false -> + ok = inflight_append(Name, Ref, Query, Id, Index), + Mod:on_query(Id, Request, ResSt) + end, + Request + ); +apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), + PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( call_query_async, - case is_inflight_full(Name) of + case PerformInflightCapacityCheck andalso is_inflight_full(Name) of true -> {async_return, inflight_full}; false -> ReplyFun = fun ?MODULE:reply_after_query/7, - Ref = make_message_ref(), Args = [self(), Id, Index, Name, Ref, Query], ok = inflight_append(Name, Ref, Query, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), @@ -686,21 +772,35 @@ apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, Que end, Request ); -apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), + Name = maps:get(inflight_name, QueryOpts, undefined), + PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> + ?APPLY_RESOURCE( + call_batch_query, + case PerformInflightCapacityCheck andalso is_inflight_full(Name) of + true -> + %% should be kept in the inflight table and retried + %% when resuming. + {async_return, inflight_full}; + false -> + ok = inflight_append(Name, Ref, Batch, Id, Index), + Mod:on_batch_query(Id, Requests, ResSt) + end, + Batch + ); +apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), + PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( call_batch_query_async, - case is_inflight_full(Name) of + case PerformInflightCapacityCheck andalso is_inflight_full(Name) of true -> {async_return, inflight_full}; false -> ReplyFun = fun ?MODULE:batch_reply_after_query/7, - Ref = make_message_ref(), ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ok = inflight_append(Name, Ref, Batch, Id, Index), @@ -714,29 +814,36 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of - true -> + case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of + {true, PostFn} -> + PostFn(), ?MODULE:block(Pid); - false -> - drop_inflight_and_resume(Pid, Name, Ref, Id, Index) + {false, PostFn} -> + IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index), + IsDropped andalso PostFn(), + ok end. 
batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - case batch_reply_caller(Id, Result, Batch) of - true -> + case batch_reply_caller_defer_metrics(Id, Result, Batch) of + {true, PostFns} -> + lists:foreach(fun(F) -> F() end, PostFns), ?MODULE:block(Pid); - false -> - drop_inflight_and_resume(Pid, Name, Ref, Id, Index) + {false, PostFns} -> + IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index), + IsDropped andalso lists:foreach(fun(F) -> F() end, PostFns), + ok end. drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> case is_inflight_full(Name) of true -> - inflight_drop(Name, Ref, Id, Index), - ?MODULE:resume(Pid); + IsDropped = inflight_drop(Name, Ref, Id, Index), + ?MODULE:resume(Pid), + IsDropped; false -> inflight_drop(Name, Ref, Id, Index) end. @@ -836,16 +943,18 @@ inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> Batch = mark_as_sent(Batch0), - ets:insert(Name, {Ref, Batch}), + IsNew = ets:insert_new(Name, {Ref, Batch}), BatchSize = length(Batch), - ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}), + IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), + ?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}), ok; inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> Query = mark_as_sent(Query0), - ets:insert(Name, {Ref, Query}), - ets:update_counter(Name, ?SIZE_REF, {2, 1}), + IsNew = ets:insert_new(Name, {Ref, Query}), + IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), + ?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}), ok; inflight_append(Name, Ref, Data, _Id, _Index) -> ets:insert(Name, {Ref, Data}), @@ -853,8 +962,8 @@ inflight_append(Name, Ref, Data, _Id, _Index) -> %% the inflight metric. ok. -inflight_drop(undefined, _, _Id, _Index) -> - ok; +inflight_drop(undefined, _Ref, _Id, _Index) -> + false; inflight_drop(Name, Ref, Id, Index) -> Count = case ets:take(Name, Ref) of @@ -862,9 +971,10 @@ inflight_drop(Name, Ref, Id, Index) -> [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); _ -> 0 end, - Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}), + IsDropped = Count > 0, + IsDropped andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), - ok. + IsDropped. 
%%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 44648cd84..52a640aed 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -411,8 +411,21 @@ t_query_counter_async_inflight(_) -> %% send async query to make the inflight window full ?check_trace( - ?TRACE_OPTS, - inc_counter_in_parallel(WindowSize, ReqOpts), + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event( + #{ + ?snk_kind := resource_worker_appended_to_inflight, + is_new := true + } + ), + WindowSize, + _Timeout = 5_000 + ), + inc_counter_in_parallel(WindowSize, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) @@ -455,29 +468,29 @@ t_query_counter_async_inflight(_) -> %% +1 because the tmp_query above will be retried and succeed %% this time. WindowSize + 1, - _Timeout = 60_000 + _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), tap_metrics(?LINE), {ok, _} = snabbkaffe:receive_events(SRef0), + tap_metrics(?LINE), %% since the previous tmp_query was enqueued to be retried, we %% take it again from the table; this time, it should have %% succeeded. ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), - ?assertEqual(WindowSize, ets:info(Tab0, size)), + ?assertEqual(WindowSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), %% send async query, this time everything should be ok. Num = 10, ?check_trace( - ?TRACE_OPTS, begin {ok, SRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), Num, - _Timeout = 60_000 + _Timeout0 = 10_000 ), - inc_counter_in_parallel(Num, ReqOpts), + inc_counter_in_parallel_increasing(Num, 1, ReqOpts), {ok, _} = snabbkaffe:receive_events(SRef), ok end, @@ -502,17 +515,18 @@ t_query_counter_async_inflight(_) -> ), %% this will block the resource_worker - ok = emqx_resource:query(?ID, {inc_counter, 1}), + ok = emqx_resource:query(?ID, {inc_counter, 4}), Sent = WindowSize + Num + WindowSize, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), WindowSize, - _Timeout = 60_000 + _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), {ok, _} = snabbkaffe:receive_events(SRef1), ?assertEqual(Sent, ets:info(Tab0, size)), + tap_metrics(?LINE), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), @@ -842,6 +856,8 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid0)), %% metrics are reset when recreating + %% depending on timing, might show the request we just did. + ct:sleep(500), ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok = emqx_resource:stop(?ID), @@ -861,6 +877,7 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid1)), %% now stop while resetting the metrics + ct:sleep(500), emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), @@ -1059,7 +1076,7 @@ t_retry_batch(_Config) -> %% batch shall remain enqueued. 
{ok, _} = snabbkaffe:block_until( - ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}), + ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}), 5_000 ), %% should not have increased the matched count with the retries @@ -1071,7 +1088,7 @@ t_retry_batch(_Config) -> {ok, {ok, _}} = ?wait_async_action( ok = emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := resource_worker_retry_queue_batch_succeeded}, + #{?snk_kind := resource_worker_retry_inflight_succeeded}, 5_000 ), %% 1 more because of the `resume' call @@ -1124,17 +1141,16 @@ t_delete_and_re_create_with_same_name(_Config) -> ), %% pre-condition: we should have just created a new queue Queuing0 = emqx_resource_metrics:queuing_get(?ID), + Inflight0 = emqx_resource_metrics:inflight_get(?ID), ?assertEqual(0, Queuing0), + ?assertEqual(0, Inflight0), ?check_trace( begin ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), NumRequests = 10, {ok, SRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := resource_worker_appended_to_queue}), - %% +1 because the first request will fail, - %% block the resource, and will be - %% re-appended to the queue. - NumRequests + 1, + NumRequests, _Timeout = 5_000 ), %% ensure replayq offloads to disk @@ -1153,8 +1169,11 @@ t_delete_and_re_create_with_same_name(_Config) -> {ok, _} = snabbkaffe:receive_events(SRef), %% ensure that stuff got enqueued into disk + tap_metrics(?LINE), Queuing1 = emqx_resource_metrics:queuing_get(?ID), - ?assertEqual(NumRequests, Queuing1), + Inflight1 = emqx_resource_metrics:inflight_get(?ID), + ?assertEqual(NumRequests - 1, Queuing1), + ?assertEqual(1, Inflight1), %% now, we delete the resource ok = emqx_resource:remove_local(?ID), @@ -1182,7 +1201,9 @@ t_delete_and_re_create_with_same_name(_Config) -> %% it shouldn't have anything enqueued, as it's a fresh resource Queuing2 = emqx_resource_metrics:queuing_get(?ID), + Inflight2 = emqx_resource_metrics:inflight_get(?ID), ?assertEqual(0, Queuing2), + ?assertEqual(0, Inflight2), ok end, @@ -1219,6 +1240,29 @@ inc_counter_in_parallel(N, Opts0) -> || Pid <- Pids ]. +inc_counter_in_parallel_increasing(N, StartN, Opts0) -> + Parent = self(), + Pids = [ + erlang:spawn(fun() -> + Opts = + case is_function(Opts0) of + true -> Opts0(); + false -> Opts0 + end, + emqx_resource:query(?ID, {inc_counter, M}, Opts), + Parent ! {complete, self()} + end) + || M <- lists:seq(StartN, StartN + N - 1) + ], + [ + receive + {complete, Pid} -> ok + after 1000 -> + ct:fail({wait_for_query_timeout, Pid}) + end + || Pid <- Pids + ]. + bin_config() -> <<"\"name\": \"test_resource\"">>. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 91fd2f399..94ac52209 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -508,14 +508,16 @@ install_telemetry_handler(TestCase) -> wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> Events = receive_all_events(GaugeName, Timeout), - case lists:last(Events) of + case length(Events) > 0 andalso lists:last(Events) of #{measurements := #{gauge_set := ExpectedValue}} -> ok; #{measurements := #{gauge_set := Value}} -> ct:fail( "gauge ~p didn't reach expected value ~p; last value: ~p", [GaugeName, ExpectedValue, Value] - ) + ); + false -> + ct:pal("no ~p gauge events received!", [GaugeName]) end.
receive_all_events(EventName, Timeout) -> @@ -605,6 +607,8 @@ t_publish_success(Config) -> ResourceId, #{n_events => ExpectedInflightEvents, timeout => 5_000} ), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), assert_metrics( #{ dropped => 0, @@ -653,6 +657,8 @@ t_publish_success_local_topic(Config) -> ResourceId, #{n_events => ExpectedInflightEvents, timeout => 5_000} ), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), assert_metrics( #{ dropped => 0, @@ -739,6 +745,8 @@ t_publish_templated(Config) -> ResourceId, #{n_events => ExpectedInflightEvents, timeout => 5_000} ), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 0, 500), assert_metrics( #{ dropped => 0, @@ -1111,19 +1119,17 @@ do_econnrefused_or_timeout_test(Config, Error) -> ResourceId ); {_, sync} -> - wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ - timeout => 10_000, n_events => 2 - }), %% even waiting, hard to avoid flakiness... simpler to just sleep %% a bit until stabilization. - ct:sleep(200), + wait_until_gauge_is(queuing, 0, 500), + wait_until_gauge_is(inflight, 1, 500), assert_metrics( #{ dropped => 0, failed => 0, - inflight => 0, + inflight => 1, matched => 1, - queuing => 1, + queuing => 0, retried => 0, success => 0 },
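
Note on the inflight-table changes above (illustrative sketch, not part of the patch): the reworked `inflight_append'/`inflight_drop' rely on `ets:insert_new/2' and `ets:take/2' so the size counter is adjusted only when an entry is genuinely added or removed, which is what keeps a retry racing with an async completion from bumping the counters twice. Below is a minimal, self-contained sketch of that pattern, using a hypothetical module name `inflight_sketch' and the atom `size' as the counter key.

%% Illustrative sketch only; module name and counter key are hypothetical.
-module(inflight_sketch).
-export([new/0, append/3, drop/2, count/1]).

-define(SIZE_KEY, size).

new() ->
    Tab = ets:new(?MODULE, [ordered_set, public]),
    true = ets:insert(Tab, {?SIZE_KEY, 0}),
    Tab.

%% Idempotent append: the counter is bumped only when the ref is new.
append(Tab, Ref, Item) ->
    IsNew = ets:insert_new(Tab, {Ref, Item}),
    IsNew andalso ets:update_counter(Tab, ?SIZE_KEY, {2, 1}),
    IsNew.

%% Drop reports whether the entry was actually removed, so the caller
%% can decide whether to run deferred side effects (e.g. metric bumps).
drop(Tab, Ref) ->
    case ets:take(Tab, Ref) of
        [{Ref, _Item}] ->
            _ = ets:update_counter(Tab, ?SIZE_KEY, {2, -1, 0, 0}),
            true;
        [] ->
            false
    end.

count(Tab) ->
    [{?SIZE_KEY, N}] = ets:lookup(Tab, ?SIZE_KEY),
    N.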