From 39615e1cb6e97acd899304f3a4f1246258dbf5af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 7 Jun 2024 10:41:34 -0300 Subject: [PATCH] fix(rule actions): check `republish` action result for metrics Fixes https://emqx.atlassian.net/browse/EMQX-12328 --- .../src/emqx_rule_actions.erl | 14 ++- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 11 ++- .../emqx_schema_validation_http_api_SUITE.erl | 87 ++++++++++++++++++- changes/ce/fix-13207.en.md | 1 + 4 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 changes/ce/fix-13207.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index dcd9024ef..7d45c47e6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -238,8 +238,18 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> payload = Payload, timestamp = erlang:system_time(millisecond) }, - _ = emqx_broker:safe_publish(Msg), - emqx_metrics:inc_msg(Msg). + case emqx_broker:safe_publish(Msg) of + [_ | _] -> + emqx_metrics:inc_msg(Msg), + ok; + disconnect -> + error; + [] -> + %% Have to check previous logs to distinguish between schema validation + %% failure, no subscribers, blocked by authz, or anything else in the + %% `message.publish' hook evaluation. + error + end. parse_simple_var(Data) when is_binary(Data) -> emqx_template:parse(Data); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 0f0395013..1592b1bde 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>). +-define(REPUBLISH_TOPIC, <<"rule_apply_test_SUITE">>). 
all() -> [ @@ -129,7 +130,7 @@ republish_action() -> <<"payload">> => <<"MY PL">>, <<"qos">> => 0, <<"retain">> => false, - <<"topic">> => <<"rule_apply_test_SUITE">>, + <<"topic">> => ?REPUBLISH_TOPIC, <<"user_properties">> => <<>> }, <<"function">> => <<"republish">> @@ -139,6 +140,8 @@ console_print_action() -> #{<<"function">> => <<"console">>}. basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) -> + %% Subscribe to republish action target topic so there's at least one subscriber. + _ = emqx:subscribe(?REPUBLISH_TOPIC), %% Create Rule RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>, @@ -181,7 +184,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) _NAttempts0 = 20, begin Bin = read_rule_trace_file(TraceName, TraceType, Now), - io:format("THELOG:~n~s", [Bin]), + ct:pal("THELOG:~n~s", [Bin]), case PayloadEncode of hidden -> ?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>])); @@ -207,7 +210,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) _NAttempts0 = 20, begin Bin = read_rule_trace_file(TraceName, TraceType, Now), - io:format("THELOG2:~n~s", [Bin]), + ct:pal("THELOG2:~n~s", [Bin]), ?assertNotEqual( nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>]) ) @@ -219,7 +222,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) _NAttempts0 = 20, begin Bin = read_rule_trace_file(TraceName, TraceType, Now), - io:format("THELOG3:~n~s", [Bin]), + ct:pal("THELOG3:~n~s", [Bin]), ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])), do_final_log_check(Action, Bin) end diff --git a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl index 41731fa1b..f05521b3d 100644 --- 
a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl @@ -392,9 +392,12 @@ create_failure_tracing_rule() -> sql => <<"select * from \"$events/schema_validation_failed\" ">>, actions => [make_trace_fn_action()] }, + create_rule(Params). + +create_rule(Params) -> Path = emqx_mgmt_api_test_util:api_path(["rules"]), Res = request(post, Path, Params), - ct:pal("create failure tracing rule result:\n ~p", [Res]), + ct:pal("create rule result:\n ~p", [Res]), case Res of {ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} -> on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), @@ -1384,3 +1387,85 @@ t_load_config(_Config) -> ?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>), ok. + +%% Checks that the republish action failure metric increases when schema validation fails +%% for an outgoing message. Though this is arguably more appropriate as an +%% `emqx_rule_runtime' test case, it's simpler to setup the conditions here. 
+t_republish_action_failure(_Config) -> + ?check_trace( + begin + Name1 = <<"1">>, + %% Always fails + Check1A = sql_check(<<"select 1 where false">>), + Validation1A = validation(Name1, [Check1A]), + {201, _} = insert(Validation1A), + + RuleTopic = <<"some/topic">>, + Params = #{ + enable => true, + sql => iolist_to_binary([ + <<"select * from \"">>, + RuleTopic, + <<"\"">> + ]), + actions => [ + #{ + function => <<"republish">>, + args => + #{ + <<"mqtt_properties">> => #{}, + <<"payload">> => <<"aaa">>, + <<"qos">> => 0, + <<"retain">> => false, + <<"topic">> => <<"t/republished">>, + <<"user_properties">> => <<>> + } + } + ] + }, + {201, #{<<"id">> := RuleId}} = create_rule(Params), + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + ok = publish(C, RuleTopic, #{}), + ?assertNotReceive({publish, _}), + + ?assertMatch( + #{ + matched := 1, + failed := 0, + passed := 1, + 'actions.total' := 1, + 'actions.success' := 0, + 'actions.failed' := 1, + 'actions.failed.unknown' := 1, + 'actions.failed.out_of_service' := 0 + }, + emqx_metrics_worker:get_counters(rule_metrics, RuleId) + ), + + %% `publish' return type is different when failure action is `disconnect' + Validation1B = validation(Name1, [Check1A], #{<<"failure_action">> => <<"disconnect">>}), + {200, _} = update(Validation1B), + + ok = publish(C, RuleTopic, #{}), + ?assertNotReceive({publish, _}), + + ?assertMatch( + #{ + matched := 2, + failed := 0, + passed := 2, + 'actions.total' := 2, + 'actions.success' := 0, + 'actions.failed' := 2, + 'actions.failed.unknown' := 2, + 'actions.failed.out_of_service' := 0 + }, + emqx_metrics_worker:get_counters(rule_metrics, RuleId) + ), + + ok + end, + [] + ), + ok. 
diff --git a/changes/ce/fix-13207.en.md b/changes/ce/fix-13207.en.md new file mode 100644 index 000000000..751253693 --- /dev/null +++ b/changes/ce/fix-13207.en.md @@ -0,0 +1 @@ +Previously, the success metrics of the `republish` rule engine action were incremented even when the action failed to publish the message. Now, if the action detects that the message did not reach at least one subscriber, the action failure metrics are incremented instead.