diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index a75f4db39..255247011 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -162,7 +162,7 @@ on_query( #{egress_pool_name := PoolName, egress_config := Config} ) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), - handle_send_result(with_worker(PoolName, send, [Msg, Config])); + handle_send_result(with_egress_client(PoolName, send, [Msg, Config])); on_query(ResourceId, {send_message, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", @@ -179,7 +179,7 @@ on_query_async( ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), Callback = {fun on_async_result/2, [CallbackIn]}, - Result = with_worker(PoolName, send_async, [Msg, Callback, Config]), + Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]), case Result of ok -> ok; @@ -196,16 +196,8 @@ on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) -> reason => "Egress is not configured" }). -with_worker(ResourceId, Fun, Args) -> - Worker = ecpool:get_client(ResourceId), - case is_pid(Worker) andalso ecpool_worker:client(Worker) of - {ok, Client} -> - erlang:apply(emqx_connector_mqtt_egress, Fun, [Client | Args]); - {error, Reason} -> - {error, Reason}; - false -> - {error, disconnected} - end. +with_egress_client(ResourceId, Fun, Args) -> + ecpool:pick_and_do(ResourceId, {emqx_connector_mqtt_egress, Fun, Args}, no_handover). on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -233,6 +225,8 @@ classify_reply(Reply = #{reason_code := _}) -> classify_error(disconnected = Reason) -> {recoverable_error, Reason}; +classify_error(ecpool_empty) -> + {recoverable_error, disconnected}; classify_error({disconnected, _RC, _} = Reason) -> {recoverable_error, Reason}; classify_error({shutdown, _} = Reason) -> diff --git a/mix.exs b/mix.exs index 2e2882e15..3fa3cc2bc 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true}, - {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true}, + {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer diff --git a/rebar.config b/rebar.config index 81545a7be..6a44b8074 100644 --- a/rebar.config +++ b/rebar.config @@ -66,7 +66,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}