diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 25addb817..857a817f3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -51,23 +51,27 @@ apply_rules([], _Input) -> apply_rules([#rule{enabled = false}|More], Input) -> apply_rules(More, Input); apply_rules([Rule = #rule{id = RuleID}|More], Input) -> - ok = emqx_rule_metrics:inc_rules_matched(RuleID), try apply_rule_discard_result(Rule, Input) catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "SELECT clause exception for ~s failed: ~p", [RuleID, Error]); _:{match_conditions_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "WHERE clause exception for ~s failed: ~p", [RuleID, Error]); _:{select_and_collect_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", [RuleID, Error]); _:{match_incase_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "INCASE clause exception for ~s failed: ~p", [RuleID, Error]); _:Error:StkTrace -> + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", [RuleID, Error, StkTrace]) end, @@ -79,6 +83,7 @@ apply_rule_discard_result(Rule, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), + ok = emqx_rule_metrics:inc_rules_matched(RuleID), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). do_apply_rule(#rule{id = RuleId, @@ -89,15 +94,13 @@ do_apply_rule(#rule{id = RuleId, conditions = Conditions, on_action_failed = OnFailed, actions = Actions}, Input) -> - {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input), - emqx_rule_metrics:inc_rules_exception(RuleId), + {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input), {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}), ColumnsAndSelected = maps:merge(Input, Selected), case ?RAISE(match_conditions(Conditions, ColumnsAndSelected), - emqx_rule_metrics:inc_rules_exception(RuleId), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - Collection2 = filter_collection(RuleId, Input, InCase, DoEach, Collection), + Collection2 = filter_collection(Input, InCase, DoEach, Collection), case Collection2 of [] -> emqx_rule_metrics:inc_rules_no_result(RuleId); _ -> emqx_rule_metrics:inc_rules_passed(RuleId) @@ -115,10 +118,8 @@ do_apply_rule(#rule{id = RuleId, on_action_failed = OnFailed, actions = Actions}, Input) -> Selected = ?RAISE(select_and_transform(Fields, Input), - emqx_rule_metrics:inc_rules_exception(RuleId), {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}), case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), - emqx_rule_metrics:inc_rules_exception(RuleId), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_rule_metrics:inc_rules_passed(RuleId), @@ -176,18 +177,16 @@ select_and_collect([Field|More], Input, {Output, LastKV}) -> {nested_put(Key, Val, Output), LastKV}). %% Filter each item got from FOREACH --dialyzer({nowarn_function, filter_collection/5}). -filter_collection(RuleId, Input, InCase, DoEach, {CollKey, CollVal}) -> +-dialyzer({nowarn_function, filter_collection/4}). +filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> lists:filtermap( fun(Item) -> InputAndItem = maps:merge(Input, #{CollKey => Item}), case ?RAISE(match_conditions(InCase, InputAndItem), - emqx_rule_metrics:inc_rules_exception(RuleId), {match_incase_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true when DoEach == [] -> {true, InputAndItem}; true -> {true, ?RAISE(select_and_transform(DoEach, InputAndItem), - emqx_rule_metrics:inc_rules_exception(RuleId), {doeach_error, {_EXCLASS_,_EXCPTION_,_ST_}})}; false -> false end