diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index b502fa039..fba24c6fb 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -3,6 +3,7 @@ ##==================================================================== rule_engine { ignore_sys_message = true + jq_function_default_timeout = 10s #rules.my_republish_rule { # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" # enable = true diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index db469ba82..1efcf9ea2 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -239,6 +239,17 @@ of the rule, then the string "undefined" is used. } } + rule_engine_jq_function_default_timeout { + desc { + en: "Default timeout for the jq/2 rule engine function" + zh: "Default timeout for the jq/2 rule engine function" + } + label: { + en: "Rule engine jq function default timeout" + zh: "Rule engine jq function default timeout" + } + } + desc_rule_engine { desc { en: """Configuration for the EMQX Rule Engine.""" diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index eed257b27..43c8e194b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -19,8 +19,16 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-type duration_ms() :: integer(). + +-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). + -behaviour(hocon_schema). +-reflect_type([ + duration_ms/0 +]). + -export([ namespace/0, roots/0, @@ -30,6 +38,11 @@ -export([validate_sql/1, validate_rule_name/1]). +% workaround: prevent being recognized as unused functions +-export([ + to_duration_ms/1 +]). + namespace() -> rule_engine. roots() -> ["rule_engine"]. @@ -41,7 +54,15 @@ fields("rule_engine") -> {rules, sc(hoconsc:map("id", ref("rules")), #{ desc => ?DESC("rule_engine_rules"), default => #{} - })} + })}, + {jq_function_default_timeout, + sc( + duration_ms(), + #{ + default => "10s", + desc => ?DESC("rule_engine_jq_function_default_timeout") + } + )} ]; fields("rules") -> [ @@ -218,3 +239,20 @@ validate_sql(Sql) -> sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). + +-spec ceiling(number()) -> integer(). +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + +-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when + Input :: string() | binary(). +to_duration_ms(Str) -> + case hocon_postprocess:duration(Str) of + I when is_number(I) -> {ok, ceiling(I)}; + _ -> {error, Str} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 1e87458e4..e7c1095dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -148,7 +148,8 @@ ascii/1, find/2, find/3, - jq/2 + jq/2, + jq/3 ]). %% Map Funcs @@ -780,22 +781,38 @@ find_s(S, P, Dir) -> SubStr -> SubStr end. --spec jq(FilterProgram, JSON) -> Result when +-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when FilterProgram :: binary(), JSON :: binary() | term(), + TimeoutMS :: non_neg_integer(), Result :: [term()]. -jq(FilterProgram, JSONBin) when +jq(FilterProgram, JSONBin, TimeoutMS) when is_binary(FilterProgram), is_binary(JSONBin) -> - case jq:process_json(FilterProgram, JSONBin) of + case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of {ok, Result} -> [json_decode(JSONString) || JSONString <- Result]; {error, ErrorReason} -> erlang:throw({jq_exception, ErrorReason}) end; -jq(FilterProgram, JSONTerm) when is_binary(FilterProgram) -> +jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) -> JSONBin = json_encode(JSONTerm), - jq(FilterProgram, JSONBin). + jq(FilterProgram, JSONBin, TimeoutMS). + +-spec jq(FilterProgram, JSON) -> Result when + FilterProgram :: binary(), + JSON :: binary() | term(), + Result :: [term()]. +jq(FilterProgram, JSONBin) -> + ConfigRootKey = emqx_rule_engine_schema:namespace(), + jq( + FilterProgram, + JSONBin, + emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]) + ). %%------------------------------------------------------------------------------ %% Array Funcs diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 5b27e3efa..4c8a7ead0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -28,6 +28,14 @@ -define(PROPTEST(F), ?assert(proper:quickcheck(F()))). %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))). +init_per_suite(Config) -> + application:load(emqx_conf), + ConfigConf = <<"rule_engine {jq_function_default_timeout {}}">>, + ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf), + Config. + +end_per_suite(_Config) -> + ok. %%------------------------------------------------------------------------------ %% Test cases for IoT Funcs %%------------------------------------------------------------------------------ @@ -645,7 +653,46 @@ t_jq(_) -> ?assertEqual( jq_1_elm_res("{\"b\":2}"), apply_func(jq, [<<".">>, <<"{\"b\": 2}">>]) - ). + ), + %% Expicitly set timeout + ?assertEqual( + jq_1_elm_res("{\"b\":2}"), + apply_func(jq, [<<".">>, <<"{\"b\": 2}">>, 10000]) + ), + TOProgram = erlang:iolist_to_binary( + "def while(cond; update):" + " def _while:" + " if cond then (update | _while) else . end;" + " _while;" + "while(. < 42; . * 2)" + ), + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>, 10]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end, + ConfigRootKey = emqx_rule_engine_schema:namespace(), + DefaultTimeOut = emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]), + case DefaultTimeOut =< 15000 of + true -> + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end; + false -> + %% Skip test as we don't want it to take to long time to run + ok + end. ascii_string() -> list(range(0, 127)). diff --git a/mix.exs b/mix.exs index bb0960f60..9a0c88696 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.2.2", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.0-beta.1", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index a597a8749..af2e402f2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.2.2"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0-beta.1"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config),