Merge pull request #9944 from keynslug/fix/mqtt-bridge-test-flap

test(mqtt-bridge): manage test flaps by waiting for flush events
This commit is contained in:
Andrew Mayorov 2023-02-09 17:01:01 +04:00 committed by GitHub
commit 373947a4b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 16 deletions

View File

@ -317,10 +317,13 @@ t_mqtt_conn_bridge_egress(_) ->
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
?wait_async_action(
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
#{?snk_kind := buffer_worker_flush_ack}
),
%% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
@ -356,10 +359,13 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
?wait_async_action(
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
#{?snk_kind := buffer_worker_flush_ack}
),
%% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic),
@ -444,10 +450,13 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
}
]
} = request_bridge_metrics(BridgeIDEgress),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
?wait_async_action(
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
#{?snk_kind := buffer_worker_flush_ack}
),
%% we should receive a message on the "remote" broker, with specified topic
assert_mqtt_msg_received(RemoteTopic, Payload),
@ -677,10 +686,13 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload0 = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload0)),
?wait_async_action(
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload0)),
#{?snk_kind := buffer_worker_flush_ack}
),
%% we should receive a message on the "remote" broker, with specified topic
assert_mqtt_msg_received(RemoteTopic, Payload0),