From b45f3de8db797f466e0392f4323b63a06d3e00b4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 2 Sep 2022 12:41:14 +0800 Subject: [PATCH] refactor(resource): rename metrics batched,queued -> batching,queuing --- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 6 +++--- apps/emqx_bridge/include/emqx_bridge.hrl | 8 ++++---- apps/emqx_bridge/src/emqx_bridge_api.erl | 6 +++--- apps/emqx_bridge/src/schema/emqx_bridge_schema.erl | 4 ++-- apps/emqx_connector/src/emqx_connector_http.erl | 14 ++++++++++++-- apps/emqx_resource/src/emqx_resource.erl | 8 +++++++- apps/emqx_resource/src/emqx_resource_manager.erl | 4 ++-- apps/emqx_resource/src/emqx_resource_worker.erl | 12 ++++++------ .../src/schema/emqx_resource_schema.erl | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl | 2 +- 10 files changed, 41 insertions(+), 25 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 06cc41a91..4f079c738 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -78,7 +78,7 @@ emqx_bridge_schema { } } - metric_batched { + metric_batching { desc { en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" zh: """当前积压在内存里,等待批量发送的消息个数""" @@ -161,9 +161,9 @@ emqx_bridge_schema { } } - metric_queued { + metric_queuing { desc { - en: """Count of messages that are currently queued.""" + en: """Count of messages that are currently queuing.""" zh: """当前被缓存到磁盘队列的消息个数。""" } label: { diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index bb8ee6e29..9886a1a62 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -25,7 +25,7 @@ Rcvd ), #{ - 'batched' => Batched, + 'batching' => Batched, 'dropped' => Dropped, 'dropped.other' => DroppedOther, 'dropped.queue_full' => DroppedQueueFull, @@ -33,7 +33,7 @@ 'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, - 'queued' => Queued, + 'queuing' => Queued, 'sent' => Sent, 'sent.exception' => SentExcpt, 'sent.failed' => SentFailed, @@ -67,7 +67,7 @@ Rcvd ), #{ - 'batched' := Batched, + 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -75,7 +75,7 @@ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queued' := Queued, + 'queuing' := Queued, 'sent' := Sent, 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ddffcb79f..9ae6560af 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -206,7 +206,7 @@ info_example_basic(webhook) -> worker_pool_size => 1, health_check_interval => 15000, auto_restart_interval => 15000, - query_mode => sync, + query_mode => async, async_inflight_window => 100, enable_queue => true, max_queue_bytes => 1024 * 1024 * 1024 @@ -672,7 +672,7 @@ format_resp( format_metrics(#{ counters := #{ - 'batched' := Batched, + 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, @@ -680,7 +680,7 @@ format_metrics(#{ 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, - 'queued' := Queued, + 'queuing' := Queued, 'sent' := Sent, 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f55ac840e..9180ec38a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -102,7 +102,7 @@ fields(bridges) -> ] ++ ee_fields_bridges(); fields("metrics") -> [ - {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, + {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, @@ -113,7 +113,7 @@ fields("metrics") -> {"dropped.resource_stopped", mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, - {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, + {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})}, {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})}, {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 3eb55c9a4..5f33714ef 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -31,7 +31,8 @@ on_stop/2, on_query/3, on_query_async/4, - on_get_status/2 + on_get_status/2, + reply_delegator/2 ]). -type url() :: emqx_http_lib:uri_map(). @@ -378,9 +379,18 @@ on_query_async( Method, NRequest, Timeout, - ReplyFunAndArgs + {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]} ). +reply_delegator(ReplyFunAndArgs, Result) -> + case Result of + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + _ -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end. + on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of true -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c1d500e8b..309c34195 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -110,7 +110,7 @@ list_group_instances/1 ]). --export([inc_received/1]). +-export([inc_received/1, apply_reply_fun/2]). -optional_callbacks([ on_query/3, @@ -441,6 +441,12 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> Error -> Error end. +apply_reply_fun({F, A}, Result) when is_function(F) -> + _ = erlang:apply(F, A ++ [Result]), + ok; +apply_reply_fun(From, Result) -> + gen_server:reply(From, Result). + %% ================================================================================= inc_received(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2581a3001..b12fc3a44 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -132,8 +132,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'matched', 'sent', 'dropped', - 'queued', - 'batched', + 'queuing', + 'batching', 'sent.success', 'sent.failed', 'sent.exception', diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a0632a761..4de8bd6ef 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -131,7 +131,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -273,12 +273,12 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S drop_head(Id, Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), ok = replayq:ack(Q1, AckRef), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1), Q1. query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'), St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); @@ -313,7 +313,7 @@ flush( inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)), Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of @@ -338,13 +338,13 @@ maybe_append_queue(Id, Q, Items) -> {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'), replayq:append(Q2, Items). batch_reply_caller(Id, BatchResult, Batch) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 9e54c8a7b..df284bbe8 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -76,7 +76,7 @@ auto_restart_interval(_) -> undefined. query_mode(type) -> enum([sync, async]); query_mode(desc) -> ?DESC("query_mode"); -query_mode(default) -> sync; +query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index f76eb388e..616842292 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -61,7 +61,7 @@ values(post) -> enable_batch => false, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, - query_mode => sync, + query_mode => async, enable_queue => false, max_queue_bytes => ?DEFAULT_QUEUE_SIZE }