diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index d951d7a9f..a0632a761 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -303,11 +303,12 @@ flush(#{acc := []} = St) -> flush( #{ id := Id, - acc := Batch, + acc := Batch0, batch_size := Size, queue := Q0 } = St ) -> + Batch = lists:reverse(Batch0), QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) @@ -393,12 +394,12 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; -handle_query_result(Id, {error, _}, BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), - BlockWorker; handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; +handle_query_result(Id, {error, _}, BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), + BlockWorker; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> @@ -449,7 +450,15 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), - ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); + ?APPLY_RESOURCE( + begin + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + Result = Mod:on_query(Id, Request, ResSt), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + Result + end, + Request + ); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -475,7 +484,15 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), - ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); + ?APPLY_RESOURCE( + begin + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + Result = Mod:on_batch_query(Id, Requests, ResSt), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + Result + end, + Batch + ); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined),