Merge pull request #7523 from EMQ-YangM/enhanced_rule_engine_error_handling
feat: enhanced rule engine error handling when json parsing error
This commit is contained in:
commit
1f00598d19
|
@ -50,35 +50,8 @@ apply_rules([], _Input) ->
|
||||||
ok;
|
ok;
|
||||||
apply_rules([#{enable := false}|More], Input) ->
|
apply_rules([#{enable := false}|More], Input) ->
|
||||||
apply_rules(More, Input);
|
apply_rules(More, Input);
|
||||||
apply_rules([Rule = #{id := RuleID}|More], Input) ->
|
apply_rules([Rule|More], Input) ->
|
||||||
try apply_rule_discard_result(Rule, Input)
|
apply_rule_discard_result(Rule, Input),
|
||||||
catch
|
|
||||||
%% ignore the errors if select or match failed
|
|
||||||
_:{select_and_transform_error, Error} ->
|
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
||||||
?SLOG(warning, #{msg => "SELECT_clause_exception",
|
|
||||||
rule_id => RuleID, reason => Error});
|
|
||||||
_:{match_conditions_error, Error} ->
|
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
||||||
?SLOG(warning, #{msg => "WHERE_clause_exception",
|
|
||||||
rule_id => RuleID, reason => Error});
|
|
||||||
_:{select_and_collect_error, Error} ->
|
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
||||||
?SLOG(warning, #{msg => "FOREACH_clause_exception",
|
|
||||||
rule_id => RuleID, reason => Error});
|
|
||||||
_:{match_incase_error, Error} ->
|
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
||||||
?SLOG(warning, #{msg => "INCASE_clause_exception",
|
|
||||||
rule_id => RuleID, reason => Error});
|
|
||||||
Class:Error:StkTrace ->
|
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
||||||
?SLOG(error, #{msg => "apply_rule_failed",
|
|
||||||
rule_id => RuleID,
|
|
||||||
exception => Class,
|
|
||||||
reason => Error,
|
|
||||||
stacktrace => StkTrace
|
|
||||||
})
|
|
||||||
end,
|
|
||||||
apply_rules(More, Input).
|
apply_rules(More, Input).
|
||||||
|
|
||||||
apply_rule_discard_result(Rule, Input) ->
|
apply_rule_discard_result(Rule, Input) ->
|
||||||
|
@ -88,7 +61,39 @@ apply_rule_discard_result(Rule, Input) ->
|
||||||
apply_rule(Rule = #{id := RuleID}, Input) ->
|
apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
|
||||||
clear_rule_payload(),
|
clear_rule_payload(),
|
||||||
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
|
try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
||||||
|
catch
|
||||||
|
%% ignore the errors if select or match failed
|
||||||
|
_:Reason = {select_and_transform_error, Error} ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
|
?SLOG(warning, #{msg => "SELECT_clause_exception",
|
||||||
|
rule_id => RuleID, reason => Error}),
|
||||||
|
{error, Reason};
|
||||||
|
_:Reason = {match_conditions_error, Error} ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
|
?SLOG(warning, #{msg => "WHERE_clause_exception",
|
||||||
|
rule_id => RuleID, reason => Error}),
|
||||||
|
{error, Reason};
|
||||||
|
_:Reason = {select_and_collect_error, Error} ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
|
?SLOG(warning, #{msg => "FOREACH_clause_exception",
|
||||||
|
rule_id => RuleID, reason => Error}),
|
||||||
|
{error, Reason};
|
||||||
|
_:Reason = {match_incase_error, Error} ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
|
?SLOG(warning, #{msg => "INCASE_clause_exception",
|
||||||
|
rule_id => RuleID, reason => Error}),
|
||||||
|
{error, Reason};
|
||||||
|
Class:Error:StkTrace ->
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
|
?SLOG(error, #{msg => "apply_rule_failed",
|
||||||
|
rule_id => RuleID,
|
||||||
|
exception => Class,
|
||||||
|
reason => Error,
|
||||||
|
stacktrace => StkTrace
|
||||||
|
}),
|
||||||
|
{error, {Error, StkTrace}}
|
||||||
|
end.
|
||||||
|
|
||||||
do_apply_rule(#{
|
do_apply_rule(#{
|
||||||
id := RuleId,
|
id := RuleId,
|
||||||
|
@ -413,7 +418,7 @@ cache_payload(DecodedP) ->
|
||||||
|
|
||||||
safe_decode_and_cache(MaybeJson) ->
|
safe_decode_and_cache(MaybeJson) ->
|
||||||
try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
|
try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
|
||||||
catch _:_ -> #{}
|
catch _:_ -> error({decode_json_failed, MaybeJson})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_list(List) when is_list(List) -> List;
|
ensure_list(List) when is_list(List) -> List;
|
||||||
|
|
|
@ -64,7 +64,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
|
||||||
emqx_rule_runtime:apply_rule(Rule, FullContext)
|
emqx_rule_runtime:apply_rule(Rule, FullContext)
|
||||||
of
|
of
|
||||||
{ok, Data} -> {ok, flatten(Data)};
|
{ok, Data} -> {ok, flatten(Data)};
|
||||||
{error, nomatch} -> {error, nomatch}
|
{error, Reason} -> {error, Reason}
|
||||||
after
|
after
|
||||||
ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
|
ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -88,7 +88,8 @@ groups() ->
|
||||||
t_sqlparse_array_range_2,
|
t_sqlparse_array_range_2,
|
||||||
t_sqlparse_true_false,
|
t_sqlparse_true_false,
|
||||||
t_sqlparse_undefined_variable,
|
t_sqlparse_undefined_variable,
|
||||||
t_sqlparse_new_map
|
t_sqlparse_new_map,
|
||||||
|
t_sqlparse_invalid_json
|
||||||
]},
|
]},
|
||||||
{events, [],
|
{events, [],
|
||||||
[t_events
|
[t_events
|
||||||
|
@ -1182,7 +1183,7 @@ t_sqlparse_array_range_1(_Config) ->
|
||||||
Sql02 = "select "
|
Sql02 = "select "
|
||||||
" payload.a[1..4] as c "
|
" payload.a[1..4] as c "
|
||||||
"from \"t/#\" ",
|
"from \"t/#\" ",
|
||||||
?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}},
|
?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}},
|
||||||
emqx_rule_sqltester:test(
|
emqx_rule_sqltester:test(
|
||||||
#{sql => Sql02,
|
#{sql => Sql02,
|
||||||
context =>
|
context =>
|
||||||
|
@ -1341,6 +1342,28 @@ t_sqlparse_nested_get(_Config) ->
|
||||||
payload => <<"{\"a\": {\"b\": 0}}">>
|
payload => <<"{\"a\": {\"b\": 0}}">>
|
||||||
}})).
|
}})).
|
||||||
|
|
||||||
|
t_sqlparse_invalid_json(_Config) ->
|
||||||
|
Sql02 = "select "
|
||||||
|
" payload.a[1..4] as c "
|
||||||
|
"from \"t/#\" ",
|
||||||
|
?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}},
|
||||||
|
emqx_rule_sqltester:test(
|
||||||
|
#{sql => Sql02,
|
||||||
|
context =>
|
||||||
|
#{payload => <<"{\"x\":[0,1,2,3,}">>,
|
||||||
|
topic => <<"t/a">>}})),
|
||||||
|
|
||||||
|
|
||||||
|
Sql2 = "foreach payload.sensors "
|
||||||
|
"do item.cmd as msg_type "
|
||||||
|
"from \"t/#\" ",
|
||||||
|
?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}},
|
||||||
|
emqx_rule_sqltester:test(
|
||||||
|
#{sql => Sql2,
|
||||||
|
context =>
|
||||||
|
#{payload =>
|
||||||
|
<<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>,
|
||||||
|
topic => <<"t/a">>}})).
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal helpers
|
%% Internal helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue