diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index d51c2edef..f5559f4c2 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -3,7 +3,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1]). +-import(hoconsc, [mk/2, enum/1, ref/2]). -export([roots/0, fields/1, namespace/0, desc/1]). @@ -23,7 +23,14 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:metrics_status_fields() ++ fields("post"); +fields("creation_opts") -> + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_resource_schema:fields("creation_opts") + ). desc("config") -> ?DESC("desc_config"); @@ -117,13 +124,17 @@ request_config() -> ]. webhook_creation_opts() -> - Opts = emqx_resource_schema:fields(creation_opts), - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - Opts - ). + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. unsupported_opts() -> [ diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index cae07433a..a9b1cf08d 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -110,9 +110,14 @@ on_start( {auto_reconnect, reconn_interval(AutoReconn)}, {pool_size, PoolSize} ], + SqlTmpl = emqx_map_lib:deep_get([egress, sql_template], Config, undefined), + SqlTmplParts = emqx_connector_utils:split_insert_sql(SqlTmpl), PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), - State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares), + State0 = #{ + poolname => PoolName, auto_reconnect => AutoReconn, sql_template_parts => SqlTmplParts + }, + State = maps:merge(State0, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State)}; {error, Reason} -> {error, Reason} @@ -136,18 +141,17 @@ on_query( ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - Worker = ecpool:get_client(PoolName), - {ok, Conn} = ecpool_worker:client(Worker), - MySqlFunction = mysql_function(TypeOrKey), - {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), - Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]), + {ok, Conn} = ecpool_worker:client(ecpool:get_client(PoolName)), + MySqlFun = mysql_function(TypeOrKey), + {SQLOrKey2, SqlParams} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), + Result = mysql:MySqlFun(Conn, SQLOrKey2, SqlParams, Timeout), case Result of {error, disconnected} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} ), - %% kill the poll worker to trigger reconnection + %% kill the ecpool worker to trigger reconnection _ = exit(Conn, restart), Result; {error, not_prepared} -> @@ -182,7 +186,7 @@ mysql_function(prepared_query) -> execute; %% for bridge mysql_function(_) -> - mysql_function(prepared_query). + execute. on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of @@ -328,10 +332,10 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> - case maps:get(TypeOrKey, ParamsTokens, undefined) of +proc_sql_params(PreparedKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> + case maps:get(PreparedKey, ParamsTokens, undefined) of undefined -> {SQLOrData, Params}; Tokens -> - {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {PreparedKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} end. diff --git a/apps/emqx_connector/src/emqx_connector_utils.erl b/apps/emqx_connector/src/emqx_connector_utils.erl new file mode 100644 index 000000000..94b12921d --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_utils.erl @@ -0,0 +1,19 @@ +-module(emqx_connector_utils). + +-export([split_insert_sql/1]). + +%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> +split_insert_sql(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 3ec170ebf..5ff138b0b 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -1,4 +1,14 @@ emqx_resource_schema { + resource_opts { + desc { + en: """Resource options.""" + zh: """资源相关的选项。""" + } + label { + en: """Resource Options""" + zh: """资源选项""" + } + } health_check_interval { desc { @@ -86,17 +96,6 @@ The auto restart interval after the resource is disconnected, in milliseconds. } } - resume_interval { - desc { - en: """Resume time interval when resource down.""" - zh: """资源不可用时的重试时间""" - } - label { - en: """resume_interval""" - zh: """恢复时间""" - } - } - async_inflight_window { desc { en: """Async query inflight window.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 190c278ae..a21c3583b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -66,7 +66,6 @@ enable_queue => boolean(), queue_max_bytes => integer(), query_mode => async | sync | dynamic, - resume_interval => integer(), async_inflight_window => integer() }. -type query_result() :: @@ -81,7 +80,6 @@ -define(DEFAULT_INFLIGHT, 100). -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). --define(RESUME_INTERVAL, 15000). -define(START_AFTER_CREATED, true). -define(START_TIMEOUT, 5000). -define(START_TIMEOUT_RAW, <<"5s">>). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60f0dd360..e3bcc423d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -280,21 +280,7 @@ get_instance(ResId) -> -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> - SupportedOpts = [ - health_check_interval, - start_timeout, - start_after_created, - auto_restart_interval, - enable_batch, - batch_size, - batch_time, - enable_queue, - queue_max_bytes, - query_mode, - resume_interval, - async_inflight_window - ], - maps:with(SupportedOpts, Opts). + maps:get(resource_opts, Opts). -spec list_instances() -> [resource_id()]. list_instances() -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index f27f19bdf..457f8c91a 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -59,11 +59,6 @@ -define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). --define(RESOURCE_ERROR(Reason, Msg), - {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} -). --define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). - -type id() :: binary(). -type query() :: {query, from(), request()}. -type request() :: term(). @@ -140,7 +135,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, - resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL), + resume_interval => maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), acc => [], acc_left => BatchSize, tref => undefined diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index ccc31a707..446ee8ad1 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -30,14 +30,25 @@ namespace() -> "resource_schema". roots() -> []. -fields('creation_opts') -> +fields("resource_opts") -> + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(<<"resource_opts">>) + } + )} + ]; +fields("creation_opts") -> [ {health_check_interval, fun health_check_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, - {resume_interval, fun resume_interval/1}, {async_inflight_window, fun async_inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -88,12 +99,6 @@ enable_queue(default) -> false; enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(_) -> undefined. -resume_interval(type) -> emqx_schema:duration_ms(); -resume_interval(desc) -> ?DESC("resume_interval"); -resume_interval(default) -> ?RESUME_INTERVAL; -resume_interval(required) -> false; -resume_interval(_) -> undefined. - async_inflight_window(type) -> pos_integer(); async_inflight_window(desc) -> ?DESC("async_inflight_window"); async_inflight_window(default) -> ?DEFAULT_INFLIGHT; diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf index 48fbd1007..98716ff7e 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf @@ -1,4 +1,25 @@ emqx_ee_bridge_mysql { + + local_topic { + desc { + en: """ +The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """ +发送到 'local_topic' 的消息都会转发到 MySQL。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + sql_template { desc { en: """SQL Template""" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 6a1b2677f..dce315721 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -91,7 +91,7 @@ fields(basic) -> {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {write_syntax, fun write_syntax/1} ] ++ - emqx_resource_schema:fields('creation_opts'); + emqx_resource_schema:fields("resource_opts"); fields("post_udp") -> method_fileds(post, influxdb_udp); fields("post_api_v1") -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 5d143bf85..c9b611a52 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -43,14 +43,15 @@ values(get) -> values(post) -> #{ type => mysql, - name => <<"mysql">>, + name => <<"foo">>, + local_topic => <<"local/topic/#">>, sql_template => ?DEFAULT_SQL, connector => #{ server => <<"127.0.0.1:3306">>, database => <<"test">>, pool_size => 8, username => <<"root">>, - password => <<"public">>, + password => <<"">>, auto_reconnect => true }, enable => true, @@ -61,7 +62,7 @@ values(put) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge". +namespace() -> "bridge_mysql". roots() -> []. @@ -69,6 +70,7 @@ fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {sql_template, mk( binary(), @@ -82,7 +84,7 @@ fields("config") -> desc => ?DESC("desc_connector") } )} - ]; + ] ++ emqx_resource_schema:fields("resource_opts"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") ->