fix(bridge): mqtt bridge worker status idle
This commit is contained in:
parent
bffff65df5
commit
768ab4eacd
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_mqtt,
|
{application, emqx_bridge_mqtt,
|
||||||
[{description, "EMQ X Bridge to MQTT Broker"},
|
[{description, "EMQ X Bridge to MQTT Broker"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,replayq,emqtt]},
|
{applications, [kernel,stdlib,replayq,emqtt]},
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||||
|
@ -14,7 +14,7 @@
|
||||||
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -433,7 +433,7 @@ test_resource_status(PoolName) ->
|
||||||
try
|
try
|
||||||
Status = [
|
Status = [
|
||||||
receive {Pid, R} -> R
|
receive {Pid, R} -> R
|
||||||
after 1000 -> %% get_worker_status/1 should be a quick operation
|
after 10000 -> %% get_worker_status/1 should be a quick operation
|
||||||
throw({timeout, Pid})
|
throw({timeout, Pid})
|
||||||
end || Pid <- Pids],
|
end || Pid <- Pids],
|
||||||
lists:any(fun(St) -> St =:= true end, Status)
|
lists:any(fun(St) -> St =:= true end, Status)
|
||||||
|
@ -444,13 +444,28 @@ test_resource_status(PoolName) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-define(RETRY_TIMES, 4).
|
||||||
|
|
||||||
get_worker_status(Worker) ->
|
get_worker_status(Worker) ->
|
||||||
|
get_worker_status(Worker, ?RETRY_TIMES).
|
||||||
|
|
||||||
|
get_worker_status(_Worker, 0) ->
|
||||||
|
false;
|
||||||
|
get_worker_status(Worker, Times) ->
|
||||||
case ecpool_worker:client(Worker) of
|
case ecpool_worker:client(Worker) of
|
||||||
{ok, Bridge} ->
|
{ok, Bridge} ->
|
||||||
try emqx_bridge_worker:status(Bridge) of
|
try emqx_bridge_worker:status(Bridge) of
|
||||||
connected -> true;
|
connected ->
|
||||||
_ -> false
|
true;
|
||||||
catch _Error:_Reason ->
|
idle ->
|
||||||
|
?LOG(info, "MQTT Bridge get status idle. Should not ignore this."),
|
||||||
|
timer:sleep(100),
|
||||||
|
get_worker_status(Worker, Times - 1);
|
||||||
|
ErrorStatus ->
|
||||||
|
?LOG(error, "MQTT Bridge get status ~p", [ErrorStatus]),
|
||||||
|
false
|
||||||
|
catch Error:Reason:ST ->
|
||||||
|
?LOG(error, "MQTT Bridge get status error: ~p reason: ~p stacktrace: ~p", [Error, Reason, ST]),
|
||||||
false
|
false
|
||||||
end;
|
end;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
|
|
Loading…
Reference in New Issue