Merge pull request #8730 from terry-xiaoyu/resource_opts

Resource opts
This commit is contained in:
Xinyu Liu 2022-08-16 10:48:53 +08:00 committed by GitHub
commit 03d83ffda5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 153 additions and 63 deletions

View File

@ -3,7 +3,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]). -export([roots/0, fields/1, namespace/0, desc/1]).
@ -23,10 +23,19 @@ fields("post") ->
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post"). emqx_bridge_schema:metrics_status_fields() ++ fields("post");
fields("creation_opts") ->
lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc("creation_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->
@ -119,13 +128,17 @@ request_config() ->
]. ].
webhook_creation_opts() -> webhook_creation_opts() ->
Opts = emqx_resource_schema:fields(creation_opts), [
lists:filter( {resource_opts,
fun({K, _V}) -> mk(
not lists:member(K, unsupported_opts()) ref(?MODULE, "creation_opts"),
end, #{
Opts required => false,
). default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
unsupported_opts() -> unsupported_opts() ->
[ [

View File

@ -0,0 +1,19 @@
-module(emqx_connector_utils).
-export([split_insert_sql/1]).
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
split_insert_sql(SQL) ->
case re:split(SQL, "((?i)values)", [{return, binary}]) of
[Part1, _, Part3] ->
case string:trim(Part1, leading) of
<<"insert", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
<<"INSERT", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
_ ->
{error, not_insert_sql}
end;
_ ->
{error, not_insert_sql}
end.

View File

@ -1,5 +1,27 @@
emqx_resource_schema { emqx_resource_schema {
resource_opts {
desc {
en: """Resource options."""
zh: """资源相关的选项。"""
}
label {
en: """Resource Options"""
zh: """资源选项"""
}
}
creation_opts {
desc {
en: """Creation options."""
zh: """资源启动相关的选项。"""
}
label {
en: """Creation Options"""
zh: """资源启动选项"""
}
}
health_check_interval { health_check_interval {
desc { desc {
en: """Health check interval, in milliseconds.""" en: """Health check interval, in milliseconds."""
@ -77,17 +99,6 @@ emqx_resource_schema {
} }
} }
resume_interval {
desc {
en: """Resume time interval when resource down."""
zh: """资源不可用时的重试时间。"""
}
label {
en: """Resume interval"""
zh: """恢复时间"""
}
}
async_inflight_window { async_inflight_window {
desc { desc {
en: """Async query inflight window.""" en: """Async query inflight window."""

View File

@ -92,10 +92,6 @@
-define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL, 15000).
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
%% milliseconds
-define(RESUME_INTERVAL, 15000).
-define(RESUME_INTERVAL_RAW, <<"15s">>).
-define(START_AFTER_CREATED, true). -define(START_AFTER_CREATED, true).
%% milliseconds %% milliseconds

View File

@ -0,0 +1,20 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(RESOURCE_ERROR(Reason, Msg),
{error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
).
-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).

View File

@ -298,21 +298,7 @@ get_instance(ResId) ->
-spec fetch_creation_opts(map()) -> creation_opts(). -spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) -> fetch_creation_opts(Opts) ->
SupportedOpts = [ maps:get(resource_opts, Opts, #{}).
health_check_interval,
start_timeout,
start_after_created,
auto_restart_interval,
enable_batch,
batch_size,
batch_time,
enable_queue,
queue_max_bytes,
query_mode,
resume_interval,
async_inflight_window
],
maps:with(SupportedOpts, Opts).
-spec list_instances() -> [resource_id()]. -spec list_instances() -> [resource_id()].
list_instances() -> list_instances() ->

View File

@ -56,7 +56,6 @@
-record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}). -record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
-type data() :: #data{}. -type data() :: #data{}.
-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
-define(ETS_TABLE, ?MODULE). -define(ETS_TABLE, ?MODULE).
-define(WAIT_FOR_RESOURCE_DELAY, 100). -define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(T_OPERATION, 5000). -define(T_OPERATION, 5000).
@ -448,6 +447,11 @@ start_resource(Data, From) ->
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, connecting, UpdatedData, Actions}; {next_state, connecting, UpdatedData, Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(error, #{
msg => start_resource_failed,
id => Data#data.id,
reason => Reason
}),
_ = maybe_alarm(disconnected, Data#data.id), _ = maybe_alarm(disconnected, Data#data.id),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
@ -484,7 +488,7 @@ handle_connecting_health_check(Data) ->
(connected, UpdatedData) -> (connected, UpdatedData) ->
{next_state, connected, UpdatedData}; {next_state, connected, UpdatedData};
(connecting, UpdatedData) -> (connecting, UpdatedData) ->
Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}], Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{keep_state, UpdatedData, Actions}; {keep_state, UpdatedData, Actions};
(disconnected, UpdatedData) -> (disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData} {next_state, disconnected, UpdatedData}

View File

@ -21,6 +21,7 @@
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl"). -include("emqx_resource_utils.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -58,11 +59,6 @@
-define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). -define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}).
-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
-define(RESOURCE_ERROR(Reason, Msg),
{error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
).
-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
-type id() :: binary(). -type id() :: binary().
-type query() :: {query, from(), request()}. -type query() :: {query, from(), request()}.
-type request() :: term(). -type request() :: term().
@ -139,7 +135,7 @@ init({Id, Index, Opts}) ->
batch_size => BatchSize, batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
queue => Queue, queue => Queue,
resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL), resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL),
acc => [], acc => [],
acc_left => BatchSize, acc_left => BatchSize,
tref => undefined tref => undefined

View File

@ -21,7 +21,7 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([namespace/0, roots/0, fields/1]). -export([namespace/0, roots/0, fields/1, desc/1]).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
@ -30,14 +30,25 @@ namespace() -> "resource_schema".
roots() -> []. roots() -> [].
fields('creation_opts') -> fields("resource_opts") ->
[
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(<<"resource_opts">>)
}
)}
];
fields("creation_opts") ->
[ [
{health_check_interval, fun health_check_interval/1}, {health_check_interval, fun health_check_interval/1},
{start_after_created, fun start_after_created/1}, {start_after_created, fun start_after_created/1},
{start_timeout, fun start_timeout/1}, {start_timeout, fun start_timeout/1},
{auto_restart_interval, fun auto_restart_interval/1}, {auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1}, {query_mode, fun query_mode/1},
{resume_interval, fun resume_interval/1},
{async_inflight_window, fun async_inflight_window/1}, {async_inflight_window, fun async_inflight_window/1},
{enable_batch, fun enable_batch/1}, {enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1}, {batch_size, fun batch_size/1},
@ -88,12 +99,6 @@ enable_queue(default) -> false;
enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(desc) -> ?DESC("enable_queue");
enable_queue(_) -> undefined. enable_queue(_) -> undefined.
resume_interval(type) -> emqx_schema:duration_ms();
resume_interval(desc) -> ?DESC("resume_interval");
resume_interval(default) -> ?RESUME_INTERVAL_RAW;
resume_interval(required) -> false;
resume_interval(_) -> undefined.
async_inflight_window(type) -> pos_integer(); async_inflight_window(type) -> pos_integer();
async_inflight_window(desc) -> ?DESC("async_inflight_window"); async_inflight_window(desc) -> ?DESC("async_inflight_window");
async_inflight_window(default) -> ?DEFAULT_INFLIGHT; async_inflight_window(default) -> ?DEFAULT_INFLIGHT;
@ -117,3 +122,6 @@ queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
queue_max_bytes(required) -> false; queue_max_bytes(required) -> false;
queue_max_bytes(_) -> undefined. queue_max_bytes(_) -> undefined.
desc("creation_opts") ->
?DESC("creation_opts").

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [ {application, emqx_rule_engine, [
{description, "EMQX Rule Engine"}, {description, "EMQX Rule Engine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.1"}, {vsn, "5.0.2"},
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]}, {applications, [kernel, stdlib, rulesql, getopt]},

View File

@ -19,6 +19,7 @@
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource_errors.hrl").
-export([ -export([
apply_rule/3, apply_rule/3,
@ -322,7 +323,7 @@ handle_action(RuleId, ActId, Selected, Envs) ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
try try
Result = do_handle_action(ActId, Selected, Envs), Result = do_handle_action(ActId, Selected, Envs),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'), inc_action_metrics(Result, RuleId),
Result Result
catch catch
throw:out_of_service -> throw:out_of_service ->
@ -501,3 +502,18 @@ ensure_list(_NotList) -> [].
nested_put(Alias, Val, Columns0) -> nested_put(Alias, Val, Columns0) ->
Columns = handle_alias(Alias, Columns0), Columns = handle_alias(Alias, Columns0),
emqx_rule_maps:nested_put(Alias, Val, Columns). emqx_rule_maps:nested_put(Alias, Val, Columns).
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics(ok, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({ok, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({resource_down, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.out_of_service'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.out_of_service'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
inc_action_metrics(_, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown').

View File

@ -1,4 +1,23 @@
emqx_ee_bridge_mysql { emqx_ee_bridge_mysql {
local_topic {
desc {
en: """The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.
"""
zh: """发送到 'local_topic' 的消息都会转发到 MySQL。 </br>
注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
"""
}
label {
en: "Local Topic"
zh: "本地 Topic"
}
}
sql_template { sql_template {
desc { desc {
en: """SQL Template""" en: """SQL Template"""

View File

@ -91,7 +91,7 @@ fields(basic) ->
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{write_syntax, fun write_syntax/1} {write_syntax, fun write_syntax/1}
] ++ ] ++
emqx_resource_schema:fields('creation_opts'); emqx_resource_schema:fields("resource_opts");
fields("post_udp") -> fields("post_udp") ->
method_fileds(post, influxdb_udp); method_fileds(post, influxdb_udp);
fields("post_api_v1") -> fields("post_api_v1") ->

View File

@ -43,14 +43,15 @@ values(get) ->
values(post) -> values(post) ->
#{ #{
type => mysql, type => mysql,
name => <<"mysql">>, name => <<"foo">>,
local_topic => <<"local/topic/#">>,
sql_template => ?DEFAULT_SQL, sql_template => ?DEFAULT_SQL,
connector => #{ connector => #{
server => <<"127.0.0.1:3306">>, server => <<"127.0.0.1:3306">>,
database => <<"test">>, database => <<"test">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"">>,
auto_reconnect => true auto_reconnect => true
}, },
enable => true, enable => true,
@ -61,7 +62,7 @@ values(put) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge". namespace() -> "bridge_mysql".
roots() -> []. roots() -> [].
@ -69,6 +70,7 @@ fields("config") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{sql_template, {sql_template,
mk( mk(
binary(), binary(),
@ -82,7 +84,7 @@ fields("config") ->
desc => ?DESC("desc_connector") desc => ?DESC("desc_connector")
} }
)} )}
]; ] ++ emqx_resource_schema:fields("resource_opts");
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->