Compare commits

..

2 Commits

Author SHA1 Message Date
jixingyue 1ced80e461 main.py 2025-09-29 13:51:52 +08:00
jixingyue 744a417618 test file 2025-09-29 13:45:44 +08:00
1 changed files with 502 additions and 0 deletions

502
main.py Normal file
View File

@ -0,0 +1,502 @@
import os
import math
import serial.tools.list_ports
import serial
import minimalmodbus
import time
import numpy as np
import cv2
class monocular_camera:
def __init__(self, camera_id=0):
self.cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 设置分辨率宽
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 设置分辨率高
self.cap.set(cv2.CAP_PROP_FPS, 30) # 设置帧率
self.num = 0
if not self.cap.isOpened():
print("Cannot open camera")
exit()
def grab_imgs(self):
ret, frame = self.cap.read()
if not ret:
print("Can't receive frame (stream end?). Exiting ...")
return None
# 存储图片
# savePath = os.path.join("./path", "Camera_{:0>3d}.png".format(self.num))
# cv2.imwrite(savePath, frame)
# self.num += 1
# 显示帧
# cv2.imshow("frame", frame)
# cv2.waitKey(1000)
return frame
def capture_point(calib, region_idx):
print("拍照")
picture = cam3.grab_imgs()
# weed
img_weed = cv2.GaussianBlur(picture, (5, 5), 0)
hsv = cv2.cvtColor(img_weed, cv2.COLOR_BGR2HSV)
low_hsv = np.array([33, 43, 46])
high_hsv = np.array([99, 255, 255])
mask_green = cv2.inRange(hsv, lowerb=low_hsv, upperb=high_hsv)
contours, _ = cv2.findContours(mask_green.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contours_l = []
weed_list = []
center_xy_list = []
for i in range(len(contours)):
area = cv2.contourArea(contours[i])
if area > 10:
contours_l.append(contours[i])
cnt = contours[i]
x_weed, y_weed, w_weed, h_weed = cv2.boundingRect(cnt)
center_x, center_y = (x_weed + w_weed / 2), (y_weed + h_weed / 2)
weed_list.append([int(center_x), int(center_y)])
p_laser_x = (calib[0][0] * center_x) + (calib[0][1] * center_y) + calib[0][2]
p_laser_y = (calib[1][0] * center_x) + (calib[1][1] * center_y) + calib[1][2]
encoder_position_x = int(p_laser_x) * 91
encoder_position_y = int(p_laser_y) * 91
if region_idx == 1:
if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920:
center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿
if region_idx == 2:
if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920:
center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿
if region_idx == 3:
if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920:
center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿
if region_idx == 4:
if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920:
center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿
else:
continue
center_xy_list = sorted(center_xy_list, key=lambda x: x[1], reverse=True) # 按y轴从大到小排序
print("center_xy_list", center_xy_list)
return center_xy_list
def capture_point_data(idx):
camera_list = [Red_Control_board_1.A_Dev.camera, Red_Control_board_2.A_Dev.camera, Red_Control_board_3.A_Dev.camera, Red_Control_board_4.A_Dev.camera]
calib = camera_list[idx-1].Calibration_Parameter
region_idx = camera_list[idx-1].Camera_Region
camera_list[idx-1].coordinate_all = capture_point(calib, region_idx)
if len(camera_list[0].coordinate_all) > 0:
camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]]
camera_list[0].coordinate_all.append([0, 0])
camera_list[idx-1].coordinate_all_empty = False
# camera_list = [Red_Control_board_3.A_Dev.camera]
# calib = camera_list[0].Calibration_Parameter
# region_idx = camera_list[0].Camera_Region
# camera_list[0].coordinate_all = capture_point(calib, region_idx)
# if len(camera_list[0].coordinate_all) > 0:
# camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]]
# camera_list[0].coordinate_all.append([0, 0])
# camera_list[0].coordinate_all_empty = False
def convert_to_modbus_format(value):
# 如果值为负数,转换为 Modbus 无符号整数形式
if value < 0:
value += 65536 # 或者 value = value & 0xFFFF这种方法也可用于确保值在 0 到 65535 之间
return value
def is_timeout(start_time, timeout):
return time.time() - start_time >= timeout
class camera:
def __init__(self,Calibration_Parameter,Camera_Regio):
self.Calibration_Parameter = Calibration_Parameter
self.Camera_Region = Camera_Regio
self.coordinate_all = []
self.coordinate_all_empty = True
self.coordinate_list = []
self.num_x = - 9100
self.num_y = - 9100
def get_coordinate(self):
print("现在是区间序号为" + str(self.Camera_Region) + "标定参数为" + str(self.Calibration_Parameter) + "来取值了")
self.coordinate_list.clear()
if len(self.coordinate_all) > 0:
self.num_x = self.coordinate_all[0][0]
self.num_y = self.coordinate_all[0][1]
self.coordinate_all.pop(0)
self.coordinate_list.append([self.num_x, self.num_y])
self.coordinate_list = self.coordinate_list[0] # 降维
else:
# 没有坐标了 设置标志位通知外面拍照 直接结束函数
self.coordinate_all_empty = True # 空
self.coordinate_list.clear()
print("get_coordinate", self.coordinate_list)
return self.coordinate_list
class Laser :
def __init__(self,Modbus_dev,Calibration_Parameter_this,Camera_Regio_this):
self.camera = camera(Calibration_Parameter_this,Camera_Regio_this)
self.Modbus_dev = Modbus_dev
self.start_time = time.time()
self.Coordinate_buf = []
#寄存器定义
self.Laser_100x_voltage_reg = 33
self.Laser_100x_current_reg = 34
self.Laser_time_reg = 35
self.Laser_switch_reg = 36
self.Motor_X_mode_reg = 37
self.Motor_Y_mode_reg = 38
self.Motor_X_acceleration_reg = 39
self.Motor_Y_acceleration_reg = 40
self.Motor_X_speed_reg = 41
self.Motor_Y_speed_reg = 42
self.X_reg = 43
self.Y_reg = 44
self.Start_reg = 45
self.Servo_finish_reg = 46
self.Laser_finish_reg = 47
#状态机定义
self.state = 'Init_State'
self.transitions = {
'Init_State' : self.Init_State_Func,
'IDLE_State' : self.IDLE_State_Func,
'Set_XY_State': self.Set_XY_Func ,
'Start_State' : self.Start_Func ,
'Wait_State' : self.Wait_State_Func,
'Err_State' : self.Err_State_Func ,
}
def process(self):
self.transitions[self.state]()
def Updata_State(self, new_state):
if new_state in self.transitions:
self.state = new_state
else:
print(f"无法转换到未知状态: {new_state}")
pass
def Init_State_Func(self):
print("Init_State_Func")
Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100]
try:
read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1)
except Exception as e:
self.Updata_State('Init_State')
print("Wait_State_err")
else:
#读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光
if read_finish_register[0] == 1:
#表示上电成功
try:
self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次
self.start_time = time.time()
except Exception as e:
if is_timeout(self.start_time, 3):
#写入寄存器错误已经超时
self.Updata_State('Init_State')
print("Init_State_err")
else:
#写入寄存器错误但未超时
self.Updata_State('Init_State')
print("Init_State")
else:
#写入寄存器错成功
# print("laser power write succeed")
read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10)
read_power_register = list(read_power_register)
print(read_power_register)
if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV:
self.Updata_State('IDLE_State')
print("Init_State_over")
else:
self.Updata_State('Init_State')
print("Init_State")
else:
#表示上电还没成功
self.Updata_State('Init_State')
def Open_State_Func(self):
print("Open_State_Func")
Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100]
try:
read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1)
except Exception as e:
# self.Updata_State('Close_State')
print("Close_State_err")
else:
#读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光
if read_finish_register[0] == 1:
#表示上电成功
try:
self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次
self.start_time = time.time()
except Exception as e:
if is_timeout(self.start_time, 3):
#写入寄存器错误已经超时
# self.Updata_State('Close_State')
print("Close_State_err")
else:
#写入寄存器错误但未超时
# self.Updata_State('Close_State')
print("Close_State")
else:
#写入寄存器错成功
# print("laser power write succeed")
read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10)
read_power_register = list(read_power_register)
print(read_power_register)
if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV:
# self.Updata_State('Mid_State')
print("Init_State_over")
else:
# self.Updata_State('Close_State')
print("Init_State")
else:
#表示上电还没成功
# self.Updata_State('close_State')
pass
def Close_State_Func(self):
print("Close_State_Func")
Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,0,100,0,1,1,2000,2000,2000,2000] # 时间表示在中心带停留的时间
try:
read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1)
except Exception as e:
# self.Updata_State('Close_State')
print("Close_State_err")
else:
#读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光
if read_finish_register[0] == 1:
#表示上电成功
try:
self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次
self.start_time = time.time()
except Exception as e:
if is_timeout(self.start_time, 3):
#写入寄存器错误已经超时
# self.Updata_State('Close_State')
print("Close_State_err")
else:
#写入寄存器错误但未超时
# self.Updata_State('Close_State')
print("Close_State")
else:
#写入寄存器错成功
# print("laser power write succeed")
read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10)
read_power_register = list(read_power_register)
print(read_power_register)
if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV:
# self.Updata_State('Mid_State')
print("Init_State_over")
else:
# self.Updata_State('Close_State')
print("Init_State")
else:
#表示上电还没成功
# self.Updata_State('close_State')
pass
def IDLE_State_Func(self):
print("IDLE_State_Func",self.Coordinate_buf)
if not self.Coordinate_buf:
# 表示没有值可以打
# 拍照获取值
self.Coordinate_buf = [] # 这里为什么要清空一次列表??
self.Coordinate_buf.append(camera.get_coordinate(self.camera))
self.Coordinate_buf = self.Coordinate_buf[0]
self.Updata_State('IDLE_State')
print("get coordinate")
else:
# 表示列表里还有值 那就去打
print("准备去打")
if len(self.Coordinate_buf ) == 2:
if self.Coordinate_buf[0] == 0 and self.Coordinate_buf[1] == 0:
self.Close_State_Func()
else:
self.Open_State_Func()
self.start_time = time.time()
self.Updata_State('Set_XY_State')
print("IDLE_to_Set_XY")
else:
self.Coordinate_buf.clear()
self.Updata_State('IDLE_State')
def Set_XY_Func(self):
# print("Set_XY")
try:
converted_values = [convert_to_modbus_format(value) for value in self.Coordinate_buf]
self.Modbus_dev.write_registers(self.X_reg, converted_values)#由于是连续的,所以写一次
except Exception as e:
if is_timeout(self.start_time, 3):
#写入寄存器错误已经超时
self.Updata_State('Err_State')
print("Set_XY_err")
self.Coordinate_buf.clear()
else:
#写入寄存器错误但未超时
self.Updata_State('Set_XY_State')
# print("Set_XY_run")
else:
#写入寄存器错成功
self.start_time = time.time()
self.Updata_State('Start_State')
print("Set_XY_OK")
self.Coordinate_buf.clear()
def Start_Func(self):
print("Start_Func")
try:
self.Modbus_dev.write_registers(self.Start_reg, [1])
except Exception as e:
if is_timeout(self.start_time, 3):
#写入寄存器错误已经超时
self.Updata_State('Err_State')
print("Start_State_err")
else:
#写入寄存器错误但未超时
self.Updata_State('Start_State')
print("Start_State_run")
else:
#写入寄存器错成功
self.start_time = time.time()
self.Updata_State('Wait_State')
print("Start_State_ok")
def Wait_State_Func(self):
print("Wait_State_Func")
try:
read_laser_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 2)
print(read_laser_register)
except Exception as e:
if is_timeout(self.start_time, 3):
#读寄存器错误已经超时
self.Updata_State('Err_State')
print("Wait_State_err")
else:
#读寄存器错误但未超时
self.Updata_State('Wait_State')
print("Wait_State_run")
else:
#读寄存器错成功
if (read_laser_register[0] == 1 and read_laser_register[1] == 1):
#表示激光器完成
self.Updata_State('IDLE_State') # 注释
print("Wait_State_ok")
else:
#表示没完成
if is_timeout(self.start_time, 90):#电机最长运行30s 激光最长1min
#没完成并且超时
self.Updata_State('Err_State')
print("Wait_StateErr")
else:
#没完成但未超时
self.Updata_State('Wait_State')
print("Wait_State_loop")
def Err_State_Func(self):
print("Err_State_Func")
print("卡死 : 人工检查设备错误")
print("准备恢复为初始状态")
self.Updata_State('Init_State')
class Control_board :
def __init__(self,COM,Slave_address,Calibration_Parameter,Camera_Regio):
self.modbus = minimalmodbus.Instrument(COM, Slave_address) # 端口名, 从站地址
self.modbus.serial.baudrate = 115200 # 波特率
self.modbus.serial.bytesize = 8
self.modbus.serial.parity = serial.PARITY_NONE
self.modbus.serial.stopbits = 1
self.modbus.serial.timeout = 1 # seconds
self.A_Dev = Laser(self.modbus,Calibration_Parameter,Camera_Regio)
def process(self):
self.A_Dev.process()
if __name__ == "__main__":
transform1 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02],
[ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]]
transform2 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02],
[ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]]
transform3 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02],
[-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]]
transform4 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02],
[-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]]
print("开始初始化相机")
cam1 = monocular_camera(camera_id=1)
cam2 = monocular_camera(camera_id=1)
cam3 = monocular_camera(camera_id=0)
cam4 = monocular_camera(camera_id=1)
cam_list = [cam1, cam2, cam3, cam4]
print("相机初始化完毕")
# 设备注册
Red_Control_board_1 = Control_board("COM3",1,transform1,1)
Red_Control_board_2 = Control_board("COM3",2,transform2,2)
Red_Control_board_3 = Control_board("COM3",3,transform3,3)
Red_Control_board_4 = Control_board("COM3",4,transform4,4)
while True:
Red_Control_board_1.process()
Red_Control_board_2.process()
Red_Control_board_3.process()
Red_Control_board_4.process()
# 如果4个列表都为空
if Red_Control_board_1.A_Dev.camera.coordinate_all_empty :
print("1号设备空了")
if Red_Control_board_1.A_Dev.state == "IDLE_State" :
print("1号设备都空了且在IDLE_State")
capture_point_data(idx=1)
else:
continue
if Red_Control_board_1.A_Dev.camera.coordinate_all_empty :
print("2号设备空了")
if Red_Control_board_1.A_Dev.state == "IDLE_State" :
print("2号设备都空了且在IDLE_State")
capture_point_data(idx=2)
else:
continue
if Red_Control_board_1.A_Dev.camera.coordinate_all_empty :
print("3号设备空了")
if Red_Control_board_1.A_Dev.state == "IDLE_State" :
print("3号设备都空了且在IDLE_State")
capture_point_data(idx=3)
else:
continue
if Red_Control_board_1.A_Dev.camera.coordinate_all_empty :
print("4号设备空了")
if Red_Control_board_1.A_Dev.state == "IDLE_State" :
print("4号设备都空了且在IDLE_State")
capture_point_data(idx=4)
else:
continue
else:
continue