Merge pull request #6286 from terry-xiaoyu/mqtt_bridge_hangs
MQTT bridge hangs with an unreachable IP
This commit is contained in:
commit
0d1b194906
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_mqtt,
|
{application, emqx_bridge_mqtt,
|
||||||
[{description, "EMQ X Bridge to MQTT Broker"},
|
[{description, "EMQ X Bridge to MQTT Broker"},
|
||||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,replayq,emqtt]},
|
{applications, [kernel,stdlib,replayq,emqtt]},
|
||||||
|
|
|
@ -1,18 +1,24 @@
|
||||||
%% -*-: erlang -*-
|
%% -*-: erlang -*-
|
||||||
|
|
||||||
{VSN,
|
{"4.3.3",
|
||||||
[
|
[
|
||||||
|
{<<"4.3.[1-2]">>, [
|
||||||
|
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{"4.3.0", [
|
{"4.3.0", [
|
||||||
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
|
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
|
||||||
]},
|
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
||||||
{"4.3.1", []},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.0", [
|
{<<"4.3.[1-2]">>, [
|
||||||
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
|
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{"4.3.0", [
|
||||||
|
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.3.1", []},
|
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -410,21 +410,35 @@ start_resource(ResId, PoolName, Options) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
test_resource_status(PoolName) ->
|
test_resource_status(PoolName) ->
|
||||||
IsConnected = fun(Worker) ->
|
Parent = self(),
|
||||||
case ecpool_worker:client(Worker) of
|
Pids = [spawn(fun() -> Parent ! {self(), get_worker_status(Worker)} end)
|
||||||
{ok, Bridge} ->
|
|| {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||||
try emqx_bridge_worker:status(Bridge) of
|
try
|
||||||
connected -> true;
|
Status = [
|
||||||
_ -> false
|
receive {Pid, R} -> R
|
||||||
catch _Error:_Reason ->
|
after 1000 -> %% get_worker_status/1 should be a quick operation
|
||||||
false
|
throw({timeout, Pid})
|
||||||
end;
|
end || Pid <- Pids],
|
||||||
{error, _} ->
|
lists:any(fun(St) -> St =:= true end, Status)
|
||||||
false
|
catch
|
||||||
end
|
throw:Reason ->
|
||||||
end,
|
?LOG(error, "Get mqtt bridge status timeout: ~p", [Reason]),
|
||||||
Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
lists:foreach(fun(Pid) -> exit(Pid, kill) end, Pids),
|
||||||
lists:any(fun(St) -> St =:= true end, Status).
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_worker_status(Worker) ->
|
||||||
|
case ecpool_worker:client(Worker) of
|
||||||
|
{ok, Bridge} ->
|
||||||
|
try emqx_bridge_worker:status(Bridge) of
|
||||||
|
connected -> true;
|
||||||
|
_ -> false
|
||||||
|
catch _Error:_Reason ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
{error, _} ->
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()).
|
-spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()).
|
||||||
on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
|
on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
|
||||||
|
@ -433,13 +447,13 @@ on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) ->
|
||||||
|
|
||||||
on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
|
on_resource_destroy(ResId, #{<<"pool">> := PoolName}) ->
|
||||||
?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
|
?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
|
||||||
case ecpool:stop_sup_pool(PoolName) of
|
case ecpool:stop_sup_pool(PoolName) of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
|
?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
|
?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
|
||||||
error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed})
|
error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
|
on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
|
||||||
<<"forward_topic">> := ForwardTopic,
|
<<"forward_topic">> := ForwardTopic,
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
||||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
|
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
|
||||||
|
|
Loading…
Reference in New Issue