diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 37bee3c3c..7c19e9e55 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -3,7 +3,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2, enum/1, ref/2]). -export([roots/0, fields/1, namespace/0, desc/1]). @@ -23,10 +23,19 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:metrics_status_fields() ++ fields("post"); +fields("creation_opts") -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_resource_schema:fields("creation_opts") + ). desc("config") -> ?DESC("desc_config"); +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; desc(_) -> @@ -119,13 +128,17 @@ request_config() -> ]. webhook_creation_opts() -> - Opts = emqx_resource_schema:fields(creation_opts), - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - Opts - ). + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. unsupported_opts() -> [ diff --git a/apps/emqx_connector/src/emqx_connector_utils.erl b/apps/emqx_connector/src/emqx_connector_utils.erl new file mode 100644 index 000000000..94b12921d --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_utils.erl @@ -0,0 +1,19 @@ +-module(emqx_connector_utils). + +-export([split_insert_sql/1]). + +%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> +split_insert_sql(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index c07573b1a..ce4c7e3b0 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -1,5 +1,27 @@ emqx_resource_schema { + resource_opts { + desc { + en: """Resource options.""" + zh: """资源相关的选项。""" + } + label { + en: """Resource Options""" + zh: """资源选项""" + } + } + + creation_opts { + desc { + en: """Creation options.""" + zh: """资源启动相关的选项。""" + } + label { + en: """Creation Options""" + zh: """资源启动选项""" + } + } + health_check_interval { desc { en: """Health check interval, in milliseconds.""" @@ -77,17 +99,6 @@ emqx_resource_schema { } } - resume_interval { - desc { - en: """Resume time interval when resource down.""" - zh: """资源不可用时的重试时间。""" - } - label { - en: """Resume interval""" - zh: """恢复时间""" - } - } - async_inflight_window { desc { en: """Async query inflight window.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 8ec57a00e..04b3f16ea 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -92,10 +92,6 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). -%% milliseconds --define(RESUME_INTERVAL, 15000). --define(RESUME_INTERVAL_RAW, <<"15s">>). - -define(START_AFTER_CREATED, true). %% milliseconds diff --git a/apps/emqx_resource/include/emqx_resource_errors.hrl b/apps/emqx_resource/include/emqx_resource_errors.hrl new file mode 100644 index 000000000..b11ee3c1a --- /dev/null +++ b/apps/emqx_resource/include/emqx_resource_errors.hrl @@ -0,0 +1,20 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(RESOURCE_ERROR(Reason, Msg), + {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} +). +-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 99e1f6057..0295292dd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -298,21 +298,7 @@ get_instance(ResId) -> -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> - SupportedOpts = [ - health_check_interval, - start_timeout, - start_after_created, - auto_restart_interval, - enable_batch, - batch_size, - batch_time, - enable_queue, - queue_max_bytes, - query_mode, - resume_interval, - async_inflight_window - ], - maps:with(SupportedOpts, Opts). + maps:get(resource_opts, Opts, #{}). -spec list_instances() -> [resource_id()]. list_instances() -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 66d9e32b0..07abd4007 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -56,7 +56,6 @@ -record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}). -type data() :: #data{}. --define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(ETS_TABLE, ?MODULE). -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). @@ -448,6 +447,11 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, connecting, UpdatedData, Actions}; {error, Reason} = Err -> + ?SLOG(error, #{ + msg => start_resource_failed, + id => Data#data.id, + reason => Reason + }), _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. @@ -484,7 +488,7 @@ handle_connecting_health_check(Data) -> (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> - Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}], + Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], {keep_state, UpdatedData, Actions}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index e940dcb69..26b9706c9 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -21,6 +21,7 @@ -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). +-include("emqx_resource_errors.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -58,11 +59,6 @@ -define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). --define(RESOURCE_ERROR(Reason, Msg), - {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} -). --define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). - -type id() :: binary(). -type query() :: {query, from(), request()}. -type request() :: term(). @@ -139,7 +135,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, - resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL), + resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL), acc => [], acc_left => BatchSize, tref => undefined diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 6111543d2..2272234f2 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -21,7 +21,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --export([namespace/0, roots/0, fields/1]). +-export([namespace/0, roots/0, fields/1, desc/1]). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -30,14 +30,25 @@ namespace() -> "resource_schema". roots() -> []. -fields('creation_opts') -> +fields("resource_opts") -> + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(<<"resource_opts">>) + } + )} + ]; +fields("creation_opts") -> [ {health_check_interval, fun health_check_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, - {resume_interval, fun resume_interval/1}, {async_inflight_window, fun async_inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -88,12 +99,6 @@ enable_queue(default) -> false; enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(_) -> undefined. -resume_interval(type) -> emqx_schema:duration_ms(); -resume_interval(desc) -> ?DESC("resume_interval"); -resume_interval(default) -> ?RESUME_INTERVAL_RAW; -resume_interval(required) -> false; -resume_interval(_) -> undefined. - async_inflight_window(type) -> pos_integer(); async_inflight_window(desc) -> ?DESC("async_inflight_window"); async_inflight_window(default) -> ?DEFAULT_INFLIGHT; @@ -117,3 +122,6 @@ queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; queue_max_bytes(required) -> false; queue_max_bytes(_) -> undefined. + +desc("creation_opts") -> + ?DESC("creation_opts"). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 61c0f4ac1..28f90fdb9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.1"}, + {vsn, "5.0.2"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index a0d1c464a..3729d9096 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -19,6 +19,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource_errors.hrl"). -export([ apply_rule/3, @@ -322,7 +323,7 @@ handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), try Result = do_handle_action(ActId, Selected, Envs), - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'), + inc_action_metrics(Result, RuleId), Result catch throw:out_of_service -> @@ -501,3 +502,18 @@ ensure_list(_NotList) -> []. nested_put(Alias, Val, Columns0) -> Columns = handle_alias(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). + +-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). +inc_action_metrics(ok, RuleId) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); +inc_action_metrics({ok, _}, RuleId) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); +inc_action_metrics({resource_down, _}, RuleId) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.out_of_service'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); +inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.out_of_service'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); +inc_action_metrics(_, RuleId) -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'). diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf index 48fbd1007..bb908628c 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf @@ -1,4 +1,23 @@ emqx_ee_bridge_mysql { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 MySQL。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + sql_template { desc { en: """SQL Template""" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 6a1b2677f..dce315721 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -91,7 +91,7 @@ fields(basic) -> {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {write_syntax, fun write_syntax/1} ] ++ - emqx_resource_schema:fields('creation_opts'); + emqx_resource_schema:fields("resource_opts"); fields("post_udp") -> method_fileds(post, influxdb_udp); fields("post_api_v1") -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 5d143bf85..c9b611a52 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -43,14 +43,15 @@ values(get) -> values(post) -> #{ type => mysql, - name => <<"mysql">>, + name => <<"foo">>, + local_topic => <<"local/topic/#">>, sql_template => ?DEFAULT_SQL, connector => #{ server => <<"127.0.0.1:3306">>, database => <<"test">>, pool_size => 8, username => <<"root">>, - password => <<"public">>, + password => <<"">>, auto_reconnect => true }, enable => true, @@ -61,7 +62,7 @@ values(put) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge". +namespace() -> "bridge_mysql". roots() -> []. @@ -69,6 +70,7 @@ fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {sql_template, mk( binary(), @@ -82,7 +84,7 @@ fields("config") -> desc => ?DESC("desc_connector") } )} - ]; + ] ++ emqx_resource_schema:fields("resource_opts"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") ->