diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index c4afa4db2..49b333216 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -317,10 +317,13 @@ t_mqtt_conn_bridge_egress(_) -> RemoteTopic = <>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload)), + + ?wait_async_action( + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + #{?snk_kind := buffer_worker_flush_ack} + ), %% we should receive a message on the "remote" broker, with specified topic Msg = assert_mqtt_msg_received(RemoteTopic, Payload), @@ -356,10 +359,13 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> RemoteTopic = <>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload)), + + ?wait_async_action( + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + #{?snk_kind := buffer_worker_flush_ack} + ), %% we should receive a message on the "remote" broker, with specified topic Msg = assert_mqtt_msg_received(RemoteTopic), @@ -444,10 +450,13 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } ] } = request_bridge_metrics(BridgeIDEgress), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload)), + + ?wait_async_action( + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + #{?snk_kind := buffer_worker_flush_ack} + ), %% we should receive a message on the "remote" broker, with specified topic assert_mqtt_msg_received(RemoteTopic, Payload), @@ -677,10 +686,13 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> RemoteTopic = <>, Payload0 = <<"hello">>, emqx:subscribe(RemoteTopic), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload0)), + + ?wait_async_action( + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload0)), + #{?snk_kind := buffer_worker_flush_ack} + ), %% we should receive a message on the "remote" broker, with specified topic assert_mqtt_msg_received(RemoteTopic, Payload0),