From 3e9c4f444f03f3ebc31ce67db556025fecbeef92 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 11 Jan 2023 19:24:51 +0800 Subject: [PATCH] refactor: remove the auto_reconnect field --- apps/emqx_authz/README.md | 3 --- apps/emqx_connector/README.md | 2 +- .../i18n/emqx_connector_schema_lib.conf | 10 ++++----- .../emqx_connector/include/emqx_connector.hrl | 2 ++ .../src/emqx_connector_ldap.erl | 8 ++----- .../src/emqx_connector_mqtt.erl | 11 +++++----- .../src/emqx_connector_mysql.erl | 20 ++++++------------ .../src/emqx_connector_pgsql.erl | 20 ++++++------------ .../src/emqx_connector_redis.erl | 21 +++++++------------ .../src/emqx_connector_schema_lib.erl | 1 + .../src/emqx_ee_bridge_mysql.erl | 1 - .../src/emqx_ee_bridge_pgsql.erl | 1 - .../src/emqx_ee_bridge_redis.erl | 1 - mix.exs | 2 +- rebar.config | 2 +- 15 files changed, 38 insertions(+), 67 deletions(-) diff --git a/apps/emqx_authz/README.md b/apps/emqx_authz/README.md index 8c05f21be..af543e478 100644 --- a/apps/emqx_authz/README.md +++ b/apps/emqx_authz/README.md @@ -15,7 +15,6 @@ authz:{ pool_size: 1 username: root password: public - auto_reconnect: true ssl: { enable: true cacertfile: "etc/certs/cacert.pem" @@ -33,7 +32,6 @@ authz:{ pool_size: 1 username: root password: public - auto_reconnect: true ssl: {enable: false} } sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = ${peerhost} or username = ${username} or username = '$all' or clientid = ${clientid}" @@ -45,7 +43,6 @@ authz:{ database: 0 pool_size: 1 password: public - auto_reconnect: true ssl: {enable: false} } cmd: "HGETALL mqtt_authz:${username}" diff --git a/apps/emqx_connector/README.md b/apps/emqx_connector/README.md index 7ef3a8c4a..6baba29de 100644 --- a/apps/emqx_connector/README.md +++ b/apps/emqx_connector/README.md @@ -14,7 +14,7 @@ An MySQL connector can be used as following: ``` (emqx@127.0.0.1)5> emqx_resource:list_instances_verbose(). [#{config => - #{auto_reconnect => true,cacertfile => [],certfile => [], + #{cacertfile => [],certfile => [], database => "mqtt",keyfile => [],password => "public", pool_size => 1, server => {{127,0,0,1},3306}, diff --git a/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf b/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf index f5caf29c4..8f25e0352 100644 --- a/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf +++ b/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf @@ -68,13 +68,13 @@ emqx_connector_schema_lib { auto_reconnect { desc { - en: "Enable automatic reconnect to the database." - zh: "自动重连数据库。" + en: "Deprecated. Enable automatic reconnect to the database." + zh: "已弃用。自动重连数据库。" } label: { - en: "Auto Reconnect Database" - zh: "自动重连数据库" - } + en: "Deprecated. Auto Reconnect Database" + zh: "已弃用。自动重连数据库" + } } } diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 96b6ba4d6..82c946cfc 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -24,6 +24,8 @@ -define(REDIS_DEFAULT_PORT, 6379). -define(PGSQL_DEFAULT_PORT, 5432). +-define(AUTO_RECONNECT_INTERVAL, 2). + -define(SERVERS_DESC, "A Node list for Cluster to connect to. The nodes should be separated with commas, such as: `Node[,Node].`
" "For each Node should be: " diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 1cb65034d..1930d9e68 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -59,7 +59,6 @@ on_start( bind_password := BindPassword, timeout := Timeout, pool_size := PoolSize, - auto_reconnect := AutoReconn, ssl := SSL } = Config ) -> @@ -86,11 +85,11 @@ on_start( {bind_password, BindPassword}, {timeout, Timeout}, {pool_size, PoolSize}, - {auto_reconnect, reconn_interval(AutoReconn)} + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of - ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; + ok -> {ok, #{poolname => PoolName}}; {error, Reason} -> {error, Reason} end. @@ -129,9 +128,6 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = S on_get_status(_InstId, _State) -> connected. -reconn_interval(true) -> 15; -reconn_interval(false) -> false. - search(Conn, Base, Filter, Attributes) -> eldap2:search(Conn, [ {base, Base}, diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 522f15ccf..c28aa2514 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_connector_mqtt). +-include("emqx_connector.hrl"). + -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -198,12 +200,10 @@ on_query_async( ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). -on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> - AutoReconn = maps:get(auto_reconnect, Conf, true), +on_get_status(_InstId, #{name := InstanceId}) -> case emqx_connector_mqtt_worker:status(InstanceId) of connected -> connected; - _ when AutoReconn == true -> connecting; - _ when AutoReconn == false -> disconnected + _ -> connecting end. ensure_mqtt_worker_started(InstanceId, BridgeConf) -> @@ -236,7 +236,6 @@ make_forward_confs(FrowardConf) -> basic_config( #{ server := Server, - reconnect_interval := ReconnIntv, proto_ver := ProtoVer, bridge_mode := BridgeMode, clean_start := CleanStart, @@ -252,7 +251,7 @@ basic_config( %% 30s connect_timeout => 30, auto_reconnect => true, - reconnect_interval => ReconnIntv, + reconnect_interval => ?AUTO_RECONNECT_INTERVAL, proto_ver => ProtoVer, %% Opening bridge_mode will form a non-standard mqtt connection message. %% A load balancing server (such as haproxy) is often set up before the emqx broker server. diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6c0ff7210..d119a01e9 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -52,7 +52,6 @@ -type state() :: #{ poolname := atom(), - auto_reconnect := boolean(), prepare_statement := prepares(), params_tokens := params_tokens(), batch_inserts := sqls(), @@ -84,8 +83,6 @@ on_start( server := Server, database := DB, username := User, - password := Password, - auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config @@ -107,14 +104,14 @@ on_start( {host, Host}, {port, Port}, {user, User}, - {password, Password}, + {password, maps:get(password, Config, <<>>)}, {database, DB}, - {auto_reconnect, reconn_interval(AutoReconn)}, + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), - State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares), + State = maps:merge(#{poolname => PoolName}, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State)}; @@ -194,7 +191,7 @@ mysql_function(prepared_query) -> mysql_function(_) -> mysql_function(prepared_query). -on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> +on_get_status(_InstId, #{poolname := Pool} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of true -> case do_check_prepares(State) of @@ -205,10 +202,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State {connected, NState}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - conn_status(AutoReconn) + connecting end; false -> - conn_status(AutoReconn) + connecting end. do_get_status(Conn) -> @@ -227,11 +224,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P end. %% =================================================================== -conn_status(_AutoReconn = true) -> connecting; -conn_status(_AutoReconn = false) -> disconnected. - -reconn_interval(true) -> 15; -reconn_interval(false) -> false. connect(Options) -> mysql:start_link(Options). diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 4b565a614..25a22455d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -56,7 +56,6 @@ -type state() :: #{ poolname := atom(), - auto_reconnect := boolean(), prepare_sql := prepares(), params_tokens := params_tokens(), prepare_statement := epgsql:statement() @@ -87,8 +86,6 @@ on_start( server := Server, database := DB, username := User, - password := Password, - auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config @@ -113,14 +110,14 @@ on_start( {host, Host}, {port, Port}, {username, User}, - {password, emqx_secret:wrap(Password)}, + {password, emqx_secret:wrap(maps:get(password, Config, ""))}, {database, DB}, - {auto_reconnect, reconn_interval(AutoReconn)}, + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), - InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}}, + InitState = #{poolname => PoolName, prepare_statement => #{}}, State = maps:merge(InitState, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> @@ -247,7 +244,7 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> end, Result. -on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> +on_get_status(_InstId, #{poolname := Pool} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of true -> case do_check_prepares(State) of @@ -258,10 +255,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State {connected, NState}; false -> %% do not log error, it is logged in prepare_sql_to_conn - conn_status(AutoReconn) + connecting end; false -> - conn_status(AutoReconn) + connecting end. do_get_status(Conn) -> @@ -280,11 +277,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepare end. %% =================================================================== -conn_status(_AutoReconn = true) -> connecting; -conn_status(_AutoReconn = false) -> disconnected. - -reconn_interval(true) -> 15; -reconn_interval(false) -> false. connect(Opts) -> Host = proplists:get_value(host, Opts), diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 726af2d9b..6143dbf0c 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -117,7 +117,6 @@ on_start( #{ redis_type := Type, pool_size := PoolSize, - auto_reconnect := AutoReconn, ssl := SSL } = Config ) -> @@ -142,7 +141,7 @@ on_start( [ {pool_size, PoolSize}, {password, maps:get(password, Config, "")}, - {auto_reconnect, reconn_interval(AutoReconn)} + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ] ++ Database ++ Servers, Options = case maps:get(enable, SSL) of @@ -155,7 +154,7 @@ on_start( [{ssl, false}] end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn}, + State = #{poolname => PoolName, type => Type}, case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of @@ -229,18 +228,18 @@ eredis_cluster_workers_exist_and_are_connected(Workers) -> Workers ). -on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) -> +on_get_status(_InstId, #{type := cluster, poolname := PoolName}) -> case eredis_cluster:pool_exists(PoolName) of true -> Workers = extract_eredis_cluster_workers(PoolName), Health = eredis_cluster_workers_exist_and_are_connected(Workers), - status_result(Health, AutoReconn); + status_result(Health); false -> disconnected end; -on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> +on_get_status(_InstId, #{poolname := Pool}) -> Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), - status_result(Health, AutoReconn). + status_result(Health). do_get_status(Conn) -> case eredis:q(Conn, ["PING"]) of @@ -248,12 +247,8 @@ do_get_status(Conn) -> _ -> false end. -status_result(_Status = true, _AutoReconn) -> connected; -status_result(_Status = false, _AutoReconn = true) -> connecting; -status_result(_Status = false, _AutoReconn = false) -> disconnected. - -reconn_interval(true) -> 15; -reconn_interval(false) -> false. +status_result(_Status = true) -> connected; +status_result(_Status = false) -> connecting. do_cmd(PoolName, cluster, {cmd, Command}) -> eredis_cluster:q(PoolName, Command); diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 2364aeeaa..5d8f6941c 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -106,4 +106,5 @@ password(_) -> undefined. auto_reconnect(type) -> boolean(); auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(default) -> true; +auto_reconnect(deprecated) -> {since, "v5.0.15"}; auto_reconnect(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index bf5d2e140..114459149 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -51,7 +51,6 @@ values(post) -> pool_size => 8, username => <<"root">>, password => <<"">>, - auto_reconnect => true, sql => ?DEFAULT_SQL, local_topic => <<"local/topic/#">>, resource_opts => #{ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index dc8697e37..be9fc9dc8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -53,7 +53,6 @@ values(post, Type) -> pool_size => 8, username => <<"root">>, password => <<"public">>, - auto_reconnect => true, sql => ?DEFAULT_SQL, local_topic => <<"local/topic/#">>, resource_opts => #{ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index a0c6ba834..5c273e050 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -79,7 +79,6 @@ values(common, RedisType, SpecificOpts) -> local_topic => <<"local/topic/#">>, pool_size => 8, password => <<"secret">>, - auto_reconnect => true, command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], resource_opts => #{ batch_size => 1, diff --git a/mix.exs b/mix.exs index 45bc820e0..832f6e7ae 100644 --- a/mix.exs +++ b/mix.exs @@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, {:hocon, github: "emqx/hocon", tag: "0.35.0", override: true}, - {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true}, + {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, # in conflict by ehttpc and emqtt diff --git a/rebar.config b/rebar.config index a61bfe16a..f05624477 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}} - , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} + , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {telemetry, "1.1.0"}