Merge pull request #13207 from thalesmg/republish-metrics-m-20240607

fix(rule actions): check `republish` action result for metrics
This commit is contained in:
Thales Macedo Garitezi 2024-06-11 13:29:58 -03:00 committed by GitHub
commit 686bcc8a48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 106 additions and 7 deletions

View File

@ -238,8 +238,18 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
payload = Payload,
timestamp = erlang:system_time(millisecond)
},
_ = emqx_broker:safe_publish(Msg),
emqx_metrics:inc_msg(Msg).
case emqx_broker:safe_publish(Msg) of
[_ | _] ->
emqx_metrics:inc_msg(Msg),
ok;
disconnect ->
error;
[] ->
%% Have to check previous logs to distinguish between schema validation
%% failure, no subscribers, blocked by authz, or anything else in the
%% `message.publish' hook evaluation.
error
end.
parse_simple_var(Data) when is_binary(Data) ->
emqx_template:parse(Data);

View File

@ -24,6 +24,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
-define(REPUBLISH_TOPIC, <<"rule_apply_test_SUITE">>).
all() ->
[
@ -129,7 +130,7 @@ republish_action() ->
<<"payload">> => <<"MY PL">>,
<<"qos">> => 0,
<<"retain">> => false,
<<"topic">> => <<"rule_apply_test_SUITE">>,
<<"topic">> => ?REPUBLISH_TOPIC,
<<"user_properties">> => <<>>
},
<<"function">> => <<"republish">>
@ -139,6 +140,8 @@ console_print_action() ->
#{<<"function">> => <<"console">>}.
basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
%% Subscribe to republish action target topic so there's at least one subscriber.
_ = emqx:subscribe(?REPUBLISH_TOPIC),
%% Create Rule
RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
@ -181,7 +184,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG:~n~s", [Bin]),
ct:pal("THELOG:~n~s", [Bin]),
case PayloadEncode of
hidden ->
?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]));
@ -207,7 +210,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG2:~n~s", [Bin]),
ct:pal("THELOG2:~n~s", [Bin]),
?assertNotEqual(
nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>])
)
@ -219,7 +222,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG3:~n~s", [Bin]),
ct:pal("THELOG3:~n~s", [Bin]),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])),
do_final_log_check(Action, Bin)
end

View File

@ -392,9 +392,12 @@ create_failure_tracing_rule() ->
sql => <<"select * from \"$events/schema_validation_failed\" ">>,
actions => [make_trace_fn_action()]
},
create_rule(Params).
create_rule(Params) ->
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
Res = request(post, Path, Params),
ct:pal("create failure tracing rule result:\n ~p", [Res]),
ct:pal("create rule result:\n ~p", [Res]),
case Res of
{ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} ->
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
@ -1384,3 +1387,85 @@ t_load_config(_Config) ->
?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>),
ok.
%% Checks that the republish action failure metric increases when schema validation fails
%% for an outgoing message. Though this is arguably more appropriate as an
%% `emqx_rule_runtime' test case, it's simpler to setup the conditions here.
t_republish_action_failure(_Config) ->
?check_trace(
begin
Name1 = <<"1">>,
%% Always fails
Check1A = sql_check(<<"select 1 where false">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
RuleTopic = <<"some/topic">>,
Params = #{
enable => true,
sql => iolist_to_binary([
<<"select * from \"">>,
RuleTopic,
<<"\"">>
]),
actions => [
#{
function => <<"republish">>,
args =>
#{
<<"mqtt_properties">> => #{},
<<"payload">> => <<"aaa">>,
<<"qos">> => 0,
<<"retain">> => false,
<<"topic">> => <<"t/republished">>,
<<"user_properties">> => <<>>
}
}
]
},
{201, #{<<"id">> := RuleId}} = create_rule(Params),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, RuleTopic, #{}),
?assertNotReceive({publish, _}),
?assertMatch(
#{
matched := 1,
failed := 0,
passed := 1,
'actions.total' := 1,
'actions.success' := 0,
'actions.failed' := 1,
'actions.failed.unknown' := 1,
'actions.failed.out_of_service' := 0
},
emqx_metrics_worker:get_counters(rule_metrics, RuleId)
),
%% `publish' return type is different when failure action is `disconnect'
Validation1B = validation(Name1, [Check1A], #{<<"failure_action">> => <<"disconnect">>}),
{200, _} = update(Validation1B),
ok = publish(C, RuleTopic, #{}),
?assertNotReceive({publish, _}),
?assertMatch(
#{
matched := 2,
failed := 0,
passed := 2,
'actions.total' := 2,
'actions.success' := 0,
'actions.failed' := 2,
'actions.failed.unknown' := 2,
'actions.failed.out_of_service' := 0
},
emqx_metrics_worker:get_counters(rule_metrics, RuleId)
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1 @@
Previously, if the `republish` rule engine action failed to publish the message, the action success metrics were always increased. Now, if the action detects it doesn't reach at least one subscriber, action failure metrics are increased.