diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 99ff74d8f..0d4d40453 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -18,6 +18,7 @@ File format: ExHook now supports user customizing emqx_hook execute priority. * add api: PUT /rules/{id}/reset_metrics. This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474] +* Enhanced rule engine error handling when json parsing error. ### Bug fixes diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 68cf3993f..c836c420d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,9 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.8", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -10,7 +12,9 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -18,7 +22,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, @@ -27,7 +32,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -36,7 +42,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -45,7 +52,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -55,7 +63,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -66,7 +75,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -77,7 +87,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -89,7 +100,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.8", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -97,7 +110,9 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -105,7 +120,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, @@ -114,7 +130,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -123,7 +140,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -132,7 +150,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -142,7 +161,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -153,7 +173,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -164,7 +185,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 35121c046..2912bd7b3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -50,31 +50,8 @@ apply_rules([], _Input) -> ok; apply_rules([#rule{enabled = false}|More], Input) -> apply_rules(More, Input); -apply_rules([Rule = #rule{id = RuleID}|More], Input) -> - try apply_rule_discard_result(Rule, Input) - catch - %% ignore the errors if select or match failed - _:{select_and_transform_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "SELECT clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{match_conditions_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "WHERE clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{select_and_collect_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{match_incase_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "INCASE clause exception for ~s failed: ~p", - [RuleID, Error]); - _:Error:StkTrace -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", - [RuleID, Error, StkTrace]) - end, +apply_rules([Rule|More], Input) -> + apply_rule_discard_result(Rule, Input), apply_rules(More, Input). apply_rule_discard_result(Rule, Input) -> @@ -84,7 +61,35 @@ apply_rule_discard_result(Rule, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), ok = emqx_rule_metrics:inc_rules_matched(RuleID), - do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). + try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) + catch + %% ignore the errors if select or match failed + _:Reason = {select_and_transform_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "SELECT clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {match_conditions_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "WHERE clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {select_and_collect_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {match_incase_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "INCASE clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Error:StkTrace -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", + [RuleID, Error, StkTrace]), + {error, {Error, StkTrace}} + end. do_apply_rule(#rule{id = RuleId, is_foreach = true, @@ -452,7 +457,8 @@ cache_payload(DecodedP) -> safe_decode_and_cache(MaybeJson) -> try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) - catch _:_ -> #{} + catch + _:_:_-> error({decode_json_failed, MaybeJson}) end. ensure_list(List) when is_list(List) -> List; diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 760205d62..26da4cead 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -77,7 +77,7 @@ test_rule(Sql, Select, Context, EventTopics) -> R of {ok, Data} -> {ok, flatten(Data)}; - {error, nomatch} -> {error, nomatch} + {error, Reason} -> {error, Reason} after ok = emqx_rule_registry:remove_action_instance_params(ActInstId) end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 1c11d756f..96480bfc5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -125,7 +125,8 @@ groups() -> t_sqlparse_array_range_1, t_sqlparse_array_range_2, t_sqlparse_true_false, - t_sqlparse_new_map + t_sqlparse_new_map, + t_sqlparse_invalid_json ]}, {rule_metrics, [], [t_metrics, @@ -2308,12 +2309,13 @@ t_sqlparse_array_range_1(_Config) -> Sql02 = "select " " payload.a[1..4] as c " "from \"t/#\" ", - ?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}}, + ?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}}, emqx_rule_sqltester:test( #{<<"rawsql">> => Sql02, <<"ctx">> => #{<<"payload">> => <<"{\"x\":[0,1,2,3,4,5]}">>, <<"topic">> => <<"t/a">>}})), + %% construct a range: Sql1 = "select " " [1..4] as c, " @@ -2440,6 +2442,29 @@ t_sqlparse_nested_get(_Config) -> <<"payload">> => <<"{\"a\": {\"b\": 0}}">> }})). +t_sqlparse_invalid_json(_Config) -> + Sql02 = "select " + " payload.a[1..4] as c " + "from \"t/#\" ", + ?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql02, + <<"ctx">> => + #{<<"payload">> => <<"{\"x\":[0,1,2,3,}">>, + <<"topic">> => <<"t/a">>}})), + + + Sql2 = "foreach payload.sensors " + "do item.cmd as msg_type " + "from \"t/#\" ", + ?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql2, + <<"ctx">> => + #{<<"payload">> => + <<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>, + <<"topic">> => <<"t/a">>}})). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------