fix(emqx_connector): when start_pool fails, return error and reason

This commit is contained in:
EMQ-YangM 2022-01-26 16:05:18 +08:00
parent 2522a36b0c
commit e9f3fa7b6b
6 changed files with 27 additions and 16 deletions

View File

@ -77,8 +77,10 @@ on_start(InstId, #{servers := Servers0,
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, reconn_interval(AutoReconn)},
{servers, Servers}], {servers, Servers}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
{ok, #{poolname => PoolName}}. ok -> {ok, #{poolname => PoolName}};
{error, Reason} -> {error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping_ldap_connector", ?SLOG(info, #{msg => "stopping_ldap_connector",

View File

@ -130,8 +130,10 @@ on_start(InstId, Config = #{mongo_type := Type,
{options, init_topology_options(maps:to_list(Topology), [])}, {options, init_topology_options(maps:to_list(Topology), [])},
{worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}], {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
ok = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of
{ok, #{poolname => PoolName, type => Type}}. ok -> {ok, #{poolname => PoolName, type => Type}};
{error, Reason} -> {error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping_mongodb_connector", ?SLOG(info, #{msg => "stopping_mongodb_connector",

View File

@ -71,8 +71,10 @@ on_start(InstId, #{server := {Host, Port},
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, reconn_interval(AutoReconn)},
{pool_size, PoolSize}], {pool_size, PoolSize}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
{ok, #{poolname => PoolName}}. ok -> {ok, #{poolname => PoolName}};
{error, Reason} -> {error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping_mysql_connector", ?SLOG(info, #{msg => "stopping_mysql_connector",

View File

@ -83,8 +83,10 @@ on_start(InstId, #{server := {Host, Port},
{pool_size, PoolSize}, {pool_size, PoolSize},
{named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}], {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
{ok, #{poolname => PoolName}}. ok -> {ok, #{poolname => PoolName}};
{error, Reason} -> {error, Reason}
end.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping postgresql connector", ?SLOG(info, #{msg => "stopping postgresql connector",

View File

@ -117,14 +117,16 @@ on_start(InstId, #{redis_type := Type,
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
{ok, _} -> ok; {ok, _} -> {ok, #{poolname => PoolName, type => Type}};
{ok, _, _} -> ok; {ok, _, _} -> {ok, #{poolname => PoolName, type => Type}};
{error, Reason} -> error(connect_redis_cluster_failed, Reason) {error, Reason} -> {error, Reason}
end; end;
_ -> _ ->
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of
end, ok -> {ok, #{poolname => PoolName, type => Type}};
{ok, #{poolname => PoolName, type => Type}}. {error, Reason} -> {error, Reason}
end
end.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping_redis_connector", ?SLOG(info, #{msg => "stopping_redis_connector",

View File

@ -30,14 +30,15 @@ pool_name(ID) when is_binary(ID) ->
start_pool(Name, Mod, Options) -> start_pool(Name, Mod, Options) ->
case ecpool:start_sup_pool(Name, Mod, Options) of case ecpool:start_sup_pool(Name, Mod, Options) of
{ok, _} -> {ok, _} ->
?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}); ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}),
ok;
{error, {already_started, _Pid}} -> {error, {already_started, _Pid}} ->
stop_pool(Name), stop_pool(Name),
start_pool(Name, Mod, Options); start_pool(Name, Mod, Options);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name, ?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name,
reason => Reason}), reason => Reason}),
error({start_pool_failed, Name, Reason}) {error, {start_pool_failed, Name, Reason}}
end. end.
stop_pool(Name) -> stop_pool(Name) ->