fix: revert the changes in connector mysql

This commit is contained in:
Shawn 2022-08-16 09:05:22 +08:00
parent 2898966439
commit de3a325953
5 changed files with 20 additions and 21 deletions

View File

@ -110,14 +110,9 @@ on_start(
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, reconn_interval(AutoReconn)},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
SqlTmpl = emqx_map_lib:deep_get([egress, sql_template], Config, undefined),
SqlTmplParts = emqx_connector_utils:split_insert_sql(SqlTmpl),
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config), Prepares = parse_prepare_sql(Config),
State0 = #{ State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
poolname => PoolName, auto_reconnect => AutoReconn, sql_template_parts => SqlTmplParts
},
State = maps:merge(State0, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> {ok, init_prepare(State)}; ok -> {ok, init_prepare(State)};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
@ -141,17 +136,18 @@ on_query(
) -> ) ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta), ?TRACE("QUERY", "mysql_connector_received", LogMeta),
{ok, Conn} = ecpool_worker:client(ecpool:get_client(PoolName)), Worker = ecpool:get_client(PoolName),
MySqlFun = mysql_function(TypeOrKey), {ok, Conn} = ecpool_worker:client(Worker),
{SQLOrKey2, SqlParams} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), MySqlFunction = mysql_function(TypeOrKey),
Result = mysql:MySqlFun(Conn, SQLOrKey2, SqlParams, Timeout), {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]),
case Result of case Result of
{error, disconnected} -> {error, disconnected} ->
?SLOG( ?SLOG(
error, error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
), ),
%% kill the ecpool worker to trigger reconnection %% kill the poll worker to trigger reconnection
_ = exit(Conn, restart), _ = exit(Conn, restart),
Result; Result;
{error, not_prepared} -> {error, not_prepared} ->
@ -186,7 +182,7 @@ mysql_function(prepared_query) ->
execute; execute;
%% for bridge %% for bridge
mysql_function(_) -> mysql_function(_) ->
execute. mysql_function(prepared_query).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
@ -332,10 +328,10 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params}; {SQLOrKey, Params};
proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params}; {SQLOrKey, Params};
proc_sql_params(PreparedKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
case maps:get(PreparedKey, ParamsTokens, undefined) of case maps:get(TypeOrKey, ParamsTokens, undefined) of
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
Tokens -> Tokens ->
{PreparedKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
end. end.

View File

@ -298,7 +298,7 @@ get_instance(ResId) ->
-spec fetch_creation_opts(map()) -> creation_opts(). -spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) -> fetch_creation_opts(Opts) ->
maps:get(resource_opts, Opts). maps:get(resource_opts, Opts, #{}).
-spec list_instances() -> [resource_id()]. -spec list_instances() -> [resource_id()].
list_instances() -> list_instances() ->

View File

@ -447,6 +447,11 @@ start_resource(Data, From) ->
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, connecting, UpdatedData, Actions}; {next_state, connecting, UpdatedData, Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(error, #{
msg => start_resource_failed,
id => Data#data.id,
reason => Reason
}),
_ = maybe_alarm(disconnected, Data#data.id), _ = maybe_alarm(disconnected, Data#data.id),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [ {application, emqx_rule_engine, [
{description, "EMQX Rule Engine"}, {description, "EMQX Rule Engine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.1"}, {vsn, "5.0.2"},
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]}, {applications, [kernel, stdlib, rulesql, getopt]},

View File

@ -2,15 +2,13 @@ emqx_ee_bridge_mysql {
local_topic { local_topic {
desc { desc {
en: """ en: """The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic
The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.</br> matching the local_topic will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded. will be forwarded.
""" """
zh: """ zh: """发送到 'local_topic' 的消息都会转发到 MySQL。 </br>
发送到 'local_topic' 的消息都会转发到 MySQL。 </br>
注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
""" """
} }