feat: emit telemetry events for all resource worker metrics

This commit is contained in:
Thales Macedo Garitezi 2022-10-13 11:04:47 -03:00
parent a6ba8494d8
commit 1b2b629cdd
5 changed files with 148 additions and 41 deletions

View File

@ -9,7 +9,8 @@
stdlib, stdlib,
gproc, gproc,
jsx, jsx,
emqx emqx,
telemetry
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -23,9 +23,18 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
%% since the handler is generic and executed in the process
%% emitting the event, we need to install only a single handler
%% for the whole app.
TelemetryHandlerID = telemetry_handler_id(),
ok = emqx_resource_metrics:install_telemetry_handler(TelemetryHandlerID),
emqx_resource_sup:start_link(). emqx_resource_sup:start_link().
stop(_State) -> stop(_State) ->
TelemetryHandlerID = telemetry_handler_id(),
ok = emqx_resource_metrics:uninstall_telemetry_handler(TelemetryHandlerID),
ok. ok.
%% internal functions %% internal functions
telemetry_handler_id() ->
<<"emqx-resource-app-telemetry-handler">>.

View File

@ -16,6 +16,13 @@
-module(emqx_resource_metrics). -module(emqx_resource_metrics).
-export([
events/0,
install_telemetry_handler/1,
uninstall_telemetry_handler/1,
handle_telemetry_event/4
]).
-export([ -export([
batching_change/2, batching_change/2,
batching_get/1, batching_get/1,
@ -62,6 +69,91 @@
]). ]).
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).
-define(TELEMETRY_PREFIX, emqx, resource).
-spec events() -> [telemetry:event_name()].
events() ->
[
[?TELEMETRY_PREFIX, Event]
|| Event <- [
batching,
dropped_other,
dropped_queue_full,
dropped_queue_not_enabled,
dropped_resource_not_found,
dropped_resource_stopped,
failed,
inflight,
matched,
queuing,
retried_failed,
retried_success,
success
]
].
-spec install_telemetry_handler(binary()) -> ok.
install_telemetry_handler(HandlerID) ->
_ = telemetry:attach_many(
HandlerID,
events(),
fun ?MODULE:handle_telemetry_event/4,
_HandlerConfig = #{}
),
ok.
-spec uninstall_telemetry_handler(binary()) -> ok.
uninstall_telemetry_handler(HandlerID) ->
_ = telemetry:detach(HandlerID),
ok.
handle_telemetry_event(
[?TELEMETRY_PREFIX, Event],
_Measurements = #{counter_inc := Val},
_Metadata = #{resource_id := ID},
_HandlerConfig
) ->
case Event of
batching ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val);
dropped_other ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
dropped_queue_full ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
dropped_queue_not_enabled ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val);
dropped_resource_not_found ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val);
dropped_resource_stopped ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
inflight ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val);
matched ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
queuing ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val);
retried_failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val);
retried_success ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val);
success ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
_ ->
ok
end;
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
ok.
%% Gauges (value can go both up and down): %% Gauges (value can go both up and down):
%% -------------------------------------- %% --------------------------------------
@ -69,14 +161,14 @@
%% @doc Count of messages that are currently accumulated in memory waiting for %% @doc Count of messages that are currently accumulated in memory waiting for
%% being sent in one batch %% being sent in one batch
batching_change(ID, Val) -> batching_change(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val). telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}).
batching_get(ID) -> batching_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'batching'). emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
%% @doc Count of messages that are currently queuing. [Gauge] %% @doc Count of messages that are currently queuing. [Gauge]
queuing_change(ID, Val) -> queuing_change(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val). telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}).
queuing_get(ID) -> queuing_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing'). emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
@ -84,7 +176,7 @@ queuing_get(ID) ->
%% @doc Count of messages that were sent asynchronously but ACKs are not %% @doc Count of messages that were sent asynchronously but ACKs are not
%% received. [Gauge] %% received. [Gauge]
inflight_change(ID, Val) -> inflight_change(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val). telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}).
inflight_get(ID) -> inflight_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight'). emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
@ -94,120 +186,134 @@ inflight_get(ID) ->
%% @doc Count of messages dropped %% @doc Count of messages dropped
dropped_inc(ID) -> dropped_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped'). dropped_inc(ID, 1).
dropped_inc(ID, Val) -> dropped_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped], #{counter_inc => Val}, #{resource_id => ID}).
dropped_get(ID) -> dropped_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped').
%% @doc Count of messages dropped due to other reasons %% @doc Count of messages dropped due to other reasons
dropped_other_inc(ID) -> dropped_other_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other'). dropped_other_inc(ID, 1).
dropped_other_inc(ID, Val) -> dropped_other_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped_other], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_other_get(ID) -> dropped_other_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
%% @doc Count of messages dropped because the queue was full %% @doc Count of messages dropped because the queue was full
dropped_queue_full_inc(ID) -> dropped_queue_full_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full'). dropped_queue_full_inc(ID, 1).
dropped_queue_full_inc(ID, Val) -> dropped_queue_full_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_queue_full_get(ID) -> dropped_queue_full_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
%% @doc Count of messages dropped because the queue was not enabled %% @doc Count of messages dropped because the queue was not enabled
dropped_queue_not_enabled_inc(ID) -> dropped_queue_not_enabled_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled'). dropped_queue_not_enabled_inc(ID, 1).
dropped_queue_not_enabled_inc(ID, Val) -> dropped_queue_not_enabled_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_queue_not_enabled_get(ID) -> dropped_queue_not_enabled_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
%% @doc Count of messages dropped because the resource was not found %% @doc Count of messages dropped because the resource was not found
dropped_resource_not_found_inc(ID) -> dropped_resource_not_found_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found'). dropped_resource_not_found_inc(ID, 1).
dropped_resource_not_found_inc(ID, Val) -> dropped_resource_not_found_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_resource_not_found_get(ID) -> dropped_resource_not_found_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found').
%% @doc Count of messages dropped because the resource was stopped %% @doc Count of messages dropped because the resource was stopped
dropped_resource_stopped_inc(ID) -> dropped_resource_stopped_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped'). dropped_resource_stopped_inc(ID, 1).
dropped_resource_stopped_inc(ID, Val) -> dropped_resource_stopped_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val). telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_resource_stopped_get(ID) -> dropped_resource_stopped_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped'). emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped').
%% @doc Count of how many times this bridge has been matched and queried %% @doc Count of how many times this bridge has been matched and queried
matched_inc(ID) -> matched_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched'). matched_inc(ID, 1).
matched_inc(ID, Val) -> matched_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val). telemetry:execute([?TELEMETRY_PREFIX, matched], #{counter_inc => Val}, #{resource_id => ID}).
matched_get(ID) -> matched_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'matched'). emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
%% @doc The number of times message sends have been retried %% @doc The number of times message sends have been retried
retried_inc(ID) -> retried_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried'). retried_inc(ID, 1).
retried_inc(ID, Val) -> retried_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val). telemetry:execute([?TELEMETRY_PREFIX, retried], #{counter_inc => Val}, #{resource_id => ID}).
retried_get(ID) -> retried_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'retried'). emqx_metrics_worker:get(?RES_METRICS, ID, 'retried').
%% @doc Count of message sends that have failed %% @doc Count of message sends that have failed
failed_inc(ID) -> failed_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed'). failed_inc(ID, 1).
failed_inc(ID, Val) -> failed_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val). telemetry:execute([?TELEMETRY_PREFIX, failed], #{counter_inc => Val}, #{resource_id => ID}).
failed_get(ID) -> failed_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'failed'). emqx_metrics_worker:get(?RES_METRICS, ID, 'failed').
%%% @doc Count of message sends that have failed after having been retried %%% @doc Count of message sends that have failed after having been retried
retried_failed_inc(ID) -> retried_failed_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed'). retried_failed_inc(ID, 1).
retried_failed_inc(ID, Val) -> retried_failed_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val). telemetry:execute([?TELEMETRY_PREFIX, retried_failed], #{counter_inc => Val}, #{
resource_id => ID
}).
retried_failed_get(ID) -> retried_failed_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed'). emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
%% @doc Count messages that were sucessfully sent after at least one retry %% @doc Count messages that were sucessfully sent after at least one retry
retried_success_inc(ID) -> retried_success_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success'). retried_success_inc(ID, 1).
retried_success_inc(ID, Val) -> retried_success_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val). telemetry:execute([?TELEMETRY_PREFIX, retried_success], #{counter_inc => Val}, #{
resource_id => ID
}).
retried_success_get(ID) -> retried_success_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success'). emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success').
%% @doc Count of messages that have been sent successfully %% @doc Count of messages that have been sent successfully
success_inc(ID) -> success_inc(ID) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success'). success_inc(ID, 1).
success_inc(ID, Val) -> success_inc(ID, Val) ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val). telemetry:execute([?TELEMETRY_PREFIX, success], #{counter_inc => Val}, #{resource_id => ID}).
success_get(ID) -> success_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'success'). emqx_metrics_worker:get(?RES_METRICS, ID, 'success').

View File

@ -376,17 +376,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
true; true;
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
emqx_resource_metrics:dropped_inc(Id),
emqx_resource_metrics:dropped_resource_not_found_inc(Id), emqx_resource_metrics:dropped_resource_not_found_inc(Id),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
emqx_resource_metrics:dropped_inc(Id),
emqx_resource_metrics:dropped_resource_stopped_inc(Id), emqx_resource_metrics:dropped_resource_stopped_inc(Id),
BlockWorker; BlockWorker;
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_resource_metrics:dropped_inc(Id),
emqx_resource_metrics:dropped_other_inc(Id), emqx_resource_metrics:dropped_other_inc(Id),
BlockWorker; BlockWorker;
handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
@ -546,7 +543,6 @@ estimate_size(QItem) ->
size(queue_item_marshaller(QItem)). size(queue_item_marshaller(QItem)).
maybe_append_queue(Id, undefined, _Items) -> maybe_append_queue(Id, undefined, _Items) ->
emqx_resource_metrics:dropped_inc(Id),
emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
undefined; undefined;
maybe_append_queue(Id, Q, Items) -> maybe_append_queue(Id, Q, Items) ->
@ -560,7 +556,6 @@ maybe_append_queue(Id, Q, Items) ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2), Dropped = length(Items2),
emqx_resource_metrics:queuing_change(Id, -Dropped), emqx_resource_metrics:queuing_change(Id, -Dropped),
emqx_resource_metrics:dropped_inc(Id),
emqx_resource_metrics:dropped_queue_full_inc(Id), emqx_resource_metrics:dropped_queue_full_inc(Id),
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1 Q1
@ -641,16 +636,12 @@ inflight_drop(Name, Ref) ->
%%============================================================================== %%==============================================================================
inc_sent_failed(Id, true) -> inc_sent_failed(Id, _HasSent = true) ->
emqx_resource_metrics:failed_inc(Id),
emqx_resource_metrics:retried_inc(Id),
emqx_resource_metrics:retried_failed_inc(Id); emqx_resource_metrics:retried_failed_inc(Id);
inc_sent_failed(Id, _HasSent) -> inc_sent_failed(Id, _HasSent) ->
emqx_resource_metrics:failed_inc(Id). emqx_resource_metrics:failed_inc(Id).
inc_sent_success(Id, true) -> inc_sent_success(Id, _HasSent = true) ->
emqx_resource_metrics:success_inc(Id),
emqx_resource_metrics:retried_inc(Id),
emqx_resource_metrics:retried_success_inc(Id); emqx_resource_metrics:retried_success_inc(Id);
inc_sent_success(Id, _HasSent) -> inc_sent_success(Id, _HasSent) ->
emqx_resource_metrics:success_inc(Id). emqx_resource_metrics:success_inc(Id).

View File

@ -90,14 +90,14 @@ on_start(InstId, Config) ->
end. end.
on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
with_log_at_error( _ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
#{ #{
msg => "failed_to_delete_kafka_producer", msg => "failed_to_delete_kafka_producer",
client_id => ClientID client_id => ClientID
} }
), ),
with_log_at_error( _ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
#{ #{
msg => "failed_to_delete_kafka_client", msg => "failed_to_delete_kafka_client",