From 05032467bd1e16b30dcd628c37bdf0fdc9b2cacb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 17 May 2022 11:46:00 +0200 Subject: [PATCH] feat(rule_engine): default timeout for jq/2 and jq/3 with timeout This commit adds a default timeout of 10 seconds to the rule engine's `jq/2` function, and adds a new function `jq/3` (where the last parameter is a timeout value). The default timeout can be configured with the setting "rule_engine.jq_function_default_timeout". Having a timeout when executing jq code in the rule engine is important as jq code can potentially run forever. Also, the Erlang jq library limits the number of jq programs that can execute concurrently so a jq program that loops forever could potentially also prevent a "non-buggy" jq program from ever starting. --- .../etc/emqx_rule_engine.conf | 1 + .../i18n/emqx_rule_engine_schema.conf | 11 +++++ .../src/emqx_rule_engine_schema.erl | 40 ++++++++++++++- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 29 ++++++++--- .../test/emqx_rule_funcs_SUITE.erl | 49 ++++++++++++++++++- mix.exs | 2 +- rebar.config.erl | 2 +- 7 files changed, 124 insertions(+), 10 deletions(-) diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index b502fa039..fba24c6fb 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -3,6 +3,7 @@ ##==================================================================== rule_engine { ignore_sys_message = true + jq_function_default_timeout = 10s #rules.my_republish_rule { # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" # enable = true diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index db469ba82..1efcf9ea2 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -239,6 +239,17 @@ of the rule, then the string "undefined" is used. } } + rule_engine_jq_function_default_timeout { + desc { + en: "Default timeout for the jq/2 rule engine function" + zh: "Default timeout for the jq/2 rule engine function" + } + label: { + en: "Rule engine jq function default timeout" + zh: "Rule engine jq function default timeout" + } + } + desc_rule_engine { desc { en: """Configuration for the EMQX Rule Engine.""" diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index eed257b27..43c8e194b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -19,8 +19,16 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-type duration_ms() :: integer(). + +-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). + -behaviour(hocon_schema). +-reflect_type([ + duration_ms/0 +]). + -export([ namespace/0, roots/0, @@ -30,6 +38,11 @@ -export([validate_sql/1, validate_rule_name/1]). +% workaround: prevent being recognized as unused functions +-export([ + to_duration_ms/1 +]). + namespace() -> rule_engine. roots() -> ["rule_engine"]. @@ -41,7 +54,15 @@ fields("rule_engine") -> {rules, sc(hoconsc:map("id", ref("rules")), #{ desc => ?DESC("rule_engine_rules"), default => #{} - })} + })}, + {jq_function_default_timeout, + sc( + duration_ms(), + #{ + default => "10s", + desc => ?DESC("rule_engine_jq_function_default_timeout") + } + )} ]; fields("rules") -> [ @@ -218,3 +239,20 @@ validate_sql(Sql) -> sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). + +-spec ceiling(number()) -> integer(). +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + +-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when + Input :: string() | binary(). +to_duration_ms(Str) -> + case hocon_postprocess:duration(Str) of + I when is_number(I) -> {ok, ceiling(I)}; + _ -> {error, Str} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 1e87458e4..e7c1095dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -148,7 +148,8 @@ ascii/1, find/2, find/3, - jq/2 + jq/2, + jq/3 ]). %% Map Funcs @@ -780,22 +781,38 @@ find_s(S, P, Dir) -> SubStr -> SubStr end. --spec jq(FilterProgram, JSON) -> Result when +-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when FilterProgram :: binary(), JSON :: binary() | term(), + TimeoutMS :: non_neg_integer(), Result :: [term()]. -jq(FilterProgram, JSONBin) when +jq(FilterProgram, JSONBin, TimeoutMS) when is_binary(FilterProgram), is_binary(JSONBin) -> - case jq:process_json(FilterProgram, JSONBin) of + case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of {ok, Result} -> [json_decode(JSONString) || JSONString <- Result]; {error, ErrorReason} -> erlang:throw({jq_exception, ErrorReason}) end; -jq(FilterProgram, JSONTerm) when is_binary(FilterProgram) -> +jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) -> JSONBin = json_encode(JSONTerm), - jq(FilterProgram, JSONBin). + jq(FilterProgram, JSONBin, TimeoutMS). + +-spec jq(FilterProgram, JSON) -> Result when + FilterProgram :: binary(), + JSON :: binary() | term(), + Result :: [term()]. +jq(FilterProgram, JSONBin) -> + ConfigRootKey = emqx_rule_engine_schema:namespace(), + jq( + FilterProgram, + JSONBin, + emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]) + ). %%------------------------------------------------------------------------------ %% Array Funcs diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 5b27e3efa..4c8a7ead0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -28,6 +28,14 @@ -define(PROPTEST(F), ?assert(proper:quickcheck(F()))). %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))). +init_per_suite(Config) -> + application:load(emqx_conf), + ConfigConf = <<"rule_engine {jq_function_default_timeout {}}">>, + ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf), + Config. + +end_per_suite(_Config) -> + ok. %%------------------------------------------------------------------------------ %% Test cases for IoT Funcs %%------------------------------------------------------------------------------ @@ -645,7 +653,46 @@ t_jq(_) -> ?assertEqual( jq_1_elm_res("{\"b\":2}"), apply_func(jq, [<<".">>, <<"{\"b\": 2}">>]) - ). + ), + %% Expicitly set timeout + ?assertEqual( + jq_1_elm_res("{\"b\":2}"), + apply_func(jq, [<<".">>, <<"{\"b\": 2}">>, 10000]) + ), + TOProgram = erlang:iolist_to_binary( + "def while(cond; update):" + " def _while:" + " if cond then (update | _while) else . end;" + " _while;" + "while(. < 42; . * 2)" + ), + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>, 10]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end, + ConfigRootKey = emqx_rule_engine_schema:namespace(), + DefaultTimeOut = emqx_config:get([ + ConfigRootKey, + jq_function_default_timeout + ]), + case DefaultTimeOut =< 15000 of + true -> + got_timeout = + try + apply_func(jq, [TOProgram, <<"-2">>]) + catch + throw:{jq_exception, {timeout, _}} -> + %% Got timeout as expected + got_timeout + end; + false -> + %% Skip test as we don't want it to take to long time to run + ok + end. ascii_string() -> list(range(0, 127)). diff --git a/mix.exs b/mix.exs index bb0960f60..9a0c88696 100644 --- a/mix.exs +++ b/mix.exs @@ -611,7 +611,7 @@ defmodule EMQXUmbrella.MixProject do defp jq_dep() do if enable_jq?(), - do: [{:jq, github: "emqx/jq", tag: "v0.2.2", override: true}], + do: [{:jq, github: "emqx/jq", tag: "v0.3.0-beta.1", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index a597a8749..af2e402f2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -41,7 +41,7 @@ quicer() -> {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. jq() -> - {jq, {git, "https://github.com/emqx/jq", {tag, "v0.2.2"}}}. + {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.0-beta.1"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config),