import os import math import serial.tools.list_ports import serial import minimalmodbus import time import numpy as np import cv2 class monocular_camera: def __init__(self, camera_id=0): self.cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 设置分辨率宽 self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 设置分辨率高 self.cap.set(cv2.CAP_PROP_FPS, 30) # 设置帧率 self.num = 0 if not self.cap.isOpened(): print("Cannot open camera") exit() def grab_imgs(self): ret, frame = self.cap.read() if not ret: print("Can't receive frame (stream end?). Exiting ...") return None # 存储图片 # savePath = os.path.join("./path", "Camera_{:0>3d}.png".format(self.num)) # cv2.imwrite(savePath, frame) # self.num += 1 # 显示帧 # cv2.imshow("frame", frame) # cv2.waitKey(1000) return frame def capture_point(calib, region_idx): print("拍照") picture = cam3.grab_imgs() # weed img_weed = cv2.GaussianBlur(picture, (5, 5), 0) hsv = cv2.cvtColor(img_weed, cv2.COLOR_BGR2HSV) low_hsv = np.array([33, 43, 46]) high_hsv = np.array([99, 255, 255]) mask_green = cv2.inRange(hsv, lowerb=low_hsv, upperb=high_hsv) contours, _ = cv2.findContours(mask_green.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) contours_l = [] weed_list = [] center_xy_list = [] for i in range(len(contours)): area = cv2.contourArea(contours[i]) if area > 10: contours_l.append(contours[i]) cnt = contours[i] x_weed, y_weed, w_weed, h_weed = cv2.boundingRect(cnt) center_x, center_y = (x_weed + w_weed / 2), (y_weed + h_weed / 2) weed_list.append([int(center_x), int(center_y)]) p_laser_x = (calib[0][0] * center_x) + (calib[0][1] * center_y) + calib[0][2] p_laser_y = (calib[1][0] * center_x) + (calib[1][1] * center_y) + calib[1][2] encoder_position_x = int(p_laser_x) * 91 encoder_position_y = int(p_laser_y) * 91 if region_idx == 1: if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 if region_idx == 2: if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 if region_idx == 3: if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 if region_idx == 4: if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 else: continue center_xy_list = sorted(center_xy_list, key=lambda x: x[1], reverse=True) # 按y轴从大到小排序 print("center_xy_list", center_xy_list) return center_xy_list def capture_point_data(idx): camera_list = [Red_Control_board_1.A_Dev.camera, Red_Control_board_2.A_Dev.camera, Red_Control_board_3.A_Dev.camera, Red_Control_board_4.A_Dev.camera] calib = camera_list[idx-1].Calibration_Parameter region_idx = camera_list[idx-1].Camera_Region camera_list[idx-1].coordinate_all = capture_point(calib, region_idx) if len(camera_list[0].coordinate_all) > 0: camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]] camera_list[0].coordinate_all.append([0, 0]) camera_list[idx-1].coordinate_all_empty = False # camera_list = [Red_Control_board_3.A_Dev.camera] # calib = camera_list[0].Calibration_Parameter # region_idx = camera_list[0].Camera_Region # camera_list[0].coordinate_all = capture_point(calib, region_idx) # if len(camera_list[0].coordinate_all) > 0: # camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]] # camera_list[0].coordinate_all.append([0, 0]) # camera_list[0].coordinate_all_empty = False def convert_to_modbus_format(value): # 如果值为负数,转换为 Modbus 无符号整数形式 if value < 0: value += 65536 # 或者 value = value & 0xFFFF,这种方法也可用于确保值在 0 到 65535 之间 return value def is_timeout(start_time, timeout): return time.time() - start_time >= timeout class camera: def __init__(self,Calibration_Parameter,Camera_Regio): self.Calibration_Parameter = Calibration_Parameter self.Camera_Region = Camera_Regio self.coordinate_all = [] self.coordinate_all_empty = True self.coordinate_list = [] self.num_x = - 9100 self.num_y = - 9100 def get_coordinate(self): print("现在是区间序号为" + str(self.Camera_Region) + "标定参数为" + str(self.Calibration_Parameter) + "来取值了") self.coordinate_list.clear() if len(self.coordinate_all) > 0: self.num_x = self.coordinate_all[0][0] self.num_y = self.coordinate_all[0][1] self.coordinate_all.pop(0) self.coordinate_list.append([self.num_x, self.num_y]) self.coordinate_list = self.coordinate_list[0] # 降维 else: # 没有坐标了 设置标志位通知外面拍照 直接结束函数 self.coordinate_all_empty = True # 空 self.coordinate_list.clear() print("get_coordinate", self.coordinate_list) return self.coordinate_list class Laser : def __init__(self,Modbus_dev,Calibration_Parameter_this,Camera_Regio_this): self.camera = camera(Calibration_Parameter_this,Camera_Regio_this) self.Modbus_dev = Modbus_dev self.start_time = time.time() self.Coordinate_buf = [] #寄存器定义 self.Laser_100x_voltage_reg = 33 self.Laser_100x_current_reg = 34 self.Laser_time_reg = 35 self.Laser_switch_reg = 36 self.Motor_X_mode_reg = 37 self.Motor_Y_mode_reg = 38 self.Motor_X_acceleration_reg = 39 self.Motor_Y_acceleration_reg = 40 self.Motor_X_speed_reg = 41 self.Motor_Y_speed_reg = 42 self.X_reg = 43 self.Y_reg = 44 self.Start_reg = 45 self.Servo_finish_reg = 46 self.Laser_finish_reg = 47 #状态机定义 self.state = 'Init_State' self.transitions = { 'Init_State' : self.Init_State_Func, 'IDLE_State' : self.IDLE_State_Func, 'Set_XY_State': self.Set_XY_Func , 'Start_State' : self.Start_Func , 'Wait_State' : self.Wait_State_Func, 'Err_State' : self.Err_State_Func , } def process(self): self.transitions[self.state]() def Updata_State(self, new_state): if new_state in self.transitions: self.state = new_state else: print(f"无法转换到未知状态: {new_state}") pass def Init_State_Func(self): print("Init_State_Func") Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100] try: read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) except Exception as e: self.Updata_State('Init_State') print("Wait_State_err") else: #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 if read_finish_register[0] == 1: #表示上电成功 try: self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 self.start_time = time.time() except Exception as e: if is_timeout(self.start_time, 3): #写入寄存器错误已经超时 self.Updata_State('Init_State') print("Init_State_err") else: #写入寄存器错误但未超时 self.Updata_State('Init_State') print("Init_State") else: #写入寄存器错成功 # print("laser power write succeed") read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) read_power_register = list(read_power_register) print(read_power_register) if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: self.Updata_State('IDLE_State') print("Init_State_over") else: self.Updata_State('Init_State') print("Init_State") else: #表示上电还没成功 self.Updata_State('Init_State') def Open_State_Func(self): print("Open_State_Func") Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100] try: read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) except Exception as e: # self.Updata_State('Close_State') print("Close_State_err") else: #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 if read_finish_register[0] == 1: #表示上电成功 try: self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 self.start_time = time.time() except Exception as e: if is_timeout(self.start_time, 3): #写入寄存器错误已经超时 # self.Updata_State('Close_State') print("Close_State_err") else: #写入寄存器错误但未超时 # self.Updata_State('Close_State') print("Close_State") else: #写入寄存器错成功 # print("laser power write succeed") read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) read_power_register = list(read_power_register) print(read_power_register) if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: # self.Updata_State('Mid_State') print("Init_State_over") else: # self.Updata_State('Close_State') print("Init_State") else: #表示上电还没成功 # self.Updata_State('close_State') pass def Close_State_Func(self): print("Close_State_Func") Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,0,100,0,1,1,2000,2000,2000,2000] # 时间表示在中心带停留的时间 try: read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) except Exception as e: # self.Updata_State('Close_State') print("Close_State_err") else: #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 if read_finish_register[0] == 1: #表示上电成功 try: self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 self.start_time = time.time() except Exception as e: if is_timeout(self.start_time, 3): #写入寄存器错误已经超时 # self.Updata_State('Close_State') print("Close_State_err") else: #写入寄存器错误但未超时 # self.Updata_State('Close_State') print("Close_State") else: #写入寄存器错成功 # print("laser power write succeed") read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) read_power_register = list(read_power_register) print(read_power_register) if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: # self.Updata_State('Mid_State') print("Init_State_over") else: # self.Updata_State('Close_State') print("Init_State") else: #表示上电还没成功 # self.Updata_State('close_State') pass def IDLE_State_Func(self): print("IDLE_State_Func",self.Coordinate_buf) if not self.Coordinate_buf: # 表示没有值可以打 # 拍照获取值 self.Coordinate_buf = [] # 这里为什么要清空一次列表?? self.Coordinate_buf.append(camera.get_coordinate(self.camera)) self.Coordinate_buf = self.Coordinate_buf[0] self.Updata_State('IDLE_State') print("get coordinate") else: # 表示列表里还有值 那就去打 print("准备去打") if len(self.Coordinate_buf ) == 2: if self.Coordinate_buf[0] == 0 and self.Coordinate_buf[1] == 0: self.Close_State_Func() else: self.Open_State_Func() self.start_time = time.time() self.Updata_State('Set_XY_State') print("IDLE_to_Set_XY") else: self.Coordinate_buf.clear() self.Updata_State('IDLE_State') def Set_XY_Func(self): # print("Set_XY") try: converted_values = [convert_to_modbus_format(value) for value in self.Coordinate_buf] self.Modbus_dev.write_registers(self.X_reg, converted_values)#由于是连续的,所以写一次 except Exception as e: if is_timeout(self.start_time, 3): #写入寄存器错误已经超时 self.Updata_State('Err_State') print("Set_XY_err") self.Coordinate_buf.clear() else: #写入寄存器错误但未超时 self.Updata_State('Set_XY_State') # print("Set_XY_run") else: #写入寄存器错成功 self.start_time = time.time() self.Updata_State('Start_State') print("Set_XY_OK") self.Coordinate_buf.clear() def Start_Func(self): print("Start_Func") try: self.Modbus_dev.write_registers(self.Start_reg, [1]) except Exception as e: if is_timeout(self.start_time, 3): #写入寄存器错误已经超时 self.Updata_State('Err_State') print("Start_State_err") else: #写入寄存器错误但未超时 self.Updata_State('Start_State') print("Start_State_run") else: #写入寄存器错成功 self.start_time = time.time() self.Updata_State('Wait_State') print("Start_State_ok") def Wait_State_Func(self): print("Wait_State_Func") try: read_laser_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 2) print(read_laser_register) except Exception as e: if is_timeout(self.start_time, 3): #读寄存器错误已经超时 self.Updata_State('Err_State') print("Wait_State_err") else: #读寄存器错误但未超时 self.Updata_State('Wait_State') print("Wait_State_run") else: #读寄存器错成功 if (read_laser_register[0] == 1 and read_laser_register[1] == 1): #表示激光器完成 self.Updata_State('IDLE_State') # 注释 print("Wait_State_ok") else: #表示没完成 if is_timeout(self.start_time, 90):#电机最长运行30s 激光最长1min #没完成并且超时 self.Updata_State('Err_State') print("Wait_StateErr") else: #没完成但未超时 self.Updata_State('Wait_State') print("Wait_State_loop") def Err_State_Func(self): print("Err_State_Func") print("卡死 : 人工检查设备错误") print("准备恢复为初始状态") self.Updata_State('Init_State') class Control_board : def __init__(self,COM,Slave_address,Calibration_Parameter,Camera_Regio): self.modbus = minimalmodbus.Instrument(COM, Slave_address) # 端口名, 从站地址 self.modbus.serial.baudrate = 115200 # 波特率 self.modbus.serial.bytesize = 8 self.modbus.serial.parity = serial.PARITY_NONE self.modbus.serial.stopbits = 1 self.modbus.serial.timeout = 1 # seconds self.A_Dev = Laser(self.modbus,Calibration_Parameter,Camera_Regio) def process(self): self.A_Dev.process() if __name__ == "__main__": transform1 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02], [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]] transform2 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02], [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]] transform3 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02], [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]] transform4 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02], [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]] print("开始初始化相机") cam1 = monocular_camera(camera_id=1) cam2 = monocular_camera(camera_id=1) cam3 = monocular_camera(camera_id=0) cam4 = monocular_camera(camera_id=1) cam_list = [cam1, cam2, cam3, cam4] print("相机初始化完毕") # 设备注册 Red_Control_board_1 = Control_board("COM3",1,transform1,1) Red_Control_board_2 = Control_board("COM3",2,transform2,2) Red_Control_board_3 = Control_board("COM3",3,transform3,3) Red_Control_board_4 = Control_board("COM3",4,transform4,4) while True: Red_Control_board_1.process() Red_Control_board_2.process() Red_Control_board_3.process() Red_Control_board_4.process() # 如果4个列表都为空 if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : print("1号设备空了") if Red_Control_board_1.A_Dev.state == "IDLE_State" : print("1号设备都空了,且在IDLE_State") capture_point_data(idx=1) else: continue if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : print("2号设备空了") if Red_Control_board_1.A_Dev.state == "IDLE_State" : print("2号设备都空了,且在IDLE_State") capture_point_data(idx=2) else: continue if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : print("3号设备空了") if Red_Control_board_1.A_Dev.state == "IDLE_State" : print("3号设备都空了,且在IDLE_State") capture_point_data(idx=3) else: continue if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : print("4号设备空了") if Red_Control_board_1.A_Dev.state == "IDLE_State" : print("4号设备都空了,且在IDLE_State") capture_point_data(idx=4) else: continue else: continue