From 4c24b08244b27afa59f25c8184024ba734d17f28 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 6 Apr 2023 15:30:45 -0300 Subject: [PATCH] fix(rule_action): fix metrics for bridges returning `async_return` Kafka Producer, when called asynchronously, will return `{async_return, {ok, pid()}}`, which currently counts as an unknown failure. --- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 6 ++++-- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 153832246..51491df53 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -508,8 +508,6 @@ nested_put(Alias, Val, Columns0) -> emqx_rule_maps:nested_put(Alias, Val, Columns). -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). -inc_action_metrics(ok, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> @@ -525,6 +523,10 @@ inc_action_metrics(R, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') end. +is_ok_result(ok) -> + true; +is_ok_result({async_return, R}) -> + is_ok_result(R); is_ok_result(R) when is_tuple(R) -> ok == erlang:element(1, R); is_ok_result(_) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 9e32f818d..9b6ac05a7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -309,7 +309,7 @@ kafka_bridge_rest_api_helper(Config) -> AtomsAfter = erlang:system_info(atom_count), ?assertEqual(AtomsBefore, AtomsAfter), %% Create a rule that uses the bridge - {ok, 201, _Rule} = http_post( + {ok, 201, Rule} = http_post( ["rules"], #{ <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>, @@ -318,6 +318,7 @@ kafka_bridge_rest_api_helper(Config) -> <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> } ), + #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% counters should be empty before ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)), @@ -346,6 +347,8 @@ kafka_bridge_rest_api_helper(Config) -> %% Check crucial counters and gauges ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)), ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')), + ?assertEqual(0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')), ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),