245 lines
7.1 KiB
Erlang
245 lines
7.1 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_engine_schema).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
-behaviour(hocon_schema).
|
|
|
|
-export([
|
|
namespace/0,
|
|
tags/0,
|
|
roots/0,
|
|
fields/1,
|
|
desc/1,
|
|
post_config_update/5
|
|
]).
|
|
|
|
-export([validate_sql/1]).
|
|
|
|
%% @doc hocon_schema callback: the namespace atom of this schema module.
namespace() ->
    rule_engine.
|
|
|
|
%% @doc Tags attached to this schema module: a single binary tag.
tags() -> [<<"Rule Engine">>].
|
|
|
|
%% @doc hocon_schema callback: the root struct names of this schema.
%% The only root is "rule_engine", whose fields are given by fields/1.
roots() ->
    ["rule_engine"].
|
|
|
|
%% @doc hocon_schema callback: the field list of the struct with the given
%% name. Every clause builds its fields one by one and returns them in
%% declaration order. A struct name without a clause is not handled here and
%% fails with function_clause.

%% Top-level rule engine settings.
fields("rule_engine") ->
    IgnoreSysMessage =
        {ignore_sys_message,
            ?HOCON(boolean(), #{
                default => true,
                desc => ?DESC("rule_engine_ignore_sys_message")
            })},
    %% Map of rules keyed by "id"; each value is a "rules" struct.
    Rules =
        {rules,
            ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
                desc => ?DESC("rule_engine_rules"),
                default => #{}
            })},
    JqTimeout =
        {jq_function_default_timeout,
            ?HOCON(emqx_schema:duration_ms(), #{
                default => "10s",
                desc => ?DESC("rule_engine_jq_function_default_timeout")
            })},
    JqModule =
        {jq_implementation_module,
            ?HOCON(hoconsc:enum([jq_nif, jq_port]), #{
                default => jq_nif,
                mapping => "jq.jq_implementation_module",
                desc => ?DESC("rule_engine_jq_implementation_module")
            })},
    [IgnoreSysMessage, Rules, JqTimeout, JqModule];
%% A single rule: name, SQL, actions, enable flag, description and metadata.
fields("rules") ->
    Name = rule_name(),
    %% The SQL text is mandatory and is checked by validate_sql/1.
    Sql =
        {"sql",
            ?HOCON(binary(), #{
                desc => ?DESC("rules_sql"),
                example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1",
                required => true,
                validator => fun ?MODULE:validate_sql/1
            })},
    ActionExamples = [
        <<"webhook:my_webhook">>,
        #{
            function => republish,
            args => #{topic => <<"t/1">>, payload => <<"${payload}">>}
        },
        #{function => console}
    ],
    %% An array whose elements may be any of the types listed by actions/0.
    Actions =
        {"actions",
            ?HOCON(?ARRAY(?UNION(actions())), #{
                desc => ?DESC("rules_actions"),
                default => [],
                example => ActionExamples
            })},
    Enable =
        {"enable",
            ?HOCON(boolean(), #{
                desc => ?DESC("rules_enable"),
                default => true
            })},
    Description =
        {"description",
            ?HOCON(binary(), #{
                desc => ?DESC("rules_description"),
                example => "Some description",
                default => <<>>
            })},
    Metadata =
        {"metadata", ?HOCON(map(), #{desc => ?DESC("rules_metadata")})},
    [Name, Sql, Actions, Enable, Description, Metadata];
%% Built-in `republish' action: fixed function name plus "republish_args".
fields("builtin_action_republish") ->
    Function =
        {function, ?HOCON(republish, #{desc => ?DESC("republish_function")})},
    Args =
        {args, ?HOCON(?R_REF("republish_args"), #{default => #{}})},
    [Function, Args];
%% Built-in `console' action. It has no `args' field for now; arguments may
%% be supported for the console action in the future.
fields("builtin_action_console") ->
    Function =
        {function, ?HOCON(console, #{desc => ?DESC("console_function")})},
    [Function];
%% User-provided action: a mandatory function reference and free-form args.
fields("user_provided_function") ->
    Function =
        {function,
            ?HOCON(binary(), #{
                desc => ?DESC("user_provided_function_function"),
                required => true,
                example => "module:function"
            })},
    Args =
        {args,
            ?HOCON(map(), #{
                desc => ?DESC("user_provided_function_args"),
                default => #{}
            })},
    [Function, Args];
%% Arguments of the `republish' action. Only `topic' is required; the other
%% fields default to the "${...}" binaries given below.
fields("republish_args") ->
    Topic =
        {topic,
            ?HOCON(binary(), #{
                desc => ?DESC("republish_args_topic"),
                required => true,
                example => <<"a/1">>
            })},
    Qos =
        {qos,
            ?HOCON(qos(), #{
                desc => ?DESC("republish_args_qos"),
                default => <<"${qos}">>,
                example => <<"${qos}">>
            })},
    Retain =
        {retain,
            ?HOCON(hoconsc:union([boolean(), binary()]), #{
                desc => ?DESC("republish_args_retain"),
                default => <<"${retain}">>,
                example => <<"${retain}">>
            })},
    Payload =
        {payload,
            ?HOCON(binary(), #{
                desc => ?DESC("republish_args_payload"),
                default => <<"${payload}">>,
                example => <<"${payload}">>
            })},
    UserProperties =
        {user_properties,
            ?HOCON(binary(), #{
                desc => ?DESC("republish_args_user_properties"),
                default => <<"${user_properties}">>,
                example => <<"${pub_props.'User-Property'}">>
            })},
    [Topic, Qos, Retain, Payload, UserProperties].
|
|
|
|
%% @doc hocon_schema callback: the description reference for a struct name.
%% Each struct defined by fields/1 maps to its "desc_<name>" entry; any
%% other argument yields `undefined'.
desc("rule_engine") -> ?DESC("desc_rule_engine");
desc("rules") -> ?DESC("desc_rules");
desc("builtin_action_republish") -> ?DESC("desc_builtin_action_republish");
desc("builtin_action_console") -> ?DESC("desc_builtin_action_console");
desc("user_provided_function") -> ?DESC("desc_user_provided_function");
desc("republish_args") -> ?DESC("desc_republish_args");
desc(_Other) -> undefined.
|
|
|
|
%% @doc The optional "name" field of a rule: a binary that defaults to the
%% empty binary.
rule_name() ->
    NameMeta = #{
        desc => ?DESC("rules_name"),
        default => <<"">>,
        required => false,
        example => "foo"
    },
    {"name", ?HOCON(binary(), NameMeta)}.
|
|
|
|
%% @doc The alternative types an element of a rule's "actions" array may
%% take: a plain binary, or one of the three action structs from fields/1.
actions() ->
    ActionStructs = [
        ?R_REF("builtin_action_republish"),
        ?R_REF("builtin_action_console"),
        ?R_REF("user_provided_function")
    ],
    [binary() | ActionStructs].
|
|
|
|
%% @doc Type of the republish `qos' argument: either the type returned by
%% emqx_schema:qos/0 or a binary.
qos() ->
    Alternatives = [emqx_schema:qos(), binary()],
    ?UNION(Alternatives).
|
|
|
|
%% @doc Validator for the "sql" field of a rule.
%% Runs the statement through emqx_rule_sqlparser:parse/1 and returns `ok'
%% when it parses, or the parser's `{error, Reason}' tuple unchanged.
validate_sql(Sql) ->
    case emqx_rule_sqlparser:parse(Sql) of
        {ok, _Parsed} -> ok;
        {error, _Reason} = Error -> Error
    end.
|
|
|
|
%% @doc Config-update hook for the `[rule_engine, jq_implementation_module]'
%% path: passes the new config value to jq:set_implementation_module/1 and
%% returns `ok'. No other path has a clause, so any other call fails with
%% function_clause.
post_config_update([rule_engine, jq_implementation_module], _Req, NewConf, _OldConf, _AppEnvs) ->
    jq:set_implementation_module(NewConf),
    ok.
|