From b9ae4ea27694af87f5ea1664dbc565a82fd998d8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 13 Sep 2022 13:56:53 +0800 Subject: [PATCH] refactor: rename some metrics for emqx_resource --- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 10 ++-- apps/emqx_bridge/include/emqx_bridge.hrl | 20 +++---- apps/emqx_bridge/src/emqx_bridge_api.erl | 10 ++-- .../src/schema/emqx_bridge_schema.erl | 8 +-- .../test/emqx_bridge_mqtt_SUITE.erl | 8 +-- .../src/emqx_resource_manager.erl | 12 ++-- .../src/emqx_resource_worker.erl | 57 ++++++++----------- .../test/emqx_resource_SUITE.erl | 8 +-- 8 files changed, 62 insertions(+), 71 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index de9e10e74..08fe9c299 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -171,14 +171,14 @@ emqx_bridge_schema { zh: "被缓存" } } - metric_sent { + metric_retried { desc { - en: """Count of messages that are sent by this bridge.""" - zh: """已经发送出去的消息个数。""" + en: """Times of retried from the queue or the inflight window.""" + zh: """从队列或者飞行窗口里重试的次数。""" } label: { - en: "Sent" - zh: "已发送" + en: "Retried" + zh: "已重试" } } diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index 2b64dba70..6bc80f9cc 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -14,7 +14,7 @@ DroppedResourceStopped, Matched, Queued, - Sent, + Retried, SentFailed, SentInflight, SentSucc, @@ -33,10 +33,10 @@ 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, 'queuing' => Queued, - 'sent' => Sent, - 'sent.failed' => SentFailed, - 'sent.inflight' => SentInflight, - 'sent.success' => SentSucc, + 'retried' => Retried, + 'failed' => SentFailed, + 'inflight' => SentInflight, + 'success' => SentSucc, rate => RATE, rate_last5m => RATE_5, rate_max => RATE_MAX, @@ -54,7 +54,7 @@ DroppedResourceStopped, Matched, Queued, - Sent, + Retried, SentFailed, SentInflight, SentSucc, @@ -73,10 +73,10 @@ 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queuing' := Queued, - 'sent' := Sent, - 'sent.failed' := SentFailed, - 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc, + 'retried' := Retried, + 'failed' := SentFailed, + 'inflight' := SentInflight, + 'success' := SentSucc, rate := RATE, rate_last5m := RATE_5, rate_max := RATE_MAX, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ba896d9b7..a353c9cf0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -704,10 +704,10 @@ format_metrics(#{ 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queuing' := Queued, - 'sent' := Sent, - 'sent.failed' := SentFailed, - 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc, + 'retried' := Retried, + 'failed' := SentFailed, + 'inflight' := SentInflight, + 'success' := SentSucc, 'received' := Rcvd }, rate := #{ @@ -724,7 +724,7 @@ format_metrics(#{ DroppedResourceStopped, Matched, Queued, - Sent, + Retried, SentFailed, SentInflight, SentSucc, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 3beeb6e28..8bfc1c78a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -118,10 +118,10 @@ fields("metrics") -> mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})}, - {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, - {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, - {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, - {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, + {"retried", mk(integer(), #{desc => ?DESC("metric_retried")})}, + {"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, + {"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, + {"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"rate", mk(float(), #{desc => ?DESC("metric_rate")})}, {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})}, {"rate_last5m", diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 02b76d64b..e35ad5fe5 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -245,13 +245,13 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}, + <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, <<"node_metrics">> := [ #{ <<"node">> := _, <<"metrics">> := - #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0} + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} } ] }, @@ -464,13 +464,13 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0}, + <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0}, <<"node_metrics">> := [ #{ <<"node">> := _, <<"metrics">> := #{ - <<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0 + <<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0 } } ] diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c8c097c30..e4ba92b5c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -130,18 +130,18 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ResId, [ 'matched', - 'sent', + 'retried', + 'success', + 'failed', 'dropped', - 'queuing', - 'batching', - 'sent.success', - 'sent.failed', - 'sent.inflight', 'dropped.queue_not_enabled', 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', 'dropped.other', + 'queuing', + 'batching', + 'inflight', 'received' ], [matched] diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a451939b6..f683b67ed 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -250,6 +250,7 @@ retry_first_from_queue(Q, Id, St) -> end. retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), Result = call_query(sync, Id, FirstQuery, #{}), case handle_query_result(Id, Result, false) of %% Send failed because resource down @@ -376,49 +377,48 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) -> ?SLOG(error, #{msg => resource_exception, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), BlockWorker; handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when NotWorking == not_connected; NotWorking == blocked -> true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) -> - ?SLOG(error, #{msg => resource_not_found, info => Msg}), + ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) -> - ?SLOG(error, #{msg => resource_stopped, info => Msg}), + ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> - ?SLOG(error, #{msg => other_resource_error, reason => Reason}), + ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) -> %% the message will be queued in replayq or inflight window, - %% i.e. the counter 'queuing' will increase, so we pretend that we have not + %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% sent this message. - ?SLOG(warning, #{msg => recoverable_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), + ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), true; handle_query_result(Id, {error, Reason}, BlockWorker) -> - ?SLOG(error, #{msg => send_error, reason => Reason}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), + ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), BlockWorker; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) -> - ?SLOG(error, #{msg => async_send_error, info => Msg}), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), + ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'), BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> assert_ok_result(Result), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'), BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> @@ -459,11 +459,7 @@ call_query(QM0, Id, Query, QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), - Result = ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), - Result; + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -475,8 +471,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'), ReplyFun = fun ?MODULE:reply_after_query/6, Ref = make_message_ref(), Args = [self(), Id, Name, Ref, Query], @@ -489,12 +484,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], - BatchLen = length(Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), - Result = ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), - Result; + ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Name = maps:get(inflight_name, QueryOpts, undefined), @@ -507,8 +497,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> {async_return, inflight_full}; false -> BatchLen = length(Batch), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen), ReplyFun = fun ?MODULE:batch_reply_after_query/6, Ref = make_message_ref(), Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, @@ -521,12 +510,13 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ). reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> - %% NOTE: 'sent.inflight' is message count that sent but no ACK received, + %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1), case reply_caller(Id, ?REPLY(From, Request, Result)) of true -> - %% we marked these messages are 'queuing' although they are in inflight window + %% we marked these messages are 'queuing' although they are actually + %% keeped in inflight window, not replayq emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), ?MODULE:block(Pid); false -> @@ -534,13 +524,14 @@ reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> end. batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> - %% NOTE: 'sent.inflight' is message count that sent but no ACK received, + %% NOTE: 'inflight' is message count that sent async but no ACK received, %% NOT the message number ququed in the inflight window. BatchLen = length(Batch), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen), case batch_reply_caller(Id, Result, Batch) of true -> - %% we marked these messages are 'queuing' although they are in inflight window + %% we marked these messages are 'queuing' although they are actually + %% keeped in inflight window, not replayq emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), ?MODULE:block(Pid); false -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 68b4fb6dd..3f42850ad 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -268,7 +268,7 @@ t_query_counter_async_query(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), + ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C), ok = emqx_resource:remove_local(?ID). t_query_counter_async_callback(_) -> @@ -309,7 +309,7 @@ t_query_counter_async_callback(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), + ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C), ?assertMatch(1000, ets:info(Tab0, size)), ?assert( lists:all( @@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when - St == Ss andalso M == St + D, + #{matched := M, success := Ss, dropped := D} when + M == Ss + D, C ), ?assert(