From 05032467bd1e16b30dcd628c37bdf0fdc9b2cacb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 17 May 2022 11:46:00 +0200 Subject: [PATCH 1/5] feat(rule_engine): default timeout for jq/2 and jq/3 with timeout This commit adds a default timeout of 10 seconds to the rule engine's `jq/2` function, and adds a new function `jq/3` (where the last parameter is a timeout value). The default timeout can be configured with the setting "rule_engine.jq_function_default_timeout". Having a timeout when executing jq code in the rule engine is important as jq code can potentially run forever. Also, the Erlang jq library limits the number of jq programs that can execute concurrently so a jq program that loops forever could potentially also prevent a "non-buggy" jq program from ever starting. --- .../etc/emqx_rule_engine.conf | 1 + .../i18n/emqx_rule_engine_schema.conf | 11 +++++ .../src/emqx_rule_engine_schema.erl | 40 ++++++++++++++- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 29 ++++++++--- .../test/emqx_rule_funcs_SUITE.erl | 49 ++++++++++++++++++- mix.exs | 2 +- rebar.config.erl | 2 +- 7 files changed, 124 insertions(+), 10 deletions(-) diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index b502fa039..fba24c6fb 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -3,6 +3,7 @@ ##==================================================================== rule_engine { ignore_sys_message = true + jq_function_default_timeout = 10s #rules.my_republish_rule { # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" # enable = true diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index db469ba82..1efcf9ea2 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -239,6 +239,17 @@ of the rule, then the string "undefined" is used. } } + rule_engine_jq_function_default_timeout { + desc { + en: "Default timeout for the jq/2 rule engine function" + zh: "Default timeout for the jq/2 rule engine function" + } + label: { + en: "Rule engine jq function default timeout" + zh: "Rule engine jq function default timeout" + } + } + desc_rule_engine { desc { en: """Configuration for the EMQX Rule Engine.""" diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index eed257b27..43c8e194b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -19,8 +19,16 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-type duration_ms() :: integer(). + +-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). + -behaviour(hocon_schema). +-reflect_type([ + duration_ms/0 +]). + -export([ namespace/0, roots/0, @@ -30,6 +38,11 @@ -export([validate_sql/1, validate_rule_name/1]). +% workaround: prevent being recognized as unused functions +-export([ + to_duration_ms/1 +]). + namespace() -> rule_engine. roots() -> ["rule_engine"]. @@ -41,7 +54,15 @@ fields("rule_engine") -> {rules, sc(hoconsc:map("id", ref("rules")), #{ desc => ?DESC("rule_engine_rules"), default => #{} - })} + })}, + {jq_function_default_timeout, + sc( + duration_ms(), + #{ + default => "10s", + desc => ?DESC("rule_engine_jq_function_default_timeout") + } + )} ]; fields("rules") -> [ @@ -218,3 +239,20 @@ validate_sql(Sql) -> sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). + +-spec ceiling(number()) -> integer(). +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + +-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when + Input :: string() | binary(). +to_duration_ms(Str) -> + case hocon_postprocess:duration(Str) of + I when is_number(I) -> {ok, ceiling(I)}; + _ -> {error, Str} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 1e87458e4..e7c1095dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -148,7 +148,8 @@ ascii/1, find/2, find/3, - jq/2 + jq/2, + jq/3 ]). %% Map Funcs @@ -780,22 +781,38 @@ find_s(S, P, Dir) -> SubStr -> SubStr end. --spec jq(FilterProgram, JSON) -> Result when +-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when FilterProgram :: binary(), JSON :: binary() | term(), + TimeoutMS :: non_neg_integer(), Result :: [term()]. -jq(FilterProgram, JSONBin) when +jq(FilterProgram, JSONBin, TimeoutMS) when is_binary(FilterProgram), is_binary(JSONBin) -> - case jq:process_json(FilterProgram, JSONBin) of + case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of {ok, Result} -> [json_decode(JSONString) || JSONString <- Result]; {error, ErrorReason} -> erlang:throw({jq_exception, ErrorReason}) end; -jq(FilterProgram, JSONTerm) when is_binary(FilterProgram) -> +jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) -> JSONBin = json_encode(JSONTerm), - jq(FilterProgram, JSONBin). + jq(FilterProgram, JSONBin, TimeoutMS). + +-spec jq(FilterProgram, JSON) -> Result when + FilterProgram :: binary(), + JSON :: binary() | term(), + Result :: [term()]. +jq(FilterProgram, JSONBin) -> + ConfigRootKey = emqx_rule_engine_schema:namespace(), + jq( + FilterProgram, + JSONBin, + emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]) + ). %%------------------------------------------------------------------------------ %% Array Funcs diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 5b27e3efa..4c8a7ead0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -28,6 +28,14 @@ -define(PROPTEST(F), ?assert(proper:quickcheck(F()))). %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))). +init_per_suite(Config) -> + application:load(emqx_conf), + ConfigConf = <<"rule_engine {jq_function_default_timeout {}}">>, + ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf), + Config. + +end_per_suite(_Config) -> + ok. %%------------------------------------------------------------------------------ %% Test cases for IoT Funcs %%------------------------------------------------------------------------------ @@ -645,7 +653,46 @@ t_jq(_) -> ?assertEqual( jq_1_elm_res("{\"b\":2}"), apply_func(jq, [<<".">>, <<"{\"b\": 2}">>]) - ). + ), + %% Expicitly set timeout + ?assertEqual( + jq_1_elm_res("{\"b\":2}"), + apply_func(jq, [<<".">>, <<"{\"b\": 2}">>, 10000]) + ), + TOProgram = erlang:iolist_to_binary( + "def while(cond; update):" + " def _while:" + " if cond then (update | _while) else . end;" + " _while;" + "while(. < 42; . * 2)" + ), + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>, 10]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end, + ConfigRootKey = emqx_rule_engine_schema:namespace(), + DefaultTimeOut = emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]), + case DefaultTimeOut =< 15000 of + true -> + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end; + false -> + %% Skip test as we don't want it to take to long time to run + ok + end. ascii_string() -> list(range(0, 127)). diff --git a/mix.exs b/mix.exs index bb0960f60..9a0c88696 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.2.2", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.0-beta.1", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index a597a8749..af2e402f2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.2.2"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0-beta.1"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config), From 402ab7e759f89956116a9a0e713879c75a7bdcc2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 18 May 2022 06:45:40 +0200 Subject: [PATCH 2/5] chore: remove unnecessary repeated code Co-authored-by: Zaiming (Stone) Shi --- .../src/emqx_rule_engine_schema.erl | 32 +------------------ mix.exs | 2 +- rebar.config.erl | 2 +- 3 files changed, 3 insertions(+), 33 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 43c8e194b..1db1382a7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -19,16 +19,8 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --type duration_ms() :: integer(). - --typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). - -behaviour(hocon_schema). --reflect_type([ - duration_ms/0 -]). - -export([ namespace/0, roots/0, @@ -38,11 +30,6 @@ -export([validate_sql/1, validate_rule_name/1]). -% workaround: prevent being recognized as unused functions --export([ - to_duration_ms/1 -]). - namespace() -> rule_engine. roots() -> ["rule_engine"]. @@ -57,7 +44,7 @@ fields("rule_engine") -> })}, {jq_function_default_timeout, sc( - duration_ms(), + emqx_schema:duration_ms(), #{ default => "10s", desc => ?DESC("rule_engine_jq_function_default_timeout") @@ -239,20 +226,3 @@ validate_sql(Sql) -> sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). - --spec ceiling(number()) -> integer(). -ceiling(X) -> - T = erlang:trunc(X), - case (X - T) of - Neg when Neg < 0 -> T; - Pos when Pos > 0 -> T + 1; - _ -> T - end. - --spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when - Input :: string() | binary(). -to_duration_ms(Str) -> - case hocon_postprocess:duration(Str) of - I when is_number(I) -> {ok, ceiling(I)}; - _ -> {error, Str} - end. diff --git a/mix.exs b/mix.exs index 9a0c88696..a55954211 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.3.0-beta.1", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.0", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index af2e402f2..b83d41e3e 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0-beta.1"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config), From efd6461e154c338cf55b4d9899a5e0cb3f65deca Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sat, 21 May 2022 07:01:26 +0200 Subject: [PATCH 3/5] docs: chinese documentation for jq timeout configuration Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index 1efcf9ea2..01d42f9ce 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -241,12 +241,12 @@ of the rule, then the string "undefined" is used. rule_engine_jq_function_default_timeout { desc { - en: "Default timeout for the jq/2 rule engine function" - zh: "Default timeout for the jq/2 rule engine function" + en: "Default timeout for the `jq` rule engine function" + zh: “规则引擎内建函数 `jq` 默认时间限制” } label: { en: "Rule engine jq function default timeout" - zh: "Rule engine jq function default timeout" + zh: "规则引擎 jq 函数时间限制" } } From 7bc2d9edbb6493c2a97dcc1f24cae35faf81d047 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 21 May 2022 14:17:12 +0800 Subject: [PATCH 4/5] fix: remove bison from the required tools when building emqx --- apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf | 2 +- mix.exs | 2 +- rebar.config.erl | 2 +- scripts/prepare-build-deps.sh | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index 01d42f9ce..ae2b821c2 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -242,7 +242,7 @@ of the rule, then the string "undefined" is used. rule_engine_jq_function_default_timeout { desc { en: "Default timeout for the `jq` rule engine function" - zh: “规则引擎内建函数 `jq` 默认时间限制” + zh: "规则引擎内建函数 `jq` 默认时间限制" } label: { en: "Rule engine jq function default timeout" diff --git a/mix.exs b/mix.exs index a55954211..1b1b15264 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.3.0", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.1", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index b83d41e3e..e3a344282 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.1"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config), diff --git a/scripts/prepare-build-deps.sh b/scripts/prepare-build-deps.sh index 3c1048a27..b54611326 100755 --- a/scripts/prepare-build-deps.sh +++ b/scripts/prepare-build-deps.sh @@ -4,7 +4,7 @@ set -euo pipefail AUTO_INSTALL_BUILD_DEPS="${AUTO_INSTALL_BUILD_DEPS:-0}" -required_packages_mac_osx="freetds unixodbc bison" +required_packages_mac_osx="freetds unixodbc" required_cmds_mac_osx="curl zip unzip autoconf automake cmake openssl" dependency_missing() { From 86508429946562c2324952df70c502e28db72f5c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 22 May 2022 14:14:02 +0800 Subject: [PATCH 5/5] fix: pin jq v0.3.2 --- mix.exs | 2 +- rebar.config.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 1b1b15264..7197913d5 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.3.1", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.2", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index e3a344282..c7ef3ef2c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.1"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.2"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config),