diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d0d93a701..1e0f0b45d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -281,8 +281,10 @@ query(ResId, Request, Opts) -> {ok, _Group, #{query_mode := QM, error := Error}} -> case {QM, Error} of {_, unhealthy_target} -> + emqx_resource_metrics:dropped_resource_stopped_inc(ResId), ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); {_, {unhealthy_target, _Message}} -> + emqx_resource_metrics:dropped_resource_stopped_inc(ResId), ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); {simple_async, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 77e680933..d4bc9a8a7 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -899,6 +899,29 @@ t_healthy(_) -> end ). +t_unhealthy_target(_) -> + HealthCheckError = {unhealthy_target, "some message"}, + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, health_check_error => {msg, HealthCheckError}} + ) + ), + ?assertEqual( + {ok, disconnected}, + emqx_resource:health_check(?ID) + ), + ?assertMatch( + {ok, _Group, #{error := HealthCheckError}}, + emqx_resource_manager:lookup(?ID) + ), + %% messages are dropped when bridge is unhealthy + emqx_resource:query(?ID, message), + ?assertEqual(1, emqx_resource_metrics:dropped_resource_stopped_get(?ID)). + t_stop_start(_) -> ?check_trace( begin diff --git a/changes/ce/fix-11534.en.md b/changes/ce/fix-11534.en.md new file mode 100644 index 000000000..15c89f392 --- /dev/null +++ b/changes/ce/fix-11534.en.md @@ -0,0 +1 @@ +Fixed increment on data bridge statistics when bridge is unhealthy. Now, messages sent to unhealthy bridges are being counted as dropped messages.