diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 574851f70..438b1f601 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -410,45 +410,37 @@ t_connect_with_more_clients_than_the_broker_accepts(_) -> NewConf = OrgConf#{<<"max_connections">> => 3}, {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), BridgeName = atom_to_binary(?FUNCTION_NAME), - BridgeID = create_bridge( - ?SERVER_CONF#{ - <<"name">> => BridgeName, - <<"ingress">> => #{ - <<"pool_size">> => PoolSize, - <<"remote">> => #{ - <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, - <<"qos">> => 1 - }, - <<"local">> => #{ - <<"topic">> => <>, - <<"qos">> => <<"${qos}">>, - <<"payload">> => <<"${clientid}">>, - <<"retain">> => <<"${retain}">> + ?check_trace( + #{timetrap => 10_000}, + begin + BridgeID = create_bridge( + ?SERVER_CONF#{ + <<"name">> => BridgeName, + <<"ingress">> => #{ + <<"pool_size">> => PoolSize, + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } } - } - } - ), - snabbkaffe:block_until( - fun - (#{msg := emqx_bridge_mqtt_connector_tcp_closed}) -> - true; - (_) -> - false + ), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok end, - 5000 - ), - Trace = snabbkaffe:collect_trace(), - ?assert( - lists:any( - fun(K) -> - maps:get(msg, K, not_found) =:= - emqx_bridge_mqtt_connector_tcp_closed - end, - Trace - ) + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end ), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. t_mqtt_egress_bridge_warns_clean_start(_) ->