Merge pull request #8006 from emqx/copy-of-kjell/jq/timeout
feat(rule_engine): default timeout for jq/2 and jq/3 with timeout
This commit is contained in:
commit
866810cea6
|
@ -3,6 +3,7 @@
|
|||
##====================================================================
|
||||
rule_engine {
|
||||
ignore_sys_message = true
|
||||
jq_function_default_timeout = 10s
|
||||
#rules.my_republish_rule {
|
||||
# description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
|
||||
# enable = true
|
||||
|
|
|
@ -239,6 +239,17 @@ of the rule, then the string "undefined" is used.
|
|||
}
|
||||
}
|
||||
|
||||
rule_engine_jq_function_default_timeout {
|
||||
desc {
|
||||
en: "Default timeout for the `jq` rule engine function"
|
||||
zh: "规则引擎内建函数 `jq` 默认时间限制"
|
||||
}
|
||||
label: {
|
||||
en: "Rule engine jq function default timeout"
|
||||
zh: "规则引擎 jq 函数时间限制"
|
||||
}
|
||||
}
|
||||
|
||||
desc_rule_engine {
|
||||
desc {
|
||||
en: """Configuration for the EMQX Rule Engine."""
|
||||
|
|
|
@ -41,7 +41,15 @@ fields("rule_engine") ->
|
|||
{rules,
|
||||
sc(hoconsc:map("id", ref("rules")), #{
|
||||
desc => ?DESC("rule_engine_rules"), default => #{}
|
||||
})}
|
||||
})},
|
||||
{jq_function_default_timeout,
|
||||
sc(
|
||||
emqx_schema:duration_ms(),
|
||||
#{
|
||||
default => "10s",
|
||||
desc => ?DESC("rule_engine_jq_function_default_timeout")
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields("rules") ->
|
||||
[
|
||||
|
|
|
@ -149,7 +149,8 @@
|
|||
ascii/1,
|
||||
find/2,
|
||||
find/3,
|
||||
jq/2
|
||||
jq/2,
|
||||
jq/3
|
||||
]).
|
||||
|
||||
%% Map Funcs
|
||||
|
@ -784,22 +785,38 @@ find_s(S, P, Dir) ->
|
|||
SubStr -> SubStr
|
||||
end.
|
||||
|
||||
-spec jq(FilterProgram, JSON) -> Result when
|
||||
-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when
|
||||
FilterProgram :: binary(),
|
||||
JSON :: binary() | term(),
|
||||
TimeoutMS :: non_neg_integer(),
|
||||
Result :: [term()].
|
||||
jq(FilterProgram, JSONBin) when
|
||||
jq(FilterProgram, JSONBin, TimeoutMS) when
|
||||
is_binary(FilterProgram), is_binary(JSONBin)
|
||||
->
|
||||
case jq:process_json(FilterProgram, JSONBin) of
|
||||
case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of
|
||||
{ok, Result} ->
|
||||
[json_decode(JSONString) || JSONString <- Result];
|
||||
{error, ErrorReason} ->
|
||||
erlang:throw({jq_exception, ErrorReason})
|
||||
end;
|
||||
jq(FilterProgram, JSONTerm) when is_binary(FilterProgram) ->
|
||||
jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) ->
|
||||
JSONBin = json_encode(JSONTerm),
|
||||
jq(FilterProgram, JSONBin).
|
||||
jq(FilterProgram, JSONBin, TimeoutMS).
|
||||
|
||||
-spec jq(FilterProgram, JSON) -> Result when
|
||||
FilterProgram :: binary(),
|
||||
JSON :: binary() | term(),
|
||||
Result :: [term()].
|
||||
jq(FilterProgram, JSONBin) ->
|
||||
ConfigRootKey = emqx_rule_engine_schema:namespace(),
|
||||
jq(
|
||||
FilterProgram,
|
||||
JSONBin,
|
||||
emqx_config:get([
|
||||
ConfigRootKey,
|
||||
jq_function_default_timeout
|
||||
])
|
||||
).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Array Funcs
|
||||
|
|
|
@ -28,6 +28,14 @@
|
|||
-define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
|
||||
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_conf),
|
||||
ConfigConf = <<"rule_engine {jq_function_default_timeout {}}">>,
|
||||
ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Test cases for IoT Funcs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -654,7 +662,46 @@ t_jq(_) ->
|
|||
?assertEqual(
|
||||
jq_1_elm_res("{\"b\":2}"),
|
||||
apply_func(jq, [<<".">>, <<"{\"b\": 2}">>])
|
||||
).
|
||||
),
|
||||
%% Expicitly set timeout
|
||||
?assertEqual(
|
||||
jq_1_elm_res("{\"b\":2}"),
|
||||
apply_func(jq, [<<".">>, <<"{\"b\": 2}">>, 10000])
|
||||
),
|
||||
TOProgram = erlang:iolist_to_binary(
|
||||
"def while(cond; update):"
|
||||
" def _while:"
|
||||
" if cond then (update | _while) else . end;"
|
||||
" _while;"
|
||||
"while(. < 42; . * 2)"
|
||||
),
|
||||
got_timeout =
|
||||
try
|
||||
apply_func(jq, [TOProgram, <<"-2">>, 10])
|
||||
catch
|
||||
throw:{jq_exception, {timeout, _}} ->
|
||||
%% Got timeout as expected
|
||||
got_timeout
|
||||
end,
|
||||
ConfigRootKey = emqx_rule_engine_schema:namespace(),
|
||||
DefaultTimeOut = emqx_config:get([
|
||||
ConfigRootKey,
|
||||
jq_function_default_timeout
|
||||
]),
|
||||
case DefaultTimeOut =< 15000 of
|
||||
true ->
|
||||
got_timeout =
|
||||
try
|
||||
apply_func(jq, [TOProgram, <<"-2">>])
|
||||
catch
|
||||
throw:{jq_exception, {timeout, _}} ->
|
||||
%% Got timeout as expected
|
||||
got_timeout
|
||||
end;
|
||||
false ->
|
||||
%% Skip test as we don't want it to take to long time to run
|
||||
ok
|
||||
end.
|
||||
|
||||
ascii_string() -> list(range(0, 127)).
|
||||
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
|
||||
defp jq_dep() do
|
||||
if enable_jq?(),
|
||||
do: [{:jq, github: "emqx/jq", tag: "v0.2.2", override: true}],
|
||||
do: [{:jq, github: "emqx/jq", tag: "v0.3.2", override: true}],
|
||||
else: []
|
||||
end
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ quicer() ->
|
|||
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}.
|
||||
|
||||
jq() ->
|
||||
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.2.2"}}}.
|
||||
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.2"}}}.
|
||||
|
||||
deps(Config) ->
|
||||
{deps, OldDeps} = lists:keyfind(deps, 1, Config),
|
||||
|
|
|
@ -4,7 +4,7 @@ set -euo pipefail
|
|||
|
||||
AUTO_INSTALL_BUILD_DEPS="${AUTO_INSTALL_BUILD_DEPS:-0}"
|
||||
|
||||
required_packages_mac_osx="freetds unixodbc bison"
|
||||
required_packages_mac_osx="freetds unixodbc"
|
||||
required_cmds_mac_osx="curl zip unzip autoconf automake cmake openssl"
|
||||
|
||||
dependency_missing() {
|
||||
|
|
Loading…
Reference in New Issue