diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 12a3a8e23..ef4de9bbf 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -226,22 +226,20 @@ t_mqtt_conn_bridge_ingress(_) -> <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_INGRESS }), - #{ <<"id">> := BridgeIDIngress , <<"type">> := <<"mqtt">> - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDIngress, 5), %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -295,22 +293,21 @@ t_mqtt_conn_bridge_egress(_) -> <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - #{ <<"id">> := BridgeIDEgress , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 5), %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -369,10 +366,9 @@ t_mqtt_conn_update(_) -> #{ <<"id">> := BridgeIDEgress , <<"type">> := <<"mqtt">> , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -424,6 +420,7 @@ t_mqtt_conn_update2(_) -> {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + wait_for_resource_ready(BridgeIDEgress, 5), ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), @@ -454,7 +451,7 @@ t_mqtt_conn_update3(_) -> #{ <<"id">> := BridgeIDEgress , <<"connector">> := ConnctorID } = jsx:decode(Bridge), - wait_for_resource_ready(BridgeIDEgress, 2), + wait_for_resource_ready(BridgeIDEgress, 5), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -505,6 +502,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> LocalTopic = <<"local_topic/", RemoteTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), + timer:sleep(100), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDIngress, 5), @@ -570,6 +568,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), + timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. wait_for_resource_ready(BridgeIDEgress, 5), @@ -593,6 +592,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + timer:sleep(100), wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),