From b9d012e07247b91a04dfd64248ff059bbae3774a Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Mon, 2 Jan 2023 14:43:49 +0100 Subject: [PATCH] refactor(emqx_resource): ingress bridge counter Unify code paths for resource metrics by removing emqx_resource:inc_received/1 and adding emqx_resource_metrics:received_inc/1 & friends. --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 5 +---- apps/emqx_resource/src/emqx_resource_metrics.erl | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 6c73a14c0..522f15ccf 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -136,7 +136,7 @@ drop_bridge(Name) -> %% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. on_message_received(Msg, HookPoint, ResId) -> - emqx_resource:inc_received(ResId), + emqx_resource_metrics:received_inc(ResId), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 30934e0e7..aff66c287 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -111,7 +111,7 @@ list_group_instances/1 ]). --export([inc_received/1, apply_reply_fun/2]). +-export([apply_reply_fun/2]). -optional_callbacks([ on_query/3, @@ -467,8 +467,5 @@ apply_reply_fun(From, Result) -> %% ================================================================================= -inc_received(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received'). - filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index ff764ab3c..64f014918 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -55,6 +55,9 @@ matched_inc/1, matched_inc/2, matched_get/1, + received_inc/1, + received_inc/2, + received_get/1, retried_inc/1, retried_inc/2, retried_get/1, @@ -87,6 +90,7 @@ events() -> inflight, matched, queuing, + received, retried_failed, retried_success, success @@ -134,6 +138,8 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); matched -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); + received -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'received', Val); retried_failed -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), @@ -309,6 +315,16 @@ matched_inc(ID, Val) -> matched_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). +%% @doc The number of messages that have been received from a bridge +received_inc(ID) -> + received_inc(ID, 1). + +received_inc(ID, Val) -> + telemetry:execute([?TELEMETRY_PREFIX, received], #{counter_inc => Val}, #{resource_id => ID}). + +received_get(ID) -> + emqx_metrics_worker:get(?RES_METRICS, ID, 'received'). + %% @doc The number of times message sends have been retried retried_inc(ID) -> retried_inc(ID, 1).