From a54668e83bc624d58e96fcb65a39f3ca77b09678 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 24 Nov 2021 17:11:04 +0800 Subject: [PATCH 1/3] fix(mqtt_bridge): the mqtt bridge hangs with an unreachable IP --- .../src/emqx_bridge_mqtt_actions.erl | 58 ++++++++++++------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 3f685a72a..21cda5b6d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) -> end. test_resource_status(PoolName) -> - IsConnected = fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Bridge} -> - try emqx_bridge_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> - false - end; - {error, _} -> - false - end - end, - Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:any(fun(St) -> St =:= true end, Status). + Parent = self(), + Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end) + || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + try + Status = [ + receive {Pid, R} -> R + after 1000 -> %% get_worker_status/1 should be a quick operation + throw({timeout, Pid}) + end || Pid <- Pids], + lists:any(fun(St) -> St =:= true end, Status) + catch + throw:Reason -> + ?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]), + lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids), + false + end. + +get_worker_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Bridge} -> + try emqx_bridge_worker:status(Bridge) of + connected -> true; + _ -> false + catch _Error:_Reason -> + false + end; + {error, _} -> + false + end. -spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()). on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> @@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - case ecpool:stop_sup_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) - end. + case ecpool:stop_sup_pool(PoolName) of + ok -> + ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); + {error, Reason} -> + ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), + error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) + end. on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, <<"forward_topic">> := ForwardTopic, From 3b9bb1d66c960670391247ab3db67920996b7f4e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 24 Nov 2021 17:52:11 +0800 Subject: [PATCH 2/3] fix(ecpool): update ecpool to 0.5.2 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 09e4d1e2b..82f029873 100644 --- a/rebar.config +++ b/rebar.config @@ -47,7 +47,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}} From f5ac6fb7142a4277d068abd7cba59438bebbf03a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 25 Nov 2021 09:46:06 +0800 Subject: [PATCH 3/3] chore(appup): bump emqx_bridge_mqtt to 4.3.3 --- .../src/emqx_bridge_mqtt.app.src | 2 +- .../src/emqx_bridge_mqtt.appup.src | 20 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 650c9d7fa..e6d1c55c4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index b32d747f4..3b50949cd 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,18 +1,24 @@ %% -*-: erlang -*- -{VSN, +{"4.3.3", [ + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", []}, + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ - {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, + {"4.3.0", [ + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} ]}, - {"4.3.1", []}, {<<".*">>, []} ] }.