diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 23c2aab50..c9926f56f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -26,7 +26,7 @@ -export([roots/0, fields/1]). --type tag() :: rule_creation | rule_test. +-type tag() :: rule_creation | rule_test | rule_engine. -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}. check_params(Params, Tag) -> @@ -48,12 +48,15 @@ check_params(Params, Tag) -> roots() -> [ + {"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})}, {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})}, {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})}, {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})}, {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})} ]. +fields("rule_engine") -> + emqx_rule_engine_schema:rule_engine_settings(); fields("rule_creation") -> emqx_rule_engine_schema:fields("rules"); fields("rule_info") -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 106693a0a..f640f8303 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -32,6 +32,7 @@ %% API callbacks -export([ + '/rule_engine'/2, '/rule_events'/2, '/rule_test'/2, '/rules'/2, @@ -41,7 +42,7 @@ ]). %% query callback --export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]). +-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]). -define(ERR_BADARGS(REASON), begin R0 = err_msg(REASON), @@ -134,6 +135,7 @@ api_spec() -> paths() -> [ + "/rule_engine", "/rule_events", "/rule_test", "/rules", @@ -145,6 +147,9 @@ paths() -> error_schema(Code, Message) when is_atom(Code) -> emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)). +rule_engine_schema() -> + ref(emqx_rule_api_schema, "rule_engine"). + rule_creation_schema() -> ref(emqx_rule_api_schema, "rule_creation"). @@ -184,7 +189,7 @@ schema("/rules") -> responses => #{ 200 => [ - {data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})}, + {data, mk(array(rule_info_schema()), #{desc => ?DESC("api1_resp")})}, {meta, mk(ref(emqx_dashboard_swagger, meta), #{})} ], 400 => error_schema('BAD_REQUEST', "Invalid Parameters") @@ -289,6 +294,26 @@ schema("/rule_test") -> 200 => <<"Rule Test Pass">> } } + }; +schema("/rule_engine") -> + #{ + 'operationId' => '/rule_engine', + get => #{ + tags => [<<"rules">>], + description => ?DESC("api9"), + responses => #{ + 200 => rule_engine_schema() + } + }, + put => #{ + tags => [<<"rules">>], + description => ?DESC("api10"), + 'requestBody' => rule_engine_schema(), + responses => #{ + 200 => rule_engine_schema(), + 400 => error_schema('BAD_REQUEST', "Invalid request") + } + } }. param_path_id() -> @@ -309,7 +334,7 @@ param_path_id() -> QueryString, ?RULE_QS_SCHEMA, fun ?MODULE:qs2ms/2, - fun ?MODULE:format_rule_resp/1 + fun ?MODULE:format_rule_info_resp/1 ) of {error, page_limit_invalid} -> @@ -331,7 +356,7 @@ param_path_id() -> case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), - {201, format_rule_resp(Rule)}; + {201, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ msg => "create_rule_failed", @@ -362,7 +387,7 @@ param_path_id() -> '/rules/:id'(get, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> - {200, format_rule_resp(Rule)}; + {200, format_rule_info_resp(Rule)}; not_found -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end; @@ -372,7 +397,7 @@ param_path_id() -> case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), - {200, format_rule_resp(Rule)}; + {200, format_rule_info_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{ msg => "update_rule_failed", @@ -419,6 +444,16 @@ param_path_id() -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end. +'/rule_engine'(get, _Params) -> + {200, format_rule_engine_resp(emqx_conf:get([rule_engine]))}; +'/rule_engine'(put, #{body := Params}) -> + case rule_engine_update(Params) of + {ok, Config} -> + {200, format_rule_engine_resp(Config)}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ @@ -440,11 +475,9 @@ encode_nested_error(RuleError, Reason) -> {RuleError, Reason} end. -format_rule_resp(Rules) when is_list(Rules) -> - [format_rule_resp(R) || R <- Rules]; -format_rule_resp({Id, Rule}) -> - format_rule_resp(Rule#{id => Id}); -format_rule_resp(#{ +format_rule_info_resp({Id, Rule}) -> + format_rule_info_resp(Rule#{id => Id}); +format_rule_info_resp(#{ id := Id, name := Name, created_at := CreatedAt, @@ -465,6 +498,9 @@ format_rule_resp(#{ description => Descr }. +format_rule_engine_resp(Config) -> + maps:remove(rules, Config). + format_datetime(Timestamp, Unit) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])). @@ -661,3 +697,14 @@ run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) - run_fuzzy_match(E, Fuzzy); run_fuzzy_match(E, [_ | Fuzzy]) -> run_fuzzy_match(E, Fuzzy). + +rule_engine_update(Params) -> + case emqx_rule_api_schema:check_params(Params, rule_engine) of + {ok, _CheckedParams} -> + {ok, #{config := Config}} = emqx_conf:update([rule_engine], Params, #{ + override_to => cluster + }), + {ok, Config}; + {error, Reason} -> + {error, Reason} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 2281eea53..5b205f355 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -27,7 +27,8 @@ roots/0, fields/1, desc/1, - post_config_update/5 + post_config_update/5, + rule_engine_settings/0 ]). -export([validate_sql/1]). @@ -40,31 +41,13 @@ tags() -> roots() -> ["rule_engine"]. fields("rule_engine") -> - [ - {ignore_sys_message, - ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})}, - {rules, - ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{ - desc => ?DESC("rule_engine_rules"), default => #{} - })}, - {jq_function_default_timeout, - ?HOCON( - emqx_schema:duration_ms(), - #{ - default => <<"10s">>, - desc => ?DESC("rule_engine_jq_function_default_timeout") - } - )}, - {jq_implementation_module, - ?HOCON( - hoconsc:enum([jq_nif, jq_port]), - #{ - default => jq_nif, - mapping => "jq.jq_implementation_module", - desc => ?DESC("rule_engine_jq_implementation_module") - } - )} - ]; + rule_engine_settings() ++ + [ + {rules, + ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{ + desc => ?DESC("rule_engine_rules"), default => #{} + })} + ]; fields("rules") -> [ rule_name(), @@ -227,6 +210,31 @@ actions() -> qos() -> ?UNION([emqx_schema:qos(), binary()]). +rule_engine_settings() -> + [ + {ignore_sys_message, + ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})}, + {jq_function_default_timeout, + ?HOCON( + emqx_schema:duration_ms(), + #{ + default => <<"10s">>, + desc => ?DESC("rule_engine_jq_function_default_timeout") + } + )}, + {jq_implementation_module, + ?HOCON( + hoconsc:enum([jq_nif, jq_port]), + #{ + default => jq_nif, + mapping => "jq.jq_implementation_module", + desc => ?DESC("rule_engine_jq_implementation_module"), + deprecated => {since, "v5.0.22"}, + importance => ?IMPORTANCE_HIDDEN + } + )} + ]. + validate_sql(Sql) -> case emqx_rule_sqlparser:parse(Sql) of {ok, _Result} -> ok; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index d89bc2651..e94806a7b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -23,6 +23,14 @@ -include_lib("common_test/include/ct.hrl"). -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>). +-define(SIMPLE_RULE(NAME_SUFFIX), #{ + <<"description">> => <<"A simple rule">>, + <<"enable">> => true, + <<"actions">> => [#{<<"function">> => <<"console">>}], + <<"sql">> => <<"SELECT * from \"t/1\"">>, + <<"name">> => <<"test_rule", NAME_SUFFIX/binary>> +}). +-define(SIMPLE_RULE(ID, NAME_SUFFIX), ?SIMPLE_RULE(NAME_SUFFIX)#{<<"id">> => ID}). all() -> emqx_common_test_helpers:all(?MODULE). @@ -37,6 +45,9 @@ end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), ok. +init_per_testcase(t_crud_rule_api, Config) -> + meck:new(emqx_json, [passthrough]), + init_per_testcase(common, Config); init_per_testcase(_, Config) -> Config. @@ -48,7 +59,7 @@ end_per_testcase(_, _Config) -> emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}), lists:foreach( fun(#{id := Id}) -> - emqx_rule_engine_api:'/rules/:id'( + {204} = emqx_rule_engine_api:'/rules/:id'( delete, #{bindings => #{id => Id}} ) @@ -57,45 +68,38 @@ end_per_testcase(_, _Config) -> ). t_crud_rule_api(_Config) -> - RuleID = <<"my_rule">>, - Params0 = #{ - <<"description">> => <<"A simple rule">>, - <<"enable">> => true, - <<"id">> => RuleID, - <<"actions">> => [#{<<"function">> => <<"console">>}], - <<"sql">> => <<"SELECT * from \"t/1\"">>, - <<"name">> => <<"test_rule">> - }, - {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params0}), - %% if we post again with the same params, it return with 400 "rule id already exists" - ?assertMatch( - {400, #{code := _, message := _Message}}, - emqx_rule_engine_api:'/rules'(post, #{body => Params0}) - ), + RuleId = <<"my_rule">>, + Rule = simple_rule_fixture(RuleId, <<>>), + ?assertEqual(RuleId, maps:get(id, Rule)), - ?assertEqual(RuleID, maps:get(id, Rule)), {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}), ct:pal("RList : ~p", [Rules]), ?assert(length(Rules) > 0), + %% if we post again with the same id, it return with 400 "rule id already exists" + ?assertMatch( + {400, #{code := _, message := _Message}}, + emqx_rule_engine_api:'/rules'(post, #{body => ?SIMPLE_RULE(RuleId, <<"some_other">>)}) + ), + {204} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{ - bindings => #{id => RuleID} + bindings => #{id => RuleId} }), - {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}), + {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}), ct:pal("RShow : ~p", [Rule1]), ?assertEqual(Rule, Rule1), - {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleID}}), + {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleId}}), ct:pal("RMetrics : ~p", [Metrics]), - ?assertMatch(#{id := RuleID, metrics := _, node_metrics := _}, Metrics), + ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics), {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{ - bindings => #{id => RuleID}, - body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>} + bindings => #{id => RuleId}, + body => ?SIMPLE_RULE(RuleId)#{<<"sql">> => <<"select * from \"t/b\"">>} }), - {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}), + {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}), %ct:pal("RShow : ~p", [Rule3]), ?assertEqual(Rule3, Rule2), ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)), @@ -112,14 +116,14 @@ t_crud_rule_api(_Config) -> {204}, emqx_rule_engine_api:'/rules/:id'( delete, - #{bindings => #{id => RuleID}} + #{bindings => #{id => RuleId}} ) ), %ct:pal("Show After Deleted: ~p", [NotFound]), ?assertMatch( {404, #{code := _, message := _Message}}, - emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}) + emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}) ), {400, #{ @@ -174,30 +178,15 @@ t_crud_rule_api(_Config) -> ok. t_list_rule_api(_Config) -> - AddIds = - lists:map( - fun(Seq0) -> - Seq = integer_to_binary(Seq0), - Params = #{ - <<"description">> => <<"A simple rule">>, - <<"enable">> => true, - <<"actions">> => [#{<<"function">> => <<"console">>}], - <<"sql">> => <<"SELECT * from \"t/1\"">>, - <<"name">> => <<"test_rule", Seq/binary>> - }, - {201, #{id := Id}} = emqx_rule_engine_api:'/rules'(post, #{body => Params}), - Id - end, - lists:seq(1, 20) - ), - + AddIds = rules_fixture(20), + ct:pal("rule ids: ~p", [AddIds]), {200, #{data := Rules, meta := #{count := Count}}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}), ?assertEqual(20, length(AddIds)), ?assertEqual(20, length(Rules)), ?assertEqual(20, Count), - [RuleID | _] = AddIds, + [RuleId | _] = AddIds, UpdateParams = #{ <<"description">> => <<"中文的描述也能搜索"/utf8>>, <<"enable">> => false, @@ -206,7 +195,7 @@ t_list_rule_api(_Config) -> <<"name">> => <<"test_rule_update1">> }, {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{ - bindings => #{id => RuleID}, + bindings => #{id => RuleId}, body => UpdateParams }), QueryStr1 = #{query_string => #{<<"enable">> => false}}, @@ -229,20 +218,13 @@ t_list_rule_api(_Config) -> {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5), ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)), - QueryStr6 = #{query_string => #{<<"like_id">> => RuleID}}, + QueryStr6 = #{query_string => #{<<"like_id">> => RuleId}}, {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6), ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)), ok. t_reset_metrics_on_disable(_Config) -> - Params = #{ - <<"description">> => <<"A simple rule">>, - <<"enable">> => true, - <<"actions">> => [#{<<"function">> => <<"console">>}], - <<"sql">> => <<"SELECT * from \"t/1\"">>, - <<"name">> => atom_to_binary(?FUNCTION_NAME) - }, - {201, #{id := RuleId}} = emqx_rule_engine_api:'/rules'(post, #{body => Params}), + #{id := RuleId} = simple_rule_fixture(), %% generate some fake metrics emqx_metrics_worker:inc(rule_metrics, RuleId, 'matched', 10), @@ -256,7 +238,7 @@ t_reset_metrics_on_disable(_Config) -> %% disable the rule; metrics should be reset {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{ bindings => #{id => RuleId}, - body => Params#{<<"enable">> := false} + body => #{<<"enable">> => false} }), {200, #{metrics := Metrics1}} = emqx_rule_engine_api:'/rules/:id/metrics'( @@ -281,3 +263,45 @@ test_rule_params(Sql, Payload) -> <<"sql">> => Sql } }. + +t_rule_engine(_) -> + _ = simple_rule_fixture(), + {200, Config} = emqx_rule_engine_api:'/rule_engine'(get, #{}), + ?assert(not maps:is_key(rules, Config)), + {200, #{ + jq_function_default_timeout := 12000 + % hidden! jq_implementation_module := jq_port + }} = emqx_rule_engine_api:'/rule_engine'(put, #{ + body => #{ + <<"jq_function_default_timeout">> => <<"12s">>, + <<"jq_implementation_module">> => <<"jq_port">> + } + }), + SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>}, + {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{ + body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}} + }), + {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}). + +rules_fixture(N) -> + lists:map( + fun(Seq0) -> + Seq = integer_to_binary(Seq0), + #{id := Id} = simple_rule_fixture(Seq), + Id + end, + lists:seq(1, N) + ). + +simple_rule_fixture() -> + simple_rule_fixture(<<>>). + +simple_rule_fixture(NameSuffix) -> + create_rule(?SIMPLE_RULE(NameSuffix)). + +simple_rule_fixture(Id, NameSuffix) -> + create_rule(?SIMPLE_RULE(Id, NameSuffix)). + +create_rule(Params) -> + {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params}), + Rule. diff --git a/changes/ce/feat-10336.en.md b/changes/ce/feat-10336.en.md new file mode 100644 index 000000000..5e6039f9b --- /dev/null +++ b/changes/ce/feat-10336.en.md @@ -0,0 +1 @@ +Add `/rule_engine` API endpoint to manage configuration of rule engine. diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index f9b344666..0d8253223 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -638,6 +638,17 @@ emqx_rule_api_schema { } } + root_rule_engine { + desc { + en: "Rule engine configurations. This API can be used to change EMQX rule engine settings. But not for the rules. To list, create, or update rules, call the '/rules' API instead." + zh: "规则引擎配置。该 API 可用于查看和修改规则引擎相关的一些设置。但不可用于规则,如需查看或修改规则,请调用 '/rules' API 进行操作。" + } + label: { + en: "Rule engine configuration" + zh: "规则引擎配置" + } + } + root_rule_creation { desc { en: "Schema for creating rules" diff --git a/rel/i18n/emqx_rule_engine_api.hocon b/rel/i18n/emqx_rule_engine_api.hocon index 39fc3186c..8a57f8e31 100644 --- a/rel/i18n/emqx_rule_engine_api.hocon +++ b/rel/i18n/emqx_rule_engine_api.hocon @@ -50,7 +50,16 @@ emqx_rule_engine_api { zh: "根据规则来源 Topic 过滤, 使用 MQTT Topic 匹配" } } - + api1_resp { + desc { + en: "List of rules" + zh: "规则列表" + } + label: { + en: "List Rules" + zh: "列出所有规则" + } + } api2 { desc { en: "Create a new rule using given Id" @@ -113,10 +122,9 @@ emqx_rule_engine_api { } label: { en: "Delete Cluster Rule" - zh: "删除集群规则" + zh: "基于给定 ID 新建一条规则" } } - api7 { desc { en: "Reset a rule metrics" @@ -127,7 +135,6 @@ emqx_rule_engine_api { zh: "重置规则计数" } } - api8 { desc { en: "Test a rule" @@ -138,14 +145,24 @@ emqx_rule_engine_api { zh: "测试规则" } } - desc9 { + api9 { desc { - en: "List of rules" - zh: "列出所有规则" + en: "Get rule engine configuration." + zh: "获取规则引擎配置。" } - label: { - en: "List Rules" - zh: "列出所有规则" + label { + en: "Get configuration" + zh: "获取配置" + } + } + api10 { + desc { + en: "Update rule engine configuration." + zh: "更新规则引擎配置。" + } + label { + en: "Update configuration" + zh: "更新配置" } - } + } }