From de3a3259539148d031c8902bcbee41943f245cb8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 16 Aug 2022 09:05:22 +0800 Subject: [PATCH] fix: revert the changes in connector mysql --- .../src/emqx_connector_mysql.erl | 26 ++++++++----------- apps/emqx_resource/src/emqx_resource.erl | 2 +- .../src/emqx_resource_manager.erl | 5 ++++ .../src/emqx_rule_engine.app.src | 2 +- .../i18n/emqx_ee_bridge_mysql.conf | 6 ++--- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index a9b1cf08d..cae07433a 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -110,14 +110,9 @@ on_start( {auto_reconnect, reconn_interval(AutoReconn)}, {pool_size, PoolSize} ], - SqlTmpl = emqx_map_lib:deep_get([egress, sql_template], Config, undefined), - SqlTmplParts = emqx_connector_utils:split_insert_sql(SqlTmpl), PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), - State0 = #{ - poolname => PoolName, auto_reconnect => AutoReconn, sql_template_parts => SqlTmplParts - }, - State = maps:merge(State0, Prepares), + State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State)}; {error, Reason} -> {error, Reason} @@ -141,17 +136,18 @@ on_query( ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - {ok, Conn} = ecpool_worker:client(ecpool:get_client(PoolName)), - MySqlFun = mysql_function(TypeOrKey), - {SQLOrKey2, SqlParams} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), - Result = mysql:MySqlFun(Conn, SQLOrKey2, SqlParams, Timeout), + Worker = ecpool:get_client(PoolName), + {ok, Conn} = ecpool_worker:client(Worker), + MySqlFunction = mysql_function(TypeOrKey), + {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), + Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]), case Result of {error, disconnected} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} ), - %% kill the ecpool worker to trigger reconnection + %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), Result; {error, not_prepared} -> @@ -186,7 +182,7 @@ mysql_function(prepared_query) -> execute; %% for bridge mysql_function(_) -> - execute. + mysql_function(prepared_query). on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of @@ -332,10 +328,10 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; -proc_sql_params(PreparedKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> - case maps:get(PreparedKey, ParamsTokens, undefined) of +proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> + case maps:get(TypeOrKey, ParamsTokens, undefined) of undefined -> {SQLOrData, Params}; Tokens -> - {PreparedKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} end. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a8fe297bf..0295292dd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -298,7 +298,7 @@ get_instance(ResId) -> -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> - maps:get(resource_opts, Opts). + maps:get(resource_opts, Opts, #{}). -spec list_instances() -> [resource_id()]. list_instances() -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 48353124c..07abd4007 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -447,6 +447,11 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, connecting, UpdatedData, Actions}; {error, Reason} = Err -> + ?SLOG(error, #{ + msg => start_resource_failed, + id => Data#data.id, + reason => Reason + }), _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 61c0f4ac1..28f90fdb9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.1"}, + {vsn, "5.0.2"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt]}, diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf index 98716ff7e..bb908628c 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf @@ -2,15 +2,13 @@ emqx_ee_bridge_mysql { local_topic { desc { - en: """ -The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic + en: """The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded. """ - zh: """ -发送到 'local_topic' 的消息都会转发到 MySQL。
+ zh: """发送到 'local_topic' 的消息都会转发到 MySQL。
注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 """ }