fix(rule_action): fix metrics for bridges returning `async_return`

Kafka Producer, when called asynchronously, will return
`{async_return, {ok, pid()}}`, which currently counts as an unknown failure.
This commit is contained in:
Thales Macedo Garitezi 2023-04-06 15:30:45 -03:00
parent cf71f9148e
commit 4c24b08244
2 changed files with 8 additions and 3 deletions

View File

@ -508,8 +508,6 @@ nested_put(Alias, Val, Columns0) ->
emqx_rule_maps:nested_put(Alias, Val, Columns).
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics(ok, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
@ -525,6 +523,10 @@ inc_action_metrics(R, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
end.
is_ok_result(ok) ->
true;
is_ok_result({async_return, R}) ->
is_ok_result(R);
is_ok_result(R) when is_tuple(R) ->
ok == erlang:element(1, R);
is_ok_result(_) ->

View File

@ -309,7 +309,7 @@ kafka_bridge_rest_api_helper(Config) ->
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
%% Create a rule that uses the bridge
{ok, 201, _Rule} = http_post(
{ok, 201, Rule} = http_post(
["rules"],
#{
<<"name">> => <<"kafka_bridge_rest_api_helper_rule">>,
@ -318,6 +318,7 @@ kafka_bridge_rest_api_helper(Config) ->
<<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
}
),
#{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
%% counters should be empty before
?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)),
@ -346,6 +347,8 @@ kafka_bridge_rest_api_helper(Config) ->
%% Check crucial counters and gauges
?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)),
?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)),
?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
?assertEqual(0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')),
?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),