# laser_weeding/main.py
#
# 502 lines
# 21 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import math
import serial.tools.list_ports
import serial
import minimalmodbus
import time
import numpy as np
import cv2
class monocular_camera:
    """Thin wrapper around a single OpenCV capture device.

    The device is opened through the V4L2 backend and asked for a
    640x480 stream at 30 FPS.
    """

    def __init__(self, camera_id=0):
        """Open capture device *camera_id* and request the stream format.

        Args:
            camera_id: index passed to ``cv2.VideoCapture``.

        Raises:
            SystemExit: with status 1 when the device cannot be opened.
        """
        self.cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)   # frame width
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)  # frame height
        self.cap.set(cv2.CAP_PROP_FPS, 30)            # frame rate
        # Frame counter; nothing in this class increments it at the moment.
        self.num = 0
        if not self.cap.isOpened():
            print("Cannot open camera")
            # Give the handle back before leaving, and exit with a failing
            # status.  ``exit()`` is only a convenience injected by the
            # ``site`` module and reports success (status 0).
            self.cap.release()
            raise SystemExit(1)

    def grab_imgs(self):
        """Read one frame from the device.

        Returns:
            The frame returned by ``cap.read()``, or ``None`` when the read
            reports failure.
        """
        ret, frame = self.cap.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            return None
        return frame
def capture_point(calib, region_idx):
    """Grab a frame, find green blobs and convert them to encoder targets.

    The frame is blurred, thresholded in HSV space and every external
    contour with an area above 10 is reduced to the centre of its bounding
    box.  That pixel position is pushed through the 2x3 affine matrix
    *calib*, truncated to an integer and multiplied by 91 to obtain encoder
    positions.

    Args:
        calib: 2x3 nested sequence; row 0 yields x, row 1 yields y.
        region_idx: region number; targets are only produced for 1, 2, 3
            or 4.

    Returns:
        A list of ``[x, y]`` encoder positions sorted by ``y`` from largest
        to smallest.  Empty when no frame could be grabbed or no blob
        qualifies.
    """
    print("拍照")
    # NOTE(review): the frame always comes from the module-level ``cam3``,
    # whatever region is requested - confirm this is intended.
    picture = cam3.grab_imgs()
    if picture is None:
        # grab_imgs() signals a failed read with None; there is nothing to
        # segment, so report "no targets" instead of crashing inside OpenCV.
        print("center_xy_list", [])
        return []

    # Segment green (weed) pixels.
    blurred = cv2.GaussianBlur(picture, (5, 5), 0)
    hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)
    low_hsv = np.array([33, 43, 46])
    high_hsv = np.array([99, 255, 255])
    mask_green = cv2.inRange(hsv, lowerb=low_hsv, upperb=high_hsv)
    contours, _ = cv2.findContours(mask_green.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Every known region applies the same acceptance window and offset.
    region_enabled = region_idx in (1, 2, 3, 4)

    center_xy_list = []
    for cnt in contours:
        if cv2.contourArea(cnt) <= 10:
            continue
        x_weed, y_weed, w_weed, h_weed = cv2.boundingRect(cnt)
        center_x = x_weed + w_weed / 2
        center_y = y_weed + h_weed / 2

        p_laser_x = (calib[0][0] * center_x) + (calib[0][1] * center_y) + calib[0][2]
        p_laser_y = (calib[1][0] * center_x) + (calib[1][1] * center_y) + calib[1][2]
        encoder_position_x = int(p_laser_x) * 91
        encoder_position_y = int(p_laser_y) * 91

        if not region_enabled:
            continue
        if -10920 < encoder_position_x < 10920 and -10920 < encoder_position_y < 10920:
            # -455 compensates for the vehicle moving forward.
            center_xy_list.append([encoder_position_x, encoder_position_y - 455])

    # Largest y first.
    center_xy_list.sort(key=lambda point: point[1], reverse=True)
    print("center_xy_list", center_xy_list)
    return center_xy_list
def capture_point_data(idx):
    """Capture a frame for board *idx* and load that board's target queue.

    The queue (``coordinate_all``) of the selected board's camera is filled
    with the first target returned by ``capture_point`` followed by the
    point ``[0, 0]``; when no target is found the queue is left empty.
    ``coordinate_all_empty`` is cleared in both cases.

    Args:
        idx: 1-based board number (1..4).

    Raises:
        IndexError: if *idx* does not name one of the four boards.
    """
    camera_list = [
        Red_Control_board_1.A_Dev.camera,
        Red_Control_board_2.A_Dev.camera,
        Red_Control_board_3.A_Dev.camera,
        Red_Control_board_4.A_Dev.camera,
    ]
    # Reject 0 / negative numbers explicitly: ``camera_list[idx - 1]`` would
    # otherwise wrap around and silently select a different board.
    if not 1 <= idx <= len(camera_list):
        raise IndexError(f"board index out of range: {idx}")

    selected = camera_list[idx - 1]
    targets = capture_point(selected.Calibration_Parameter, selected.Camera_Region)
    if targets:
        # Keep only the first target, then queue [0, 0] after it.  All of
        # this applies to the selected board; earlier code updated board 1
        # here regardless of idx.
        # NOTE(review): with the indentation lost it is unclear whether
        # [0, 0] should also be queued when no target was found - confirm.
        targets = [targets[0], [0, 0]]
    selected.coordinate_all = targets
    selected.coordinate_all_empty = False
def convert_to_modbus_format(value):
    """Encode an integer as the unsigned 16-bit word a Modbus register holds.

    Non-negative values are returned unchanged; negative values are mapped
    to ``value + 65536`` (their 16-bit two's-complement representation).

    Args:
        value: integer in the range -32768..65535.

    Returns:
        An integer in the range 0..65535.

    Raises:
        ValueError: if *value* cannot be represented in 16 bits.  Without
            this check such values came back negative or with the wrong
            sign.
    """
    if not -32768 <= value <= 65535:
        raise ValueError(f"value does not fit a 16-bit Modbus register: {value}")
    if value < 0:
        return value + 65536
    return value
def is_timeout(start_time, timeout):
    """Tell whether at least *timeout* seconds have elapsed.

    Args:
        start_time: reference stamp, as produced by ``time.time()``.
        timeout: allowed duration in seconds.

    Returns:
        ``True`` once the elapsed time reaches *timeout*, else ``False``.
    """
    elapsed = time.time() - start_time
    return elapsed >= timeout
class camera:
    """Target queue and calibration data attached to one camera region."""

    def __init__(self, Calibration_Parameter, Camera_Regio):
        """Store the calibration matrix and region number; start empty.

        Args:
            Calibration_Parameter: calibration data, stored as given.
            Camera_Regio: region number, stored as ``Camera_Region``.
        """
        self.Calibration_Parameter = Calibration_Parameter
        self.Camera_Region = Camera_Regio
        self.coordinate_all = []          # pending [x, y] targets
        self.coordinate_all_empty = True  # set when the queue has run dry
        self.coordinate_list = []         # last target handed out ([] if none)
        self.num_x = - 9100
        self.num_y = - 9100

    def get_coordinate(self):
        """Pop the next target from ``coordinate_all``.

        Returns:
            A new ``[x, y]`` list for the next target, or ``[]`` when the
            queue is empty (``coordinate_all_empty`` is then set so the
            caller knows a new capture is needed).
        """
        print("现在是区间序号为" + str(self.Camera_Region) + "标定参数为" + str(self.Calibration_Parameter) + "来取值了")
        if len(self.coordinate_all) > 0:
            self.num_x = self.coordinate_all[0][0]
            self.num_y = self.coordinate_all[0][1]
            self.coordinate_all.pop(0)
            self.coordinate_list = [self.num_x, self.num_y]
        else:
            # Queue exhausted: raise the flag that asks for a new capture.
            self.coordinate_all_empty = True
            self.coordinate_list = []
        print("get_coordinate", self.coordinate_list)
        # Hand out a copy.  The internal list used to be returned directly
        # and then cleared/reused on the next call, which rewrote targets
        # the caller was still holding.
        return list(self.coordinate_list)
class Laser:
    """Modbus-driven state machine for one laser head and its X/Y motors.

    ``process()`` executes exactly one step; the caller is expected to call
    it repeatedly.  States and what their handlers do:

    * ``Init_State``   - wait for the controller and push the default
      configuration, then go to ``IDLE_State``.
    * ``IDLE_State``   - fetch the next target from the camera queue and
      select the laser configuration for it.
    * ``Set_XY_State`` - write the target to the X/Y registers.
    * ``Start_State``  - write 1 to the start register.
    * ``Wait_State``   - poll the two finish registers.
    * ``Err_State``    - report the fault and fall back to ``Init_State``.
    """

    # Values for the ten contiguous registers starting at
    # Laser_100x_voltage_reg, in this order: unused, current, time (ms),
    # switch, X mode, Y mode, X acceleration, Y acceleration, X speed,
    # Y speed.
    _LASER_ON_CONFIG = [0, 10, 300, 0, 1, 1, 2000, 2000, 2000, 2000]   # energy range is [0-100]
    _LASER_OFF_CONFIG = [0, 0, 100, 0, 1, 1, 2000, 2000, 2000, 2000]   # time = dwell time in the centre zone

    def __init__(self, Modbus_dev, Calibration_Parameter_this, Camera_Regio_this):
        """Create the device in ``Init_State``.

        Args:
            Modbus_dev: object providing ``read_registers(addr, count)`` and
                ``write_registers(addr, values)``.
            Calibration_Parameter_this: forwarded to ``camera``.
            Camera_Regio_this: forwarded to ``camera``.
        """
        self.camera = camera(Calibration_Parameter_this, Camera_Regio_this)
        self.Modbus_dev = Modbus_dev
        self.start_time = time.time()
        self.Coordinate_buf = []
        # Register map
        self.Laser_100x_voltage_reg = 33
        self.Laser_100x_current_reg = 34
        self.Laser_time_reg = 35
        self.Laser_switch_reg = 36
        self.Motor_X_mode_reg = 37
        self.Motor_Y_mode_reg = 38
        self.Motor_X_acceleration_reg = 39
        self.Motor_Y_acceleration_reg = 40
        self.Motor_X_speed_reg = 41
        self.Motor_Y_speed_reg = 42
        self.X_reg = 43
        self.Y_reg = 44
        self.Start_reg = 45
        self.Servo_finish_reg = 46
        self.Laser_finish_reg = 47
        # State machine: state name -> handler
        self.state = 'Init_State'
        self.transitions = {
            'Init_State': self.Init_State_Func,
            'IDLE_State': self.IDLE_State_Func,
            'Set_XY_State': self.Set_XY_Func,
            'Start_State': self.Start_Func,
            'Wait_State': self.Wait_State_Func,
            'Err_State': self.Err_State_Func,
        }

    def process(self):
        """Run the handler of the current state once."""
        self.transitions[self.state]()

    def Updata_State(self, new_state):
        """Switch to *new_state*; unknown names are reported and ignored."""
        if new_state in self.transitions:
            self.state = new_state
        else:
            print(f"无法转换到未知状态: {new_state}")

    def _apply_laser_config(self, config, tag):
        """Write *config* to the configuration registers and verify it.

        Shared by the Init/Open/Close handlers.  Modbus errors are caught
        and reported here (the read-back used to be unguarded and could
        crash the caller's loop).

        Args:
            config: ten register values, see ``_LASER_ON_CONFIG``.
            tag: prefix for the progress messages printed by this step.

        Returns:
            ``True`` only when the servo-finish register reads 1, the write
            succeeded and the read-back equals *config*.
        """
        expected = list(config)
        try:
            servo_finish = self.Modbus_dev.read_registers(self.Servo_finish_reg, 1)
        except Exception as exc:
            print(f"{tag}_err", exc)
            return False

        # Only the servo flag is checked.  On first power-up the laser
        # reports busy, while after an interrupted run it reports idle, so
        # the laser flag is deliberately ignored here.
        if servo_finish[0] != 1:
            # Controller has not finished powering up yet.
            return False

        try:
            # The registers are contiguous, so a single write covers them.
            self.Modbus_dev.write_registers(self.Laser_100x_voltage_reg, expected)
            self.start_time = time.time()
            readback = list(self.Modbus_dev.read_registers(self.Laser_100x_voltage_reg, len(expected)))
        except Exception as exc:
            if is_timeout(self.start_time, 3):
                # Failing for longer than the timeout.
                print(f"{tag}_err", exc)
            else:
                # Failing, but not timed out yet.
                print(tag)
            return False

        print(readback)
        if readback == expected:
            print(f"{tag}_over")
            return True
        print(tag)
        return False

    def Init_State_Func(self):
        """Push the default configuration; go to IDLE once it is verified."""
        print("Init_State_Func")
        if self._apply_laser_config(self._LASER_ON_CONFIG, "Init_State"):
            self.Updata_State('IDLE_State')
        else:
            self.Updata_State('Init_State')

    def Open_State_Func(self):
        """Write the laser-on configuration.

        Returns:
            ``True`` when the configuration was written and verified.
        """
        print("Open_State_Func")
        return self._apply_laser_config(self._LASER_ON_CONFIG, "Open_State")

    def Close_State_Func(self):
        """Write the zero-current configuration.

        Returns:
            ``True`` when the configuration was written and verified.
        """
        print("Close_State_Func")
        return self._apply_laser_config(self._LASER_OFF_CONFIG, "Close_State")

    def IDLE_State_Func(self):
        """Fetch the next target, or prepare the laser for the pending one."""
        print("IDLE_State_Func", self.Coordinate_buf)
        if not self.Coordinate_buf:
            # Nothing pending: ask the camera queue for the next target.
            # Copy it so clearing our buffer never touches the camera's own
            # list.
            self.Coordinate_buf = list(self.camera.get_coordinate())
            self.Updata_State('IDLE_State')
            print("get coordinate")
            return

        # A target is pending: go and hit it.
        print("准备去打")
        if len(self.Coordinate_buf) != 2:
            # Malformed entry; drop it and stay idle.
            self.Coordinate_buf.clear()
            self.Updata_State('IDLE_State')
            return

        # [0, 0] is travelled to with the zero-current configuration; every
        # other target uses the laser-on configuration.
        if self.Coordinate_buf[0] == 0 and self.Coordinate_buf[1] == 0:
            applied = self.Close_State_Func()
        else:
            applied = self.Open_State_Func()
        if not applied:
            # NOTE(review): the move still goes ahead, as it always did, so
            # the controller may be running the previous configuration.
            # Decide on hardware whether this should retry instead.
            print("IDLE_config_not_applied")
        self.start_time = time.time()
        self.Updata_State('Set_XY_State')
        print("IDLE_to_Set_XY")

    def Set_XY_Func(self):
        """Write the pending target to the X/Y registers."""
        try:
            converted_values = [convert_to_modbus_format(value) for value in self.Coordinate_buf]
            # X and Y are contiguous, so a single write covers both.
            self.Modbus_dev.write_registers(self.X_reg, converted_values)
        except Exception as exc:
            if is_timeout(self.start_time, 3):
                # Write keeps failing and the timeout has expired.
                self.Updata_State('Err_State')
                print("Set_XY_err", exc)
                self.Coordinate_buf.clear()
            else:
                # Write failed but there is still time to retry.
                self.Updata_State('Set_XY_State')
        else:
            self.start_time = time.time()
            self.Updata_State('Start_State')
            print("Set_XY_OK")
            self.Coordinate_buf.clear()

    def Start_Func(self):
        """Write 1 to the start register."""
        print("Start_Func")
        try:
            self.Modbus_dev.write_registers(self.Start_reg, [1])
        except Exception as exc:
            if is_timeout(self.start_time, 3):
                # Write keeps failing and the timeout has expired.
                self.Updata_State('Err_State')
                print("Start_State_err", exc)
            else:
                # Write failed but there is still time to retry.
                self.Updata_State('Start_State')
                print("Start_State_run")
        else:
            self.start_time = time.time()
            self.Updata_State('Wait_State')
            print("Start_State_ok")

    def Wait_State_Func(self):
        """Poll the servo and laser finish registers until both read 1."""
        print("Wait_State_Func")
        try:
            read_laser_register = self.Modbus_dev.read_registers(self.Servo_finish_reg, 2)
            print(read_laser_register)
        except Exception as exc:
            if is_timeout(self.start_time, 3):
                # Read keeps failing and the timeout has expired.
                self.Updata_State('Err_State')
                print("Wait_State_err", exc)
            else:
                # Read failed but there is still time to retry.
                self.Updata_State('Wait_State')
                print("Wait_State_run")
            return

        if read_laser_register[0] == 1 and read_laser_register[1] == 1:
            # Both flags set: the job is complete.
            self.Updata_State('IDLE_State')
            print("Wait_State_ok")
        elif is_timeout(self.start_time, 90):
            # Motor runs at most 30 s and the laser at most 1 min, so 90 s
            # without completion is a fault.
            self.Updata_State('Err_State')
            print("Wait_StateErr")
        else:
            # Not finished yet, keep polling.
            self.Updata_State('Wait_State')
            print("Wait_State_loop")

    def Err_State_Func(self):
        """Report the fault and restart from ``Init_State``."""
        print("Err_State_Func")
        print("卡死 : 人工检查设备错误")
        print("准备恢复为初始状态")
        self.Updata_State('Init_State')
class Control_board:
    """One Modbus slave on a serial port together with its ``Laser`` device."""

    def __init__(self, COM, Slave_address, Calibration_Parameter, Camera_Regio):
        """Open the Modbus instrument and build the laser device on top of it.

        Args:
            COM: serial port name.
            Slave_address: Modbus slave address of the board.
            Calibration_Parameter: forwarded to ``Laser``.
            Camera_Regio: forwarded to ``Laser``.
        """
        instrument = minimalmodbus.Instrument(COM, Slave_address)  # port name, slave address
        port = instrument.serial
        port.baudrate = 115200  # baud rate
        port.bytesize = 8
        port.parity = serial.PARITY_NONE
        port.stopbits = 1
        port.timeout = 1  # seconds
        self.modbus = instrument
        self.A_Dev = Laser(instrument, Calibration_Parameter, Camera_Regio)

    def process(self):
        """Advance the laser state machine by one step."""
        self.A_Dev.process()
if __name__ == "__main__":
    # Calibration matrices (2x3), one per control board.
    transform1 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02],
                  [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]]
    transform2 = [[ 9.78925834e-01, 1.96192472e-02, -2.97419867e+02],
                  [ 3.01586932e-02, -9.63219883e-01, 3.61695282e+02]]
    transform3 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02],
                  [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]]
    transform4 = [[ 1.55162446e+00, -4.99468032e-03, -5.35657043e+02],
                  [-3.34157095e-03, -1.54830655e+00, 4.28145094e+02]]

    print("开始初始化相机")
    # NOTE(review): cam1, cam2 and cam4 all open device 1 - confirm the
    # intended device ids.  These names stay at module level because other
    # functions in this file read ``cam3`` and the boards as globals.
    cam1 = monocular_camera(camera_id=1)
    cam2 = monocular_camera(camera_id=1)
    cam3 = monocular_camera(camera_id=0)
    cam4 = monocular_camera(camera_id=1)
    cam_list = [cam1, cam2, cam3, cam4]
    print("相机初始化完毕")

    # Device registration
    Red_Control_board_1 = Control_board("COM3", 1, transform1, 1)
    Red_Control_board_2 = Control_board("COM3", 2, transform2, 2)
    Red_Control_board_3 = Control_board("COM3", 3, transform3, 3)
    Red_Control_board_4 = Control_board("COM3", 4, transform4, 4)
    boards = [
        Red_Control_board_1,
        Red_Control_board_2,
        Red_Control_board_3,
        Red_Control_board_4,
    ]

    try:
        while True:
            for board in boards:
                board.process()

            # Refill the queue of every board that has run out of targets
            # and is idle.  Each board is judged on its own flags; the
            # checks for boards 2-4 used to look at board 1.
            for number, board in enumerate(boards, start=1):
                device = board.A_Dev
                if not device.camera.coordinate_all_empty:
                    continue
                print(f"{number}号设备空了")
                if device.state == "IDLE_State":
                    print(f"{number}号设备都空了且在IDLE_State")
                    capture_point_data(idx=number)
    finally:
        # Give the capture devices back when the loop is interrupted.
        for cam in cam_list:
            cam.cap.release()