From a54668e83bc624d58e96fcb65a39f3ca77b09678 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 24 Nov 2021 17:11:04 +0800 Subject: [PATCH] fix(mqtt_bridge): the mqtt bridge hangs with an unreachable IP --- .../src/emqx_bridge_mqtt_actions.erl | 58 ++++++++++++------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 3f685a72a..21cda5b6d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) -> end. test_resource_status(PoolName) -> - IsConnected = fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Bridge} -> - try emqx_bridge_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> - false - end; - {error, _} -> - false - end - end, - Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:any(fun(St) -> St =:= true end, Status). + Parent = self(), + Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end) + || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + try + Status = [ + receive {Pid, R} -> R + after 1000 -> %% get_worker_status/1 should be a quick operation + throw({timeout, Pid}) + end || Pid <- Pids], + lists:any(fun(St) -> St =:= true end, Status) + catch + throw:Reason -> + ?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]), + lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids), + false + end. + +get_worker_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Bridge} -> + try emqx_bridge_worker:status(Bridge) of + connected -> true; + _ -> false + catch _Error:_Reason -> + false + end; + {error, _} -> + false + end. -spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()). on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> @@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - case ecpool:stop_sup_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) - end. + case ecpool:stop_sup_pool(PoolName) of + ok -> + ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); + {error, Reason} -> + ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), + error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) + end. on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, <<"forward_topic">> := ForwardTopic,