From 6410f5a717d8d7acd2d6e3ec81b685c4aaeaea38 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jan 2024 10:21:27 -0300 Subject: [PATCH] fix(resource_metrics): avoid detaching handler on crashes Fixes https://emqx.atlassian.net/browse/EMQX-11821 --- .../src/emqx_resource_metrics.erl | 62 ++++++++++++++++--- .../test/emqx_resource_SUITE.erl | 53 ++++++++++++++++ changes/ce/fix-12404.en.md | 1 + 3 files changed, 106 insertions(+), 10 deletions(-) create mode 100644 changes/ce/fix-12404.en.md diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index df28d893b..7480be0cc 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -16,6 +16,8 @@ -module(emqx_resource_metrics). +-include_lib("emqx/include/logger.hrl"). + -export([ events/0, install_telemetry_handler/1, @@ -118,6 +120,52 @@ handle_telemetry_event( _Metadata = #{resource_id := ID}, _HandlerConfig ) -> + try + handle_counter_telemetry_event(Event, ID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. + ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + try + handle_gauge_telemetry_event(Event, ID, WorkerID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. + ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> + ok. + +handle_counter_telemetry_event(Event, ID, Val) -> case Event of dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), @@ -154,13 +202,9 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val); _ -> ok - end; -handle_telemetry_event( - [?TELEMETRY_PREFIX, Event], - _Measurements = #{gauge_set := Val}, - _Metadata = #{resource_id := ID, worker_id := WorkerID}, - _HandlerConfig -) -> + end. + +handle_gauge_telemetry_event(Event, ID, WorkerID, Val) -> case Event of inflight -> emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); @@ -168,9 +212,7 @@ handle_telemetry_event( emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); _ -> ok - end; -handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> - ok. + end. %% Gauges (value can go both up and down): %% -------------------------------------- diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2462123d6..d46fc6323 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -28,6 +28,7 @@ -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). +-define(TELEMETRY_PREFIX, emqx, resource). -import(emqx_common_test_helpers, [on_exit/1]). @@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> end ). +t_telemetry_handler_crash(_Config) -> + %% Check that a crash while handling a telemetry event, such as when a busy resource + %% is restarted and its metrics are not recreated while handling an increment, does + %% not lead to the handler being uninstalled. + ?check_trace( + begin + NonExistentId = <<"I-dont-exist">>, + WorkerId = 1, + HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]), + ?assertMatch([_ | _], HandlersBefore), + lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()), + emqx_common_test_helpers:with_mock( + emqx_metrics_worker, + set_gauge, + fun(_Name, _Id, _WorkerId, _Metric, _Val) -> + error(random_crash) + end, + fun() -> + lists:foreach( + fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns() + ) + end + ), + ?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])), + ok + end, + [] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) -> }) end end. + +counter_metric_inc_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/1 + || {Fn, 1} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_inc", trailing) of + "_inc" -> true; + _ -> false + end + ]. + +gauge_metric_set_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/3 + || {Fn, 3} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_set", trailing) of + "_set" -> true; + _ -> false + end + ]. diff --git a/changes/ce/fix-12404.en.md b/changes/ce/fix-12404.en.md new file mode 100644 index 000000000..8706416a8 --- /dev/null +++ b/changes/ce/fix-12404.en.md @@ -0,0 +1 @@ +Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected.