Merge pull request #8687 from lafirest/fix/bridge_mysql
fix(bridge): replace prepare_statement by sql_template
This commit is contained in:
commit
404e79b7d7
|
@ -111,7 +111,7 @@ on_start(
|
||||||
{pool_size, PoolSize}
|
{pool_size, PoolSize}
|
||||||
],
|
],
|
||||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
Prepares = parse_prepare_sql(maps:get(prepare_statement, Config, #{})),
|
Prepares = parse_prepare_sql(Config),
|
||||||
State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
|
State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
|
||||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
||||||
ok -> {ok, init_prepare(State)};
|
ok -> {ok, init_prepare(State)};
|
||||||
|
@ -303,7 +303,19 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
|
||||||
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
|
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
|
||||||
mysql:unprepare(Conn, PrepareSqlKey).
|
mysql:unprepare(Conn, PrepareSqlKey).
|
||||||
|
|
||||||
parse_prepare_sql(SQL) ->
|
parse_prepare_sql(Config) ->
|
||||||
|
SQL =
|
||||||
|
case maps:get(prepare_statement, Config, undefined) of
|
||||||
|
undefined ->
|
||||||
|
case emqx_map_lib:deep_get([egress, sql_template], Config, undefined) of
|
||||||
|
undefined ->
|
||||||
|
#{};
|
||||||
|
Template ->
|
||||||
|
#{send_message => Template}
|
||||||
|
end;
|
||||||
|
Any ->
|
||||||
|
Any
|
||||||
|
end,
|
||||||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
||||||
|
|
||||||
parse_prepare_sql([{Key, H} | T], SQL, Tokens) ->
|
parse_prepare_sql([{Key, H} | T], SQL, Tokens) ->
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
emqx_ee_bridge_mysql {
|
emqx_ee_bridge_mysql {
|
||||||
sql {
|
sql_template {
|
||||||
desc {
|
desc {
|
||||||
en: """SQL Template"""
|
en: """SQL Template"""
|
||||||
zh: """SQL 模板"""
|
zh: """SQL 模板"""
|
||||||
|
|
|
@ -44,14 +44,14 @@ values(post) ->
|
||||||
#{
|
#{
|
||||||
type => mysql,
|
type => mysql,
|
||||||
name => <<"mysql">>,
|
name => <<"mysql">>,
|
||||||
|
sql_template => ?DEFAULT_SQL,
|
||||||
connector => #{
|
connector => #{
|
||||||
server => <<"127.0.0.1:3306">>,
|
server => <<"127.0.0.1:3306">>,
|
||||||
database => <<"test">>,
|
database => <<"test">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
auto_reconnect => true,
|
auto_reconnect => true
|
||||||
prepare_statement => #{send_message => ?DEFAULT_SQL}
|
|
||||||
},
|
},
|
||||||
enable => true,
|
enable => true,
|
||||||
direction => egress
|
direction => egress
|
||||||
|
@ -69,6 +69,11 @@ fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||||
|
{sql_template,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||||
|
)},
|
||||||
{connector,
|
{connector,
|
||||||
mk(
|
mk(
|
||||||
ref(?MODULE, connector),
|
ref(?MODULE, connector),
|
||||||
|
@ -85,8 +90,7 @@ fields("put") ->
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
|
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
|
||||||
fields(connector) ->
|
fields(connector) ->
|
||||||
(emqx_connector_mysql:fields(config) --
|
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields().
|
||||||
emqx_connector_schema_lib:prepare_statement_fields()) ++ prepare_statement_fields().
|
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("desc_config");
|
?DESC("desc_config");
|
||||||
|
@ -104,13 +108,3 @@ type_field() ->
|
||||||
|
|
||||||
name_field() ->
|
name_field() ->
|
||||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
||||||
|
|
||||||
prepare_statement_fields() ->
|
|
||||||
[
|
|
||||||
{prepare_statement,
|
|
||||||
mk(map(), #{
|
|
||||||
desc => ?DESC(emqx_connector_schema_lib, prepare_statement),
|
|
||||||
default => #{<<"send_message">> => ?DEFAULT_SQL},
|
|
||||||
example => #{<<"send_message">> => ?DEFAULT_SQL}
|
|
||||||
})}
|
|
||||||
].
|
|
||||||
|
|
Loading…
Reference in New Issue