diff --git a/main_test.py b/main_test.py new file mode 100644 index 0000000..938952d --- /dev/null +++ b/main_test.py @@ -0,0 +1,502 @@ +import os +import math +import serial.tools.list_ports +import serial +import minimalmodbus +import time +import numpy as np +import cv2 + + +class monocular_camera: + def __init__(self, camera_id=0): + self.cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2) + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 设置分辨率宽 + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 设置分辨率高 + self.cap.set(cv2.CAP_PROP_FPS, 30) # 设置帧率 + self.num = 0 + if not self.cap.isOpened(): + print("Cannot open camera") + exit() + + def grab_imgs(self): + ret, frame = self.cap.read() + if not ret: + print("Can't receive frame (stream end?). Exiting ...") + return None + # 存储图片 + # savePath = os.path.join("./path", "Camera_{:0>3d}.png".format(self.num)) + # cv2.imwrite(savePath, frame) + # self.num += 1 + # 显示帧 + # cv2.imshow("frame", frame) + # cv2.waitKey(1000) + return frame + + +def capture_point(calib, region_idx): + + print("拍照") + picture = cam3.grab_imgs() + # weed + img_weed = cv2.GaussianBlur(picture, (5, 5), 0) + hsv = cv2.cvtColor(img_weed, cv2.COLOR_BGR2HSV) + low_hsv = np.array([33, 43, 46]) + high_hsv = np.array([99, 255, 255]) + mask_green = cv2.inRange(hsv, lowerb=low_hsv, upperb=high_hsv) + contours, _ = cv2.findContours(mask_green.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours_l = [] + weed_list = [] + center_xy_list = [] + + + for i in range(len(contours)): + area = cv2.contourArea(contours[i]) + if area > 10: + contours_l.append(contours[i]) + cnt = contours[i] + x_weed, y_weed, w_weed, h_weed = cv2.boundingRect(cnt) + center_x, center_y = (x_weed + w_weed / 2), (y_weed + h_weed / 2) + weed_list.append([int(center_x), int(center_y)]) + p_laser_x = (calib[0][0] * center_x) + (calib[0][1] * center_y) + calib[0][2] + p_laser_y = (calib[1][0] * center_x) + (calib[1][1] * center_y) + calib[1][2] + encoder_position_x = int(p_laser_x) * 91 + encoder_position_y = int(p_laser_y) * 91 + if region_idx == 1: + if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: + center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 + if region_idx == 2: + if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: + center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 + if region_idx == 3: + if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: + center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 + if region_idx == 4: + if encoder_position_x > -10920 and encoder_position_x < 10920 and encoder_position_y > - 10920 and encoder_position_y < 10920: + center_xy_list.append([encoder_position_x, encoder_position_y - 455]) # -455是车辆前进的补偿 + else: + continue + center_xy_list = sorted(center_xy_list, key=lambda x: x[1], reverse=True) # 按y轴从大到小排序 + print("center_xy_list", center_xy_list) + + return center_xy_list + +def capture_point_data(idx): + camera_list = [Red_Control_board_1.A_Dev.camera, Red_Control_board_2.A_Dev.camera, Red_Control_board_3.A_Dev.camera, Red_Control_board_4.A_Dev.camera] + calib = camera_list[idx-1].Calibration_Parameter + region_idx = camera_list[idx-1].Camera_Region + camera_list[idx-1].coordinate_all = capture_point(calib, region_idx) + if len(camera_list[0].coordinate_all) > 0: + camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]] + camera_list[0].coordinate_all.append([0, 0]) + camera_list[idx-1].coordinate_all_empty = False + + # camera_list = [Red_Control_board_3.A_Dev.camera] + # calib = camera_list[0].Calibration_Parameter + # region_idx = camera_list[0].Camera_Region + # camera_list[0].coordinate_all = capture_point(calib, region_idx) + # if len(camera_list[0].coordinate_all) > 0: + # camera_list[0].coordinate_all = [camera_list[0].coordinate_all[0]] + # camera_list[0].coordinate_all.append([0, 0]) + # camera_list[0].coordinate_all_empty = False + +def convert_to_modbus_format(value): + # 如果值为负数,转换为 Modbus 无符号整数形式 + if value < 0: + value += 65536 # 或者 value = value & 0xFFFF,这种方法也可用于确保值在 0 到 65535 之间 + return value + +def is_timeout(start_time, timeout): + return time.time() - start_time >= timeout + +class camera: + def __init__(self,Calibration_Parameter,Camera_Regio): + self.Calibration_Parameter = Calibration_Parameter + self.Camera_Region = Camera_Regio + self.coordinate_all = [] + self.coordinate_all_empty = True + self.coordinate_list = [] + self.num_x = - 9100 + self.num_y = - 9100 + + def get_coordinate(self): + print("现在是区间序号为" + str(self.Camera_Region) + "标定参数为" + str(self.Calibration_Parameter) + "来取值了") + self.coordinate_list.clear() + + + if len(self.coordinate_all) > 0: + self.num_x = self.coordinate_all[0][0] + self.num_y = self.coordinate_all[0][1] + self.coordinate_all.pop(0) + self.coordinate_list.append([self.num_x, self.num_y]) + self.coordinate_list = self.coordinate_list[0] # 降维 + else: + # 没有坐标了 设置标志位通知外面拍照 直接结束函数 + self.coordinate_all_empty = True # 空 + self.coordinate_list.clear() + + print("get_coordinate", self.coordinate_list) + return self.coordinate_list + + +class Laser : + def __init__(self,Modbus_dev,Calibration_Parameter_this,Camera_Regio_this): + self.camera = camera(Calibration_Parameter_this,Camera_Regio_this) + self.Modbus_dev = Modbus_dev + self.start_time = time.time() + self.Coordinate_buf = [] + #寄存器定义 + self.Laser_100x_voltage_reg = 33 + self.Laser_100x_current_reg = 34 + self.Laser_time_reg = 35 + self.Laser_switch_reg = 36 + self.Motor_X_mode_reg = 37 + self.Motor_Y_mode_reg = 38 + self.Motor_X_acceleration_reg = 39 + self.Motor_Y_acceleration_reg = 40 + self.Motor_X_speed_reg = 41 + self.Motor_Y_speed_reg = 42 + self.X_reg = 43 + self.Y_reg = 44 + self.Start_reg = 45 + self.Servo_finish_reg = 46 + self.Laser_finish_reg = 47 + #状态机定义 + self.state = 'Init_State' + self.transitions = { + 'Init_State' : self.Init_State_Func, + 'IDLE_State' : self.IDLE_State_Func, + 'Set_XY_State': self.Set_XY_Func , + 'Start_State' : self.Start_Func , + 'Wait_State' : self.Wait_State_Func, + 'Err_State' : self.Err_State_Func , + } + + def process(self): + self.transitions[self.state]() + + def Updata_State(self, new_state): + if new_state in self.transitions: + self.state = new_state + else: + print(f"无法转换到未知状态: {new_state}") + pass + + def Init_State_Func(self): + print("Init_State_Func") + Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100] + try: + read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) + + except Exception as e: + self.Updata_State('Init_State') + print("Wait_State_err") + else: + #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 + if read_finish_register[0] == 1: + #表示上电成功 + try: + self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 + self.start_time = time.time() + except Exception as e: + if is_timeout(self.start_time, 3): + #写入寄存器错误已经超时 + self.Updata_State('Init_State') + print("Init_State_err") + else: + #写入寄存器错误但未超时 + self.Updata_State('Init_State') + print("Init_State") + + else: + #写入寄存器错成功 + # print("laser power write succeed") + read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) + read_power_register = list(read_power_register) + print(read_power_register) + if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: + self.Updata_State('IDLE_State') + print("Init_State_over") + else: + self.Updata_State('Init_State') + print("Init_State") + else: + #表示上电还没成功 + self.Updata_State('Init_State') + + def Open_State_Func(self): + print("Open_State_Func") + Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,10,300,0,1,1,2000,2000,2000,2000] # 能量范围[0-100] + try: + read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) + + except Exception as e: + # self.Updata_State('Close_State') + print("Close_State_err") + else: + #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 + if read_finish_register[0] == 1: + #表示上电成功 + try: + self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 + self.start_time = time.time() + except Exception as e: + if is_timeout(self.start_time, 3): + #写入寄存器错误已经超时 + # self.Updata_State('Close_State') + print("Close_State_err") + else: + #写入寄存器错误但未超时 + # self.Updata_State('Close_State') + print("Close_State") + + else: + #写入寄存器错成功 + # print("laser power write succeed") + read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) + read_power_register = list(read_power_register) + print(read_power_register) + if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: + # self.Updata_State('Mid_State') + print("Init_State_over") + else: + # self.Updata_State('Close_State') + print("Init_State") + else: + #表示上电还没成功 + # self.Updata_State('close_State') + pass + + def Close_State_Func(self): + print("Close_State_Func") + Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV = [0,0,100,0,1,1,2000,2000,2000,2000] # 时间表示在中心带停留的时间 + try: + read_finish_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1) + + except Exception as e: + # self.Updata_State('Close_State') + print("Close_State_err") + else: + #读寄存器错成功 如果初次上电,激光为忙碌,中间断掉再启动激光为空闲,所以这里不判断激光 + if read_finish_register[0] == 1: + #表示上电成功 + try: + self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV)#由于是连续的,所以写一次 + self.start_time = time.time() + except Exception as e: + if is_timeout(self.start_time, 3): + #写入寄存器错误已经超时 + # self.Updata_State('Close_State') + print("Close_State_err") + else: + #写入寄存器错误但未超时 + # self.Updata_State('Close_State') + print("Close_State") + + else: + #写入寄存器错成功 + # print("laser power write succeed") + read_power_register = self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, 10) + read_power_register = list(read_power_register) + print(read_power_register) + if read_power_register == Noused_Current_TIMEms_OFF_XM_YM_XA_YA_XV_YV: + # self.Updata_State('Mid_State') + print("Init_State_over") + else: + # self.Updata_State('Close_State') + print("Init_State") + else: + #表示上电还没成功 + # self.Updata_State('close_State') + pass + + def IDLE_State_Func(self): + print("IDLE_State_Func",self.Coordinate_buf) + if not self.Coordinate_buf: + # 表示没有值可以打 + # 拍照获取值 + self.Coordinate_buf = [] # 这里为什么要清空一次列表?? + self.Coordinate_buf.append(camera.get_coordinate(self.camera)) + self.Coordinate_buf = self.Coordinate_buf[0] + self.Updata_State('IDLE_State') + print("get coordinate") + else: + # 表示列表里还有值 那就去打 + print("准备去打") + if len(self.Coordinate_buf ) == 2: + if self.Coordinate_buf[0] == 0 and self.Coordinate_buf[1] == 0: + self.Close_State_Func() + else: + self.Open_State_Func() + self.start_time = time.time() + self.Updata_State('Set_XY_State') + print("IDLE_to_Set_XY") + else: + self.Coordinate_buf.clear() + self.Updata_State('IDLE_State') + + + + def Set_XY_Func(self): + # print("Set_XY") + try: + converted_values = [convert_to_modbus_format(value) for value in self.Coordinate_buf] + self.Modbus_dev.write_registers(self.X_reg, converted_values)#由于是连续的,所以写一次 + except Exception as e: + if is_timeout(self.start_time, 3): + #写入寄存器错误已经超时 + self.Updata_State('Err_State') + print("Set_XY_err") + self.Coordinate_buf.clear() + + else: + #写入寄存器错误但未超时 + self.Updata_State('Set_XY_State') + # print("Set_XY_run") + else: + #写入寄存器错成功 + self.start_time = time.time() + self.Updata_State('Start_State') + print("Set_XY_OK") + self.Coordinate_buf.clear() + + def Start_Func(self): + print("Start_Func") + try: + self.Modbus_dev.write_registers(self.Start_reg, [1]) + except Exception as e: + if is_timeout(self.start_time, 3): + #写入寄存器错误已经超时 + self.Updata_State('Err_State') + print("Start_State_err") + else: + #写入寄存器错误但未超时 + self.Updata_State('Start_State') + print("Start_State_run") + + else: + #写入寄存器错成功 + self.start_time = time.time() + self.Updata_State('Wait_State') + print("Start_State_ok") + + def Wait_State_Func(self): + print("Wait_State_Func") + try: + read_laser_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 2) + print(read_laser_register) + except Exception as e: + if is_timeout(self.start_time, 3): + #读寄存器错误已经超时 + self.Updata_State('Err_State') + print("Wait_State_err") + + else: + #读寄存器错误但未超时 + self.Updata_State('Wait_State') + print("Wait_State_run") + else: + #读寄存器错成功 + if (read_laser_register[0] == 1 and read_laser_register[1] == 1): + #表示激光器完成 + self.Updata_State('IDLE_State') # 注释 + print("Wait_State_ok") + else: + #表示没完成 + if is_timeout(self.start_time, 90):#电机最长运行30s 激光最长1min + #没完成并且超时 + self.Updata_State('Err_State') + print("Wait_StateErr") + else: + #没完成但未超时 + self.Updata_State('Wait_State') + print("Wait_State_loop") + + + + def Err_State_Func(self): + print("Err_State_Func") + print("卡死 : 人工检查设备错误") + print("准备恢复为初始状态") + self.Updata_State('Init_State') + + + +class Control_board : + def __init__(self,COM,Slave_address,Calibration_Parameter,Camera_Regio): + self.modbus = minimalmodbus.Instrument(COM, Slave_address) # 端口名, 从站地址 + self.modbus.serial.baudrate = 115200 # 波特率 + self.modbus.serial.bytesize = 8 + self.modbus.serial.parity = serial.PARITY_NONE + self.modbus.serial.stopbits = 1 + self.modbus.serial.timeout = 1 # seconds + self.A_Dev = Laser(self.modbus,Calibration_Parameter,Camera_Regio) + + def process(self): + self.A_Dev.process() + + +if __name__ == "__main__": + + transform1 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02], + [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]] + + transform2 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02], + [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]] + + transform3 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02], + [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]] + + transform4 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02], + [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]] + + print("开始初始化相机") + cam1 = monocular_camera(camera_id=1) + cam2 = monocular_camera(camera_id=1) + cam3 = monocular_camera(camera_id=0) + cam4 = monocular_camera(camera_id=1) + cam_list = [cam1, cam2, cam3, cam4] + print("相机初始化完毕") + + # 设备注册 + Red_Control_board_1 = Control_board("COM3",1,transform1,1) + Red_Control_board_2 = Control_board("COM3",2,transform2,2) + Red_Control_board_3 = Control_board("COM3",3,transform3,3) + Red_Control_board_4 = Control_board("COM3",4,transform4,4) + + while True: + Red_Control_board_1.process() + Red_Control_board_2.process() + Red_Control_board_3.process() + Red_Control_board_4.process() + # 如果4个列表都为空 + if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : + print("1号设备空了") + if Red_Control_board_1.A_Dev.state == "IDLE_State" : + print("1号设备都空了,且在IDLE_State") + capture_point_data(idx=1) + else: + continue + if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : + print("2号设备空了") + if Red_Control_board_1.A_Dev.state == "IDLE_State" : + print("2号设备都空了,且在IDLE_State") + capture_point_data(idx=2) + else: + continue + if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : + print("3号设备空了") + if Red_Control_board_1.A_Dev.state == "IDLE_State" : + print("3号设备都空了,且在IDLE_State") + capture_point_data(idx=3) + else: + continue + if Red_Control_board_1.A_Dev.camera.coordinate_all_empty : + print("4号设备空了") + if Red_Control_board_1.A_Dev.state == "IDLE_State" : + print("4号设备都空了,且在IDLE_State") + capture_point_data(idx=4) + else: + continue + else: + continue \ No newline at end of file