diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index db4b294a8..3360c3c5d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -129,17 +129,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ?RES_METRICS, ResId, [ - matched, - sent, - dropped, - queued, - batched, - inflight, + 'matched', + 'sent', + 'dropped', + 'queued', + 'batched', + 'retried', 'sent.success', 'sent.failed', 'sent.exception', - 'dropped.inflight', - 'dropped.queued', + 'sent.inflight', + 'dropped.queue_not_enabled', + 'dropped.queue_full', + 'dropped.resource_not_found', + 'dropped.resource_stopped', 'dropped.other' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 8b8b1467c..dabaf037c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -77,27 +77,27 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'), _ = handle_query_result(Id, Result, false), Result. @@ -252,6 +252,7 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> @@ -263,18 +264,20 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S inflight_drop(Name, Ref), St0; _ -> - St0#{queue => drop_head(Q)} + St0#{queue => drop_head(Id, Q)} end, {keep_state, St, {state_timeout, 0, resume}} end. -drop_head(Q) -> +drop_head(Id, Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), Q1. -query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) -> +query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -308,6 +311,7 @@ flush( inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -322,8 +326,20 @@ maybe_append_queue(Id, undefined, _Items) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), undefined; maybe_append_queue(Id, Q, Items) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), - replayq:append(Q, Items). + case replayq:overflow(Q) of + Overflow when Overflow =< 0 -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q, Items); + Overflow -> + PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts), + ok = replayq:ack(Q1, QAckRef), + Dropped = length(Items), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), + Q1 + end. batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( @@ -375,7 +391,8 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) -> +handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; @@ -426,7 +443,7 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), @@ -438,7 +455,8 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -451,7 +469,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), @@ -463,7 +481,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -477,14 +496,20 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> case reply_caller(Id, ?REPLY(From, Request, Result)) of - true -> ?MODULE:block(Pid); - false -> inflight_drop(Name, Ref) + true -> + ?MODULE:block(Pid); + false -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + inflight_drop(Name, Ref) end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> case batch_reply_caller(Id, Result, Batch) of - true -> ?MODULE:block(Pid); - false -> inflight_drop(Name, Ref) + true -> + ?MODULE:block(Pid); + false -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)), + inflight_drop(Name, Ref) end. %%============================================================================== %% the inflight queue for async query