diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index de853d7e2..906a219b2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -50,35 +50,8 @@ apply_rules([], _Input) -> ok; apply_rules([#{enable := false}|More], Input) -> apply_rules(More, Input); -apply_rules([Rule = #{id := RuleID}|More], Input) -> - try apply_rule_discard_result(Rule, Input) - catch - %% ignore the errors if select or match failed - _:{select_and_transform_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), - ?SLOG(warning, #{msg => "SELECT_clause_exception", - rule_id => RuleID, reason => Error}); - _:{match_conditions_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), - ?SLOG(warning, #{msg => "WHERE_clause_exception", - rule_id => RuleID, reason => Error}); - _:{select_and_collect_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), - ?SLOG(warning, #{msg => "FOREACH_clause_exception", - rule_id => RuleID, reason => Error}); - _:{match_incase_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), - ?SLOG(warning, #{msg => "INCASE_clause_exception", - rule_id => RuleID, reason => Error}); - Class:Error:StkTrace -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), - ?SLOG(error, #{msg => "apply_rule_failed", - rule_id => RuleID, - exception => Class, - reason => Error, - stacktrace => StkTrace - }) - end, +apply_rules([Rule|More], Input) -> + apply_rule_discard_result(Rule, Input), apply_rules(More, Input). apply_rule_discard_result(Rule, Input) -> @@ -88,7 +61,39 @@ apply_rule_discard_result(Rule, Input) -> apply_rule(Rule = #{id := RuleID}, Input) -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'), clear_rule_payload(), - do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). + try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) + catch + %% ignore the errors if select or match failed + _:Reason = {select_and_transform_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ?SLOG(warning, #{msg => "SELECT_clause_exception", + rule_id => RuleID, reason => Error}), + {error, Reason}; + _:Reason = {match_conditions_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ?SLOG(warning, #{msg => "WHERE_clause_exception", + rule_id => RuleID, reason => Error}), + {error, Reason}; + _:Reason = {select_and_collect_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ?SLOG(warning, #{msg => "FOREACH_clause_exception", + rule_id => RuleID, reason => Error}), + {error, Reason}; + _:Reason = {match_incase_error, Error} -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ?SLOG(warning, #{msg => "INCASE_clause_exception", + rule_id => RuleID, reason => Error}), + {error, Reason}; + Class:Error:StkTrace -> + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ?SLOG(error, #{msg => "apply_rule_failed", + rule_id => RuleID, + exception => Class, + reason => Error, + stacktrace => StkTrace + }), + {error, {Error, StkTrace}} + end. do_apply_rule(#{ id := RuleId, @@ -413,7 +418,7 @@ cache_payload(DecodedP) -> safe_decode_and_cache(MaybeJson) -> try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) - catch _:_ -> #{} + catch _:_ -> error({decode_json_failed, MaybeJson}) end. ensure_list(List) when is_list(List) -> List; diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index fc6c1a5ad..a791863f6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -64,7 +64,7 @@ test_rule(Sql, Select, Context, EventTopics) -> emqx_rule_runtime:apply_rule(Rule, FullContext) of {ok, Data} -> {ok, flatten(Data)}; - {error, nomatch} -> {error, nomatch} + {error, Reason} -> {error, Reason} after ok = emqx_rule_engine:clear_metrics_for_rule(RuleId) end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 7700e305d..ce67d3cf5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -88,7 +88,8 @@ groups() -> t_sqlparse_array_range_2, t_sqlparse_true_false, t_sqlparse_undefined_variable, - t_sqlparse_new_map + t_sqlparse_new_map, + t_sqlparse_invalid_json ]}, {events, [], [t_events @@ -1182,7 +1183,7 @@ t_sqlparse_array_range_1(_Config) -> Sql02 = "select " " payload.a[1..4] as c " "from \"t/#\" ", - ?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}}, + ?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}}, emqx_rule_sqltester:test( #{sql => Sql02, context => @@ -1341,6 +1342,28 @@ t_sqlparse_nested_get(_Config) -> payload => <<"{\"a\": {\"b\": 0}}">> }})). +t_sqlparse_invalid_json(_Config) -> + Sql02 = "select " + " payload.a[1..4] as c " + "from \"t/#\" ", + ?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{sql => Sql02, + context => + #{payload => <<"{\"x\":[0,1,2,3,}">>, + topic => <<"t/a">>}})), + + + Sql2 = "foreach payload.sensors " + "do item.cmd as msg_type " + "from \"t/#\" ", + ?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{sql => Sql2, + context => + #{payload => + <<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>, + topic => <<"t/a">>}})). %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------