diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index b688e3c11..38dac5449 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -9,7 +9,8 @@ stdlib, gproc, jsx, - emqx + emqx, + telemetry ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_resource/src/emqx_resource_app.erl b/apps/emqx_resource/src/emqx_resource_app.erl index 72838a8c1..51e7b2556 100644 --- a/apps/emqx_resource/src/emqx_resource_app.erl +++ b/apps/emqx_resource/src/emqx_resource_app.erl @@ -23,9 +23,18 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + %% since the handler is generic and executed in the process + %% emitting the event, we need to install only a single handler + %% for the whole app. + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:install_telemetry_handler(TelemetryHandlerID), emqx_resource_sup:start_link(). stop(_State) -> + TelemetryHandlerID = telemetry_handler_id(), + ok = emqx_resource_metrics:uninstall_telemetry_handler(TelemetryHandlerID), ok. %% internal functions +telemetry_handler_id() -> + <<"emqx-resource-app-telemetry-handler">>. diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 96d955db0..e6637b68f 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -16,6 +16,13 @@ -module(emqx_resource_metrics). +-export([ + events/0, + install_telemetry_handler/1, + uninstall_telemetry_handler/1, + handle_telemetry_event/4 +]). + -export([ batching_change/2, batching_get/1, @@ -62,6 +69,91 @@ ]). -define(RES_METRICS, resource_metrics). +-define(TELEMETRY_PREFIX, emqx, resource). + +-spec events() -> [telemetry:event_name()]. +events() -> + [ + [?TELEMETRY_PREFIX, Event] + || Event <- [ + batching, + dropped_other, + dropped_queue_full, + dropped_queue_not_enabled, + dropped_resource_not_found, + dropped_resource_stopped, + failed, + inflight, + matched, + queuing, + retried_failed, + retried_success, + success + ] + ]. + +-spec install_telemetry_handler(binary()) -> ok. +install_telemetry_handler(HandlerID) -> + _ = telemetry:attach_many( + HandlerID, + events(), + fun ?MODULE:handle_telemetry_event/4, + _HandlerConfig = #{} + ), + ok. + +-spec uninstall_telemetry_handler(binary()) -> ok. +uninstall_telemetry_handler(HandlerID) -> + _ = telemetry:detach(HandlerID), + ok. + +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{counter_inc := Val}, + _Metadata = #{resource_id := ID}, + _HandlerConfig +) -> + case Event of + batching -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val); + dropped_other -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val); + dropped_queue_full -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); + dropped_queue_not_enabled -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val); + dropped_resource_not_found -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val); + dropped_resource_stopped -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val); + failed -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val); + inflight -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val); + matched -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val); + queuing -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val); + retried_failed -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val); + retried_success -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val), + emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val); + success -> + emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val); + _ -> + ok + end; +handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> + ok. %% Gauges (value can go both up and down): %% -------------------------------------- @@ -69,14 +161,14 @@ %% @doc Count of messages that are currently accumulated in memory waiting for %% being sent in one batch batching_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val). + telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}). batching_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). %% @doc Count of messages that are currently queuing. [Gauge] queuing_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val). + telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}). queuing_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). @@ -84,7 +176,7 @@ queuing_get(ID) -> %% @doc Count of messages that were sent asynchronously but ACKs are not %% received. [Gauge] inflight_change(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val). + telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}). inflight_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). @@ -94,120 +186,134 @@ inflight_get(ID) -> %% @doc Count of messages dropped dropped_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped'). + dropped_inc(ID, 1). dropped_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped], #{counter_inc => Val}, #{resource_id => ID}). dropped_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped'). %% @doc Count of messages dropped due to other reasons dropped_other_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other'). + dropped_other_inc(ID, 1). dropped_other_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_other], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_other_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). %% @doc Count of messages dropped because the queue was full dropped_queue_full_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full'). + dropped_queue_full_inc(ID, 1). dropped_queue_full_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_queue_full_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). %% @doc Count of messages dropped because the queue was not enabled dropped_queue_not_enabled_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled'). + dropped_queue_not_enabled_inc(ID, 1). dropped_queue_not_enabled_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_queue_not_enabled_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). %% @doc Count of messages dropped because the resource was not found dropped_resource_not_found_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found'). + dropped_resource_not_found_inc(ID, 1). dropped_resource_not_found_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_resource_not_found_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found'). %% @doc Count of messages dropped because the resource was stopped dropped_resource_stopped_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped'). + dropped_resource_stopped_inc(ID, 1). dropped_resource_stopped_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val). + telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], #{counter_inc => Val}, #{ + resource_id => ID + }). dropped_resource_stopped_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped'). %% @doc Count of how many times this bridge has been matched and queried matched_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched'). + matched_inc(ID, 1). matched_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val). + telemetry:execute([?TELEMETRY_PREFIX, matched], #{counter_inc => Val}, #{resource_id => ID}). matched_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). %% @doc The number of times message sends have been retried retried_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried'). + retried_inc(ID, 1). retried_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried], #{counter_inc => Val}, #{resource_id => ID}). retried_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried'). %% @doc Count of message sends that have failed failed_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed'). + failed_inc(ID, 1). failed_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val). + telemetry:execute([?TELEMETRY_PREFIX, failed], #{counter_inc => Val}, #{resource_id => ID}). failed_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'failed'). %%% @doc Count of message sends that have failed after having been retried retried_failed_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed'). + retried_failed_inc(ID, 1). retried_failed_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried_failed], #{counter_inc => Val}, #{ + resource_id => ID + }). retried_failed_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed'). %% @doc Count messages that were sucessfully sent after at least one retry retried_success_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success'). + retried_success_inc(ID, 1). retried_success_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val). + telemetry:execute([?TELEMETRY_PREFIX, retried_success], #{counter_inc => Val}, #{ + resource_id => ID + }). retried_success_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success'). %% @doc Count of messages that have been sent successfully success_inc(ID) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'success'). + success_inc(ID, 1). success_inc(ID, Val) -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val). + telemetry:execute([?TELEMETRY_PREFIX, success], #{counter_inc => Val}, #{resource_id => ID}). success_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 75a63d427..b309299f6 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -376,17 +376,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when true; handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> @@ -546,7 +543,6 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). maybe_append_queue(Id, undefined, _Items) -> - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), undefined; maybe_append_queue(Id, Q, Items) -> @@ -560,7 +556,6 @@ maybe_append_queue(Id, Q, Items) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), emqx_resource_metrics:queuing_change(Id, -Dropped), - emqx_resource_metrics:dropped_inc(Id), emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 @@ -641,16 +636,12 @@ inflight_drop(Name, Ref) -> %%============================================================================== -inc_sent_failed(Id, true) -> - emqx_resource_metrics:failed_inc(Id), - emqx_resource_metrics:retried_inc(Id), +inc_sent_failed(Id, _HasSent = true) -> emqx_resource_metrics:retried_failed_inc(Id); inc_sent_failed(Id, _HasSent) -> emqx_resource_metrics:failed_inc(Id). -inc_sent_success(Id, true) -> - emqx_resource_metrics:success_inc(Id), - emqx_resource_metrics:retried_inc(Id), +inc_sent_success(Id, _HasSent = true) -> emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasSent) -> emqx_resource_metrics:success_inc(Id). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 0a03f582f..19f110bd9 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -90,14 +90,14 @@ on_start(InstId, Config) -> end. on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> - with_log_at_error( + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ msg => "failed_to_delete_kafka_producer", client_id => ClientID } ), - with_log_at_error( + _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, #{ msg => "failed_to_delete_kafka_client",