fix(resource_metrics): avoid detaching handler on crashes
Fixes https://emqx.atlassian.net/browse/EMQX-11821
This commit is contained in:
parent
cb8c84098e
commit
6410f5a717
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-module(emqx_resource_metrics).
|
-module(emqx_resource_metrics).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
events/0,
|
events/0,
|
||||||
install_telemetry_handler/1,
|
install_telemetry_handler/1,
|
||||||
|
@ -118,6 +120,52 @@ handle_telemetry_event(
|
||||||
_Metadata = #{resource_id := ID},
|
_Metadata = #{resource_id := ID},
|
||||||
_HandlerConfig
|
_HandlerConfig
|
||||||
) ->
|
) ->
|
||||||
|
try
|
||||||
|
handle_counter_telemetry_event(Event, ID, Val)
|
||||||
|
catch
|
||||||
|
Kind:Reason:Stacktrace ->
|
||||||
|
%% We catch errors to avoid detaching the telemetry handler function.
|
||||||
|
%% When restarting a resource while it's under load, there might be transient
|
||||||
|
%% failures while the metrics are not yet created.
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "handle_resource_metrics_failed",
|
||||||
|
hint => "transient failures may occur when restarting a resource",
|
||||||
|
kind => Kind,
|
||||||
|
reason => Reason,
|
||||||
|
stacktrace => Stacktrace,
|
||||||
|
resource_id => ID,
|
||||||
|
event => Event
|
||||||
|
}),
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
handle_telemetry_event(
|
||||||
|
[?TELEMETRY_PREFIX, Event],
|
||||||
|
_Measurements = #{gauge_set := Val},
|
||||||
|
_Metadata = #{resource_id := ID, worker_id := WorkerID},
|
||||||
|
_HandlerConfig
|
||||||
|
) ->
|
||||||
|
try
|
||||||
|
handle_gauge_telemetry_event(Event, ID, WorkerID, Val)
|
||||||
|
catch
|
||||||
|
Kind:Reason:Stacktrace ->
|
||||||
|
%% We catch errors to avoid detaching the telemetry handler function.
|
||||||
|
%% When restarting a resource while it's under load, there might be transient
|
||||||
|
%% failures while the metrics are not yet created.
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "handle_resource_metrics_failed",
|
||||||
|
hint => "transient failures may occur when restarting a resource",
|
||||||
|
kind => Kind,
|
||||||
|
reason => Reason,
|
||||||
|
stacktrace => Stacktrace,
|
||||||
|
resource_id => ID,
|
||||||
|
event => Event
|
||||||
|
}),
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
handle_counter_telemetry_event(Event, ID, Val) ->
|
||||||
case Event of
|
case Event of
|
||||||
dropped_other ->
|
dropped_other ->
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
|
||||||
|
@ -154,13 +202,9 @@ handle_telemetry_event(
|
||||||
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
|
emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end;
|
end.
|
||||||
handle_telemetry_event(
|
|
||||||
[?TELEMETRY_PREFIX, Event],
|
handle_gauge_telemetry_event(Event, ID, WorkerID, Val) ->
|
||||||
_Measurements = #{gauge_set := Val},
|
|
||||||
_Metadata = #{resource_id := ID, worker_id := WorkerID},
|
|
||||||
_HandlerConfig
|
|
||||||
) ->
|
|
||||||
case Event of
|
case Event of
|
||||||
inflight ->
|
inflight ->
|
||||||
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
|
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
|
||||||
|
@ -168,9 +212,7 @@ handle_telemetry_event(
|
||||||
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
|
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end;
|
end.
|
||||||
handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% Gauges (value can go both up and down):
|
%% Gauges (value can go both up and down):
|
||||||
%% --------------------------------------
|
%% --------------------------------------
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
||||||
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
|
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
|
||||||
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
|
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
|
||||||
|
-define(TELEMETRY_PREFIX, emqx, resource).
|
||||||
|
|
||||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
|
@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_telemetry_handler_crash(_Config) ->
|
||||||
|
%% Check that a crash while handling a telemetry event, such as when a busy resource
|
||||||
|
%% is restarted and its metrics are not recreated while handling an increment, does
|
||||||
|
%% not lead to the handler being uninstalled.
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
NonExistentId = <<"I-dont-exist">>,
|
||||||
|
WorkerId = 1,
|
||||||
|
HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]),
|
||||||
|
?assertMatch([_ | _], HandlersBefore),
|
||||||
|
lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()),
|
||||||
|
emqx_common_test_helpers:with_mock(
|
||||||
|
emqx_metrics_worker,
|
||||||
|
set_gauge,
|
||||||
|
fun(_Name, _Id, _WorkerId, _Metric, _Val) ->
|
||||||
|
error(random_crash)
|
||||||
|
end,
|
||||||
|
fun() ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns()
|
||||||
|
)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
|
||||||
})
|
})
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
counter_metric_inc_fns() ->
|
||||||
|
Mod = emqx_resource_metrics,
|
||||||
|
[
|
||||||
|
fun Mod:Fn/1
|
||||||
|
|| {Fn, 1} <- Mod:module_info(functions),
|
||||||
|
case string:find(atom_to_list(Fn), "_inc", trailing) of
|
||||||
|
"_inc" -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
].
|
||||||
|
|
||||||
|
gauge_metric_set_fns() ->
|
||||||
|
Mod = emqx_resource_metrics,
|
||||||
|
[
|
||||||
|
fun Mod:Fn/3
|
||||||
|
|| {Fn, 3} <- Mod:module_info(functions),
|
||||||
|
case string:find(atom_to_list(Fn), "_set", trailing) of
|
||||||
|
"_set" -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
].
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected.
|
Loading…
Reference in New Issue