refactor(resource): rename metrics batched,queued -> batching,queuing

This commit is contained in:
Shawn 2022-09-02 12:41:14 +08:00
parent 33c9c7d497
commit b45f3de8db
10 changed files with 41 additions and 25 deletions

View File

@ -78,7 +78,7 @@ emqx_bridge_schema {
} }
} }
metric_batched { metric_batching {
desc { desc {
en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
zh: """当前积压在内存里,等待批量发送的消息个数""" zh: """当前积压在内存里,等待批量发送的消息个数"""
@ -161,9 +161,9 @@ emqx_bridge_schema {
} }
} }
metric_queued { metric_queuing {
desc { desc {
en: """Count of messages that are currently queued.""" en: """Count of messages that are currently queuing."""
zh: """当前被缓存到磁盘队列的消息个数。""" zh: """当前被缓存到磁盘队列的消息个数。"""
} }
label: { label: {

View File

@ -25,7 +25,7 @@
Rcvd Rcvd
), ),
#{ #{
'batched' => Batched, 'batching' => Batched,
'dropped' => Dropped, 'dropped' => Dropped,
'dropped.other' => DroppedOther, 'dropped.other' => DroppedOther,
'dropped.queue_full' => DroppedQueueFull, 'dropped.queue_full' => DroppedQueueFull,
@ -33,7 +33,7 @@
'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped, 'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched, 'matched' => Matched,
'queued' => Queued, 'queuing' => Queued,
'sent' => Sent, 'sent' => Sent,
'sent.exception' => SentExcpt, 'sent.exception' => SentExcpt,
'sent.failed' => SentFailed, 'sent.failed' => SentFailed,
@ -67,7 +67,7 @@
Rcvd Rcvd
), ),
#{ #{
'batched' := Batched, 'batching' := Batched,
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
@ -75,7 +75,7 @@
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queued' := Queued, 'queuing' := Queued,
'sent' := Sent, 'sent' := Sent,
'sent.exception' := SentExcpt, 'sent.exception' := SentExcpt,
'sent.failed' := SentFailed, 'sent.failed' := SentFailed,

View File

@ -206,7 +206,7 @@ info_example_basic(webhook) ->
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => 15000, health_check_interval => 15000,
auto_restart_interval => 15000, auto_restart_interval => 15000,
query_mode => sync, query_mode => async,
async_inflight_window => 100, async_inflight_window => 100,
enable_queue => true, enable_queue => true,
max_queue_bytes => 1024 * 1024 * 1024 max_queue_bytes => 1024 * 1024 * 1024
@ -672,7 +672,7 @@ format_resp(
format_metrics(#{ format_metrics(#{
counters := #{ counters := #{
'batched' := Batched, 'batching' := Batched,
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
'dropped.queue_full' := DroppedQueueFull, 'dropped.queue_full' := DroppedQueueFull,
@ -680,7 +680,7 @@ format_metrics(#{
'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped, 'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched, 'matched' := Matched,
'queued' := Queued, 'queuing' := Queued,
'sent' := Sent, 'sent' := Sent,
'sent.exception' := SentExcpt, 'sent.exception' := SentExcpt,
'sent.failed' := SentFailed, 'sent.failed' := SentFailed,

View File

@ -102,7 +102,7 @@ fields(bridges) ->
] ++ ee_fields_bridges(); ] ++ ee_fields_bridges();
fields("metrics") -> fields("metrics") ->
[ [
{"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})},
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
{"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
{"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
@ -113,7 +113,7 @@ fields("metrics") ->
{"dropped.resource_stopped", {"dropped.resource_stopped",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
{"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
{"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})},
{"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})}, {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})},
{"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},

View File

@ -31,7 +31,8 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_get_status/2 on_get_status/2,
reply_delegator/2
]). ]).
-type url() :: emqx_http_lib:uri_map(). -type url() :: emqx_http_lib:uri_map().
@ -378,9 +379,18 @@ on_query_async(
Method, Method,
NRequest, NRequest,
Timeout, Timeout,
ReplyFunAndArgs {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
). ).
reply_delegator(ReplyFunAndArgs, Result) ->
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end.
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
true -> true ->

View File

@ -110,7 +110,7 @@
list_group_instances/1 list_group_instances/1
]). ]).
-export([inc_received/1]). -export([inc_received/1, apply_reply_fun/2]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
@ -441,6 +441,12 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
Error -> Error Error -> Error
end. end.
apply_reply_fun({F, A}, Result) when is_function(F) ->
_ = erlang:apply(F, A ++ [Result]),
ok;
apply_reply_fun(From, Result) ->
gen_server:reply(From, Result).
%% ================================================================================= %% =================================================================================
inc_received(ResId) -> inc_received(ResId) ->

View File

@ -132,8 +132,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'matched', 'matched',
'sent', 'sent',
'dropped', 'dropped',
'queued', 'queuing',
'batched', 'batching',
'sent.success', 'sent.success',
'sent.failed', 'sent.failed',
'sent.exception', 'sent.exception',

View File

@ -131,7 +131,7 @@ init({Id, Index, Opts}) ->
false -> false ->
undefined undefined
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name), ok = inflight_new(Name),
St = #{ St = #{
id => Id, id => Id,
@ -273,12 +273,12 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
drop_head(Id, Q) -> drop_head(Id, Q) ->
{Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
ok = replayq:ack(Q1, AckRef), ok = replayq:ack(Q1, AckRef),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
Q1. Q1.
query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
Acc1 = [?QUERY(From, Request) | Acc], Acc1 = [?QUERY(From, Request) | Acc],
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
St = St0#{acc := Acc1, acc_left := Left - 1}, St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of case Left =< 1 of
true -> flush(St); true -> flush(St);
@ -313,7 +313,7 @@ flush(
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St) inflight_window => maps:get(async_inflight_window, St)
}, },
emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
Result = call_query(configured, Id, Batch, QueryOpts), Result = call_query(configured, Id, Batch, QueryOpts),
St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch) of
@ -338,13 +338,13 @@ maybe_append_queue(Id, Q, Items) ->
{Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2), Dropped = length(Items2),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1 Q1
end, end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
replayq:append(Q2, Items). replayq:append(Q2, Items).
batch_reply_caller(Id, BatchResult, Batch) -> batch_reply_caller(Id, BatchResult, Batch) ->

View File

@ -76,7 +76,7 @@ auto_restart_interval(_) -> undefined.
query_mode(type) -> enum([sync, async]); query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode"); query_mode(desc) -> ?DESC("query_mode");
query_mode(default) -> sync; query_mode(default) -> async;
query_mode(required) -> false; query_mode(required) -> false;
query_mode(_) -> undefined. query_mode(_) -> undefined.

View File

@ -61,7 +61,7 @@ values(post) ->
enable_batch => false, enable_batch => false,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => async,
enable_queue => false, enable_queue => false,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_queue_bytes => ?DEFAULT_QUEUE_SIZE
} }