refactor: rename some metrics for emqx_resource

This commit is contained in:
Shawn 2022-09-13 13:56:53 +08:00
parent 715c78f738
commit b9ae4ea276
8 changed files with 62 additions and 71 deletions

View File

@ -171,14 +171,14 @@ emqx_bridge_schema {
zh: "被缓存" zh: "被缓存"
} }
} }
metric_sent { metric_retried {
desc { desc {
en: """Count of messages that are sent by this bridge.""" en: """Times of retried from the queue or the inflight window."""
zh: """已经发送出去的消息个数。""" zh: """从队列或者飞行窗口里重试的次数。"""
} }
label: { label: {
en: "Sent" en: "Retried"
zh: "已发送" zh: "已重试"
} }
} }

View File

@ -14,7 +14,7 @@
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Sent, Retried,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -33,10 +33,10 @@
'dropped.resource_stopped' => DroppedResourceStopped, 'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched, 'matched' => Matched,
'queuing' => Queued, 'queuing' => Queued,
'sent' => Sent, 'retried' => Retried,
'sent.failed' => SentFailed, 'failed' => SentFailed,
'sent.inflight' => SentInflight, 'inflight' => SentInflight,
'sent.success' => SentSucc, 'success' => SentSucc,
rate => RATE, rate => RATE,
rate_last5m => RATE_5, rate_last5m => RATE_5,
rate_max => RATE_MAX, rate_max => RATE_MAX,
@ -54,7 +54,7 @@
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Sent, Retried,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,
@ -73,10 +73,10 @@
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queuing' := Queued, 'queuing' := Queued,
'sent' := Sent, 'retried' := Retried,
'sent.failed' := SentFailed, 'failed' := SentFailed,
'sent.inflight' := SentInflight, 'inflight' := SentInflight,
'sent.success' := SentSucc, 'success' := SentSucc,
rate := RATE, rate := RATE,
rate_last5m := RATE_5, rate_last5m := RATE_5,
rate_max := RATE_MAX, rate_max := RATE_MAX,

View File

@ -704,10 +704,10 @@ format_metrics(#{
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queuing' := Queued, 'queuing' := Queued,
'sent' := Sent, 'retried' := Retried,
'sent.failed' := SentFailed, 'failed' := SentFailed,
'sent.inflight' := SentInflight, 'inflight' := SentInflight,
'sent.success' := SentSucc, 'success' := SentSucc,
'received' := Rcvd 'received' := Rcvd
}, },
rate := #{ rate := #{
@ -724,7 +724,7 @@ format_metrics(#{
DroppedResourceStopped, DroppedResourceStopped,
Matched, Matched,
Queued, Queued,
Sent, Retried,
SentFailed, SentFailed,
SentInflight, SentInflight,
SentSucc, SentSucc,

View File

@ -118,10 +118,10 @@ fields("metrics") ->
mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
{"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
{"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})}, {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, {"retried", mk(integer(), #{desc => ?DESC("metric_retried")})},
{"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, {"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
{"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, {"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
{"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
{"rate", mk(float(), #{desc => ?DESC("metric_rate")})}, {"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})}, {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
{"rate_last5m", {"rate_last5m",

View File

@ -245,13 +245,13 @@ t_mqtt_conn_bridge_egress(_) ->
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch( ?assertMatch(
#{ #{
<<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}, <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
<<"node_metrics">> := <<"node_metrics">> :=
[ [
#{ #{
<<"node">> := _, <<"node">> := _,
<<"metrics">> := <<"metrics">> :=
#{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0} #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
} }
] ]
}, },
@ -464,13 +464,13 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch( ?assertMatch(
#{ #{
<<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0}, <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
<<"node_metrics">> := <<"node_metrics">> :=
[ [
#{ #{
<<"node">> := _, <<"node">> := _,
<<"metrics">> := #{ <<"metrics">> := #{
<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0 <<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0
} }
} }
] ]

View File

@ -130,18 +130,18 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
ResId, ResId,
[ [
'matched', 'matched',
'sent', 'retried',
'success',
'failed',
'dropped', 'dropped',
'queuing',
'batching',
'sent.success',
'sent.failed',
'sent.inflight',
'dropped.queue_not_enabled', 'dropped.queue_not_enabled',
'dropped.queue_full', 'dropped.queue_full',
'dropped.resource_not_found', 'dropped.resource_not_found',
'dropped.resource_stopped', 'dropped.resource_stopped',
'dropped.other', 'dropped.other',
'queuing',
'batching',
'inflight',
'received' 'received'
], ],
[matched] [matched]

View File

@ -250,6 +250,7 @@ retry_first_from_queue(Q, Id, St) ->
end. end.
retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) -> retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
Result = call_query(sync, Id, FirstQuery, #{}), Result = call_query(sync, Id, FirstQuery, #{}),
case handle_query_result(Id, Result, false) of case handle_query_result(Id, Result, false) of
%% Send failed because resource down %% Send failed because resource down
@ -376,49 +377,48 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) ->
?SLOG(error, #{msg => resource_exception, info => Msg}), ?SLOG(error, #{msg => resource_exception, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
BlockWorker; BlockWorker;
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
-> ->
true; true;
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) ->
?SLOG(error, #{msg => resource_not_found, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) ->
?SLOG(error, #{msg => resource_stopped, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
?SLOG(error, #{msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
BlockWorker; BlockWorker;
handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) -> handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) ->
%% the message will be queued in replayq or inflight window, %% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' will increase, so we pretend that we have not %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
%% sent this message. %% sent this message.
?SLOG(warning, #{msg => recoverable_error, reason => Reason}), ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
true; true;
handle_query_result(Id, {error, Reason}, BlockWorker) -> handle_query_result(Id, {error, Reason}, BlockWorker) ->
?SLOG(error, #{msg => send_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
BlockWorker; BlockWorker;
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
true; true;
handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) -> handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) ->
?SLOG(error, #{msg => async_send_error, info => Msg}), ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
BlockWorker; BlockWorker;
handle_query_result(_Id, {async_return, ok}, BlockWorker) -> handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
BlockWorker; BlockWorker;
handle_query_result(Id, Result, BlockWorker) -> handle_query_result(Id, Result, BlockWorker) ->
assert_ok_result(Result), assert_ok_result(Result),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
BlockWorker. BlockWorker.
call_query(QM0, Id, Query, QueryOpts) -> call_query(QM0, Id, Query, QueryOpts) ->
@ -459,11 +459,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
Result = ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
Result;
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
@ -475,8 +471,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
ReplyFun = fun ?MODULE:reply_after_query/6, ReplyFun = fun ?MODULE:reply_after_query/6,
Ref = make_message_ref(), Ref = make_message_ref(),
Args = [self(), Id, Name, Ref, Query], Args = [self(), Id, Name, Ref, Query],
@ -489,12 +484,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request) <- Batch],
BatchLen = length(Batch), ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
Result = ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
Result;
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
@ -507,8 +497,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
BatchLen = length(Batch), BatchLen = length(Batch),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
ReplyFun = fun ?MODULE:batch_reply_after_query/6, ReplyFun = fun ?MODULE:batch_reply_after_query/6,
Ref = make_message_ref(), Ref = make_message_ref(),
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@ -521,12 +510,13 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
). ).
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
%% NOTE: 'sent.inflight' is message count that sent but no ACK received, %% NOTE: 'inflight' is message count that sent async but no ACK received,
%% NOT the message number ququed in the inflight window. %% NOT the message number ququed in the inflight window.
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1), emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
case reply_caller(Id, ?REPLY(From, Request, Result)) of case reply_caller(Id, ?REPLY(From, Request, Result)) of
true -> true ->
%% we marked these messages are 'queuing' although they are in inflight window %% we marked these messages are 'queuing' although they are actually
%% keeped in inflight window, not replayq
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> false ->
@ -534,13 +524,14 @@ reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
end. end.
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
%% NOTE: 'sent.inflight' is message count that sent but no ACK received, %% NOTE: 'inflight' is message count that sent async but no ACK received,
%% NOT the message number ququed in the inflight window. %% NOT the message number ququed in the inflight window.
BatchLen = length(Batch), BatchLen = length(Batch),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen), emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
true -> true ->
%% we marked these messages are 'queuing' although they are in inflight window %% we marked these messages are 'queuing' although they are actually
%% keeped in inflight window, not replayq
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
?MODULE:block(Pid); ?MODULE:block(Pid);
false -> false ->

View File

@ -268,7 +268,7 @@ t_query_counter_async_query(_) ->
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
t_query_counter_async_callback(_) -> t_query_counter_async_callback(_) ->
@ -309,7 +309,7 @@ t_query_counter_async_callback(_) ->
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
?assertMatch(1000, ets:info(Tab0, size)), ?assertMatch(1000, ets:info(Tab0, size)),
?assert( ?assert(
lists:all( lists:all(
@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) ->
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics: ~p", [C]), ct:pal("metrics: ~p", [C]),
?assertMatch( ?assertMatch(
#{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when #{matched := M, success := Ss, dropped := D} when
St == Ss andalso M == St + D, M == Ss + D,
C C
), ),
?assert( ?assert(