diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 650c9d7fa..e6d1c55c4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index b32d747f4..3b50949cd 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,18 +1,24 @@ %% -*-: erlang -*- -{VSN, +{"4.3.3", [ + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", []}, + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ - {"4.3.0", [ - {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} + {<<"4.3.[1-2]">>, [ + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} + ]}, + {"4.3.0", [ + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}, + {load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []} ]}, - {"4.3.1", []}, {<<".*">>, []} ] }. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 3f685a72a..21cda5b6d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) -> end. test_resource_status(PoolName) -> - IsConnected = fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Bridge} -> - try emqx_bridge_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> - false - end; - {error, _} -> - false - end - end, - Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:any(fun(St) -> St =:= true end, Status). + Parent = self(), + Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end) + || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + try + Status = [ + receive {Pid, R} -> R + after 1000 -> %% get_worker_status/1 should be a quick operation + throw({timeout, Pid}) + end || Pid <- Pids], + lists:any(fun(St) -> St =:= true end, Status) + catch + throw:Reason -> + ?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]), + lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids), + false + end. + +get_worker_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Bridge} -> + try emqx_bridge_worker:status(Bridge) of + connected -> true; + _ -> false + catch _Error:_Reason -> + false + end; + {error, _} -> + false + end. -spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()). on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> @@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - case ecpool:stop_sup_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) - end. + case ecpool:stop_sup_pool(PoolName) of + ok -> + ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); + {error, Reason} -> + ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), + error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) + end. on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, <<"forward_topic">> := ForwardTopic, diff --git a/rebar.config b/rebar.config index 09e4d1e2b..82f029873 100644 --- a/rebar.config +++ b/rebar.config @@ -47,7 +47,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}