refactor: remove the auto_reconnect field

This commit is contained in:
Shawn 2023-01-11 19:24:51 +08:00
parent bf259e360a
commit 3e9c4f444f
15 changed files with 38 additions and 67 deletions

View File

@ -15,7 +15,6 @@ authz:{
pool_size: 1
username: root
password: public
auto_reconnect: true
ssl: {
enable: true
cacertfile: "etc/certs/cacert.pem"
@ -33,7 +32,6 @@ authz:{
pool_size: 1
username: root
password: public
auto_reconnect: true
ssl: {enable: false}
}
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = ${peerhost} or username = ${username} or username = '$all' or clientid = ${clientid}"
@ -45,7 +43,6 @@ authz:{
database: 0
pool_size: 1
password: public
auto_reconnect: true
ssl: {enable: false}
}
cmd: "HGETALL mqtt_authz:${username}"

View File

@ -14,7 +14,7 @@ An MySQL connector can be used as following:
```
(emqx@127.0.0.1)5> emqx_resource:list_instances_verbose().
[#{config =>
#{auto_reconnect => true,cacertfile => [],certfile => [],
#{cacertfile => [],certfile => [],
database => "mqtt",keyfile => [],password => "public",
pool_size => 1,
server => {{127,0,0,1},3306},

View File

@ -68,12 +68,12 @@ emqx_connector_schema_lib {
auto_reconnect {
desc {
en: "Enable automatic reconnect to the database."
zh: "自动重连数据库。"
en: "Deprecated. Enable automatic reconnect to the database."
zh: "已弃用。自动重连数据库。"
}
label: {
en: "Auto Reconnect Database"
zh: "自动重连数据库"
en: "Deprecated. Auto Reconnect Database"
zh: "已弃用。自动重连数据库"
}
}

View File

@ -24,6 +24,8 @@
-define(REDIS_DEFAULT_PORT, 6379).
-define(PGSQL_DEFAULT_PORT, 5432).
-define(AUTO_RECONNECT_INTERVAL, 2).
-define(SERVERS_DESC,
"A Node list for Cluster to connect to. The nodes should be separated with commas, such as: `Node[,Node].`<br/>"
"For each Node should be: "

View File

@ -59,7 +59,6 @@ on_start(
bind_password := BindPassword,
timeout := Timeout,
pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL
} = Config
) ->
@ -86,11 +85,11 @@ on_start(
{bind_password, BindPassword},
{timeout, Timeout},
{pool_size, PoolSize},
{auto_reconnect, reconn_interval(AutoReconn)}
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
ok -> {ok, #{poolname => PoolName}};
{error, Reason} -> {error, Reason}
end.
@ -129,9 +128,6 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = S
on_get_status(_InstId, _State) -> connected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
search(Conn, Base, Filter, Attributes) ->
eldap2:search(Conn, [
{base, Base},

View File

@ -15,6 +15,8 @@
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
@ -198,12 +200,10 @@ on_query_async(
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true),
on_get_status(_InstId, #{name := InstanceId}) ->
case emqx_connector_mqtt_worker:status(InstanceId) of
connected -> connected;
_ when AutoReconn == true -> connecting;
_ when AutoReconn == false -> disconnected
_ -> connecting
end.
ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
@ -236,7 +236,6 @@ make_forward_confs(FrowardConf) ->
basic_config(
#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMode,
clean_start := CleanStart,
@ -252,7 +251,7 @@ basic_config(
%% 30s
connect_timeout => 30,
auto_reconnect => true,
reconnect_interval => ReconnIntv,
reconnect_interval => ?AUTO_RECONNECT_INTERVAL,
proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message.
%% A load balancing server (such as haproxy) is often set up before the emqx broker server.

View File

@ -52,7 +52,6 @@
-type state() ::
#{
poolname := atom(),
auto_reconnect := boolean(),
prepare_statement := prepares(),
params_tokens := params_tokens(),
batch_inserts := sqls(),
@ -84,8 +83,6 @@ on_start(
server := Server,
database := DB,
username := User,
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL
} = Config
@ -107,14 +104,14 @@ on_start(
{host, Host},
{port, Port},
{user, User},
{password, Password},
{password, maps:get(password, Config, <<>>)},
{database, DB},
{auto_reconnect, reconn_interval(AutoReconn)},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config),
State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
State = maps:merge(#{poolname => PoolName}, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State)};
@ -194,7 +191,7 @@ mysql_function(prepared_query) ->
mysql_function(_) ->
mysql_function(prepared_query).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
@ -205,10 +202,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State
{connected, NState};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
conn_status(AutoReconn)
connecting
end;
false ->
conn_status(AutoReconn)
connecting
end.
do_get_status(Conn) ->
@ -227,11 +224,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
end.
%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
connect(Options) ->
mysql:start_link(Options).

View File

@ -56,7 +56,6 @@
-type state() ::
#{
poolname := atom(),
auto_reconnect := boolean(),
prepare_sql := prepares(),
params_tokens := params_tokens(),
prepare_statement := epgsql:statement()
@ -87,8 +86,6 @@ on_start(
server := Server,
database := DB,
username := User,
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL
} = Config
@ -113,14 +110,14 @@ on_start(
{host, Host},
{port, Port},
{username, User},
{password, emqx_secret:wrap(Password)},
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
{database, DB},
{auto_reconnect, reconn_interval(AutoReconn)},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config),
InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}},
InitState = #{poolname => PoolName, prepare_statement => #{}},
State = maps:merge(InitState, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok ->
@ -247,7 +244,7 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
end,
Result.
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
@ -258,10 +255,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State
{connected, NState};
false ->
%% do not log error, it is logged in prepare_sql_to_conn
conn_status(AutoReconn)
connecting
end;
false ->
conn_status(AutoReconn)
connecting
end.
do_get_status(Conn) ->
@ -280,11 +277,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepare
end.
%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
connect(Opts) ->
Host = proplists:get_value(host, Opts),

View File

@ -117,7 +117,6 @@ on_start(
#{
redis_type := Type,
pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL
} = Config
) ->
@ -142,7 +141,7 @@ on_start(
[
{pool_size, PoolSize},
{password, maps:get(password, Config, "")},
{auto_reconnect, reconn_interval(AutoReconn)}
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
] ++ Database ++ Servers,
Options =
case maps:get(enable, SSL) of
@ -155,7 +154,7 @@ on_start(
[{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn},
State = #{poolname => PoolName, type => Type},
case Type of
cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
@ -229,18 +228,18 @@ eredis_cluster_workers_exist_and_are_connected(Workers) ->
Workers
).
on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) ->
on_get_status(_InstId, #{type := cluster, poolname := PoolName}) ->
case eredis_cluster:pool_exists(PoolName) of
true ->
Workers = extract_eredis_cluster_workers(PoolName),
Health = eredis_cluster_workers_exist_and_are_connected(Workers),
status_result(Health, AutoReconn);
status_result(Health);
false ->
disconnected
end;
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
on_get_status(_InstId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
status_result(Health, AutoReconn).
status_result(Health).
do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of
@ -248,12 +247,8 @@ do_get_status(Conn) ->
_ -> false
end.
status_result(_Status = true, _AutoReconn) -> connected;
status_result(_Status = false, _AutoReconn = true) -> connecting;
status_result(_Status = false, _AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting.
do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command);

View File

@ -106,4 +106,5 @@ password(_) -> undefined.
auto_reconnect(type) -> boolean();
auto_reconnect(desc) -> ?DESC("auto_reconnect");
auto_reconnect(default) -> true;
auto_reconnect(deprecated) -> {since, "v5.0.15"};
auto_reconnect(_) -> undefined.

View File

@ -51,7 +51,6 @@ values(post) ->
pool_size => 8,
username => <<"root">>,
password => <<"">>,
auto_reconnect => true,
sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{

View File

@ -53,7 +53,6 @@ values(post, Type) ->
pool_size => 8,
username => <<"root">>,
password => <<"public">>,
auto_reconnect => true,
sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{

View File

@ -79,7 +79,6 @@ values(common, RedisType, SpecificOpts) ->
local_topic => <<"local/topic/#">>,
pool_size => 8,
password => <<"secret">>,
auto_reconnect => true,
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
resource_opts => #{
batch_size => 1,

View File

@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
{:hocon, github: "emqx/hocon", tag: "0.35.0", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
# in conflict by ehttpc and emqtt

View File

@ -69,7 +69,7 @@
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
, {telemetry, "1.1.0"}