diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 6b2dfa6e1..48733b5e9 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -77,8 +77,10 @@ on_start(InstId, #{servers := Servers0, {auto_reconnect, reconn_interval(AutoReconn)}, {servers, Servers}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), - {ok, #{poolname => PoolName}}. + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of + ok -> {ok, #{poolname => PoolName}}; + {error, Reason} -> {error, Reason} + end. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping_ldap_connector", diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 58ea888a9..8926587cb 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -130,8 +130,10 @@ on_start(InstId, Config = #{mongo_type := Type, {options, init_topology_options(maps:to_list(Topology), [])}, {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - ok = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts), - {ok, #{poolname => PoolName, type => Type}}. + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of + ok -> {ok, #{poolname => PoolName, type => Type}}; + {error, Reason} -> {error, Reason} + end. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping_mongodb_connector", diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index e467e4b0c..548721a93 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -71,8 +71,10 @@ on_start(InstId, #{server := {Host, Port}, {auto_reconnect, reconn_interval(AutoReconn)}, {pool_size, PoolSize}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), - {ok, #{poolname => PoolName}}. + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + ok -> {ok, #{poolname => PoolName}}; + {error, Reason} -> {error, Reason} + end. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping_mysql_connector", diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 90d1c207e..7dcaf7373 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -83,8 +83,10 @@ on_start(InstId, #{server := {Host, Port}, {pool_size, PoolSize}, {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), - {ok, #{poolname => PoolName}}. + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + ok -> {ok, #{poolname => PoolName}}; + {error, Reason} -> {error, Reason} + end. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping postgresql connector", diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 804adbf97..3907e24f9 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -117,14 +117,16 @@ on_start(InstId, #{redis_type := Type, case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of - {ok, _} -> ok; - {ok, _, _} -> ok; - {error, Reason} -> error(connect_redis_cluster_failed, Reason) + {ok, _} -> {ok, #{poolname => PoolName, type => Type}}; + {ok, _, _} -> {ok, #{poolname => PoolName, type => Type}}; + {error, Reason} -> {error, Reason} end; _ -> - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) - end, - {ok, #{poolname => PoolName, type => Type}}. + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of + ok -> {ok, #{poolname => PoolName, type => Type}}; + {error, Reason} -> {error, Reason} + end + end. on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{msg => "stopping_redis_connector", diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index 1fd91f06f..d462e3db2 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -30,14 +30,15 @@ pool_name(ID) when is_binary(ID) -> start_pool(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of {ok, _} -> - ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}); + ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}), + ok; {error, {already_started, _Pid}} -> stop_pool(Name), start_pool(Name, Mod, Options); {error, Reason} -> ?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name, reason => Reason}), - error({start_pool_failed, Name, Reason}) + {error, {start_pool_failed, Name, Reason}} end. stop_pool(Name) ->