活动介绍

AttributeError: 'collections.deque' object has no attribute 'leftpop'为什么

时间: 2025-05-10 08:23:16 浏览: 27
<think>嗯,用户遇到了一个AttributeError,说collections.deque对象没有'leftpop'属性。我需要先弄清楚这个错误的原因。deque是Python中的一个双端队列,通常支持从两端添加或弹出元素。常见的操作有popleft()和pop(),分别从左边和右边弹出元素。用户可能误将popleft()写成leftpop()了。 首先,我应该确认deque的方法名。没错,正确的左边弹出方法是popleft(),而不是leftpop()。所以错误是因为方法名拼写错误导致的。用户可能在代码中调用了deque.leftpop(),而正确的应该是deque.popleft()。接下来,我需要给出一个示例代码,展示正确的用法。例如,创建一个deque,然后使用popleft()方法。同时,解释这个错误的原因,并提醒用户注意方法名的正确拼写。 然后,可能需要进一步解释deque的其他常用方法,比如appendleft()、append()、pop()等,帮助用户全面了解deque的操作。此外,可以提到deque的高效性,特别是在左端操作的O(1)时间复杂度,比列表的pop(0)更高效。用户可能是在实现队列或需要高效左右操作的场景下使用deque,所以可以建议他们在需要左端弹出元素时使用popleft(),并检查代码中的方法名拼写是否正确。此外,可能还需要提醒用户注意Python标准库中模块和方法的命名习惯,通常是下划线分隔的小写字母,比如popleft,而不是驼峰式或连字符式。 最后,考虑用户可能对deque的其他方法或使用场景有疑问,所以准备相关问题,比如其他常用方法,与列表的性能比较,线程安全,以及实现栈的例子,这样可以帮助用户更深入地理解和应用deque。</think>遇到的`AttributeError`是因为`deque`对象没有`leftpop`方法。正确的方法名是`popleft()`,用于从双端队列的左侧弹出元素。以下是详细解释和修复方法: ```python from collections import deque # 创建双端队列 d = deque([1, 2, 3, 4]) # 正确用法:从左侧弹出元素 left_item = d.popleft() # ✅ 正确的函数名 print(left_item) # 输出 1 print(d) # 输出 deque([2, 3, 4]) # 错误用法示例(会触发AttributeError) # wrong_item = d.leftpop() # ❌ 错误函数名 ``` **错误原因分析**: 1. `deque`对象只有`popleft()`方法没有`leftpop()`方法 2. Python标准库方法命名遵循`snake_case`格式,动词在前(pop)方向在后(left) 3. `popleft()`的时间复杂度是O(1),而列表的pop(0)操作是O(n) **相关方法对比**: ```python d.appendleft(0) # 左侧添加 d.append(5) # 右侧添加 right_item = d.pop() # 右侧弹出 ```
阅读全文

相关推荐

你的解决方案应该如何放进下面的程序中 # -*- coding: utf-8 -*- import sys import os import cv2 import numpy as np import math import time import logging import threading from collections import deque from PyQt5.QtWidgets import ( QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QMessageBox, QLabel, QFileDialog, QToolBox, QComboBox, QStatusBar, QGroupBox, QSlider, QDockWidget, QProgressDialog, QLineEdit, QRadioButton, QGridLayout, QSpinBox, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox, QProgressBar, ) from PyQt5.QtCore import QRect, Qt, QSettings, QThread, pyqtSignal, QTimer, QMetaObject, pyqtSlot, Q_ARG from PyQt5.QtGui import QImage, QPixmap from CamOperation_class import CameraOperation from MvCameraControl_class import * import ctypes from ctypes import cast, POINTER from datetime import datetime import skimage import platform from CameraParams_header import ( MV_GIGE_DEVICE, MV_USB_DEVICE, MV_GENTL_CAMERALINK_DEVICE, MV_GENTL_CXP_DEVICE, MV_GENTL_XOF_DEVICE ) # ===== 全局配置 ===== # 模板匹配参数 MATCH_THRESHOLD = 0.75 # 降低匹配置信度阈值以提高灵敏度 MIN_MATCH_COUNT = 10 # 最小匹配特征点数量 MIN_FRAME_INTERVAL = 0.1 # 最小检测间隔(秒) MIN_CONSECUTIVE_MATCHES = 3 # 需要连续匹配帧数才触发检测 # ===== 全局变量 ===== current_sample_path = None detection_history = [] isGrabbing = False isOpen = False obj_cam_operation = None frame_monitor_thread = None template_matcher_thread = None MV_OK = 0 MV_E_CALLORDER = -2147483647 # ==================== 优化后的质量检测算法 ==================== def enhanced_check_print_quality(sample_image_path, test_image, threshold=0.05): # 不再使用传感器数据调整阈值 adjusted_threshold = threshold try: sample_img_data = np.fromfile(sample_image_path, dtype=np.uint8) sample_image = cv2.imdecode(sample_img_data, cv2.IMREAD_GRAYSCALE) if sample_image is None: logging.error(f"无法解码样本图像: {sample_image_path}") return None, None, None except Exception as e: logging.exception(f"样本图像读取异常: {str(e)}") return None, None, None if len(test_image.shape) == 3: test_image_gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY) else: test_image_gray = test_image.copy() sample_image = cv2.GaussianBlur(sample_image, (5, 5), 0) test_image_gray = cv2.GaussianBlur(test_image_gray, (5, 5), 0) try: # 使用更鲁棒的SIFT特征检测器 sift = cv2.SIFT_create() keypoints1, descriptors1 = sift.detectAndCompute(sample_image, None) keypoints2, descriptors2 = sift.detectAndCompute(test_image_gray, None) if descriptors1 is None or descriptors2 is None: logging.warning("无法提取特征描述符,跳过配准") aligned_sample = sample_image else: # 使用FLANN匹配器提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(descriptors1, descriptors2, k=2) # 应用Lowe's比率测试筛选优质匹配 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) if len(good_matches) > MIN_MATCH_COUNT: src_pts = np.float32([keypoints1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if H is not None: aligned_sample = cv2.warpPerspective( sample_image, H, (test_image_gray.shape[1], test_image_gray.shape[0]) ) logging.info("图像配准成功,使用配准后样本") else: aligned_sample = sample_image logging.warning("无法计算单应性矩阵,使用原始样本") else: aligned_sample = sample_image logging.warning(f"特征点匹配不足({len(good_matches)}/{MIN_MATCH_COUNT}),跳过图像配准") except Exception as e: logging.error(f"图像配准失败: {str(e)}") aligned_sample = sample_image try: if aligned_sample.shape != test_image_gray.shape: test_image_gray = cv2.resize(test_image_gray, (aligned_sample.shape[1], aligned_sample.shape[0])) except Exception as e: logging.error(f"图像调整大小失败: {str(e)}") return None, None, None try: from skimage.metrics import structural_similarity as compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True, data_range=255 ) except ImportError: from skimage.measure import compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True ) except Exception as e: logging.error(f"SSIM计算失败: {str(e)}") abs_diff = cv2.absdiff(aligned_sample, test_image_gray) ssim_diff = abs_diff.astype(np.float32) / 255.0 ssim_score = 1.0 - np.mean(ssim_diff) ssim_diff = (1 - ssim_diff) * 255 abs_diff = cv2.absdiff(aligned_sample, test_image_gray) combined_diff = cv2.addWeighted(ssim_diff.astype(np.uint8), 0.7, abs_diff, 0.3, 0) _, thresholded = cv2.threshold(combined_diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((3, 3), np.uint8) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, kernel) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_CLOSE, kernel) diff_pixels = np.count_nonzero(thresholded) total_pixels = aligned_sample.size diff_ratio = diff_pixels / total_pixels is_qualified = diff_ratio <= adjusted_threshold marked_image = cv2.cvtColor(test_image_gray, cv2.COLOR_GRAY2BGR) marked_image[thresholded == 255] = [0, 0, 255] # 放大缺陷标记 scale_factor = 2.0 # 放大2倍 marked_image = cv2.resize(marked_image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR) labels = skimage.measure.label(thresholded) properties = skimage.measure.regionprops(labels) for prop in properties: if prop.area > 50: y, x = prop.centroid # 根据放大比例调整坐标 x_scaled = int(x * scale_factor) y_scaled = int(y * scale_factor) cv2.putText(marked_image, f"Defect", (x_scaled, y_scaled), cv2.FONT_HERSHEY_SIMPLEX, 0.5 * scale_factor, (0, 255, 255), int(scale_factor)) return is_qualified, diff_ratio, marked_image # ==================== 视觉触发的质量检测流程 ==================== def vision_controlled_check(capture_image=None, match_score=0.0): """修改为接受图像帧和匹配分数""" global current_sample_path, detection_history, template_matcher_thread logging.info("视觉触发质量检测启动") # 如果没有提供图像,使用当前帧 if capture_image is None: frame = obj_cam_operation.get_current_frame() else: frame = capture_image if frame is None: QMessageBox.warning(mainWindow, "错误", "无法获取当前帧图像!", QMessageBox.Ok) return progress = QProgressDialog("正在检测...", "取消", 0, 100, mainWindow) progress.setWindowModality(Qt.WindowModal) progress.setValue(10) try: diff_threshold = mainWindow.sliderDiffThreshold.value() / 100.0 logging.info(f"使用差异度阈值: {diff_threshold}") progress.setValue(30) is_qualified, diff_ratio, marked_image = enhanced_check_print_quality( current_sample_path, frame, threshold=diff_threshold ) progress.setValue(70) if is_qualified is None: QMessageBox.critical(mainWindow, "检测错误", "检测失败,请查看日志", QMessageBox.Ok) return logging.info(f"检测结果: 合格={is_qualified}, 差异={diff_ratio}") progress.setValue(90) update_diff_display(diff_ratio, is_qualified) result_text = f"印花是否合格: {'合格' if is_qualified else '不合格'}\n差异占比: {diff_ratio*100:.2f}%\n阈值: {diff_threshold*100:.2f}%" QMessageBox.information(mainWindow, "检测结果", result_text, QMessageBox.Ok) if marked_image is not None: # 创建可调整大小的窗口 cv2.namedWindow("缺陷标记结果", cv2.WINDOW_NORMAL) cv2.resizeWindow("缺陷标记结果", 800, 600) # 初始大小 cv2.imshow("缺陷标记结果", marked_image) cv2.waitKey(0) cv2.destroyAllWindows() detection_result = { 'timestamp': datetime.now(), 'qualified': is_qualified, 'diff_ratio': diff_ratio, 'threshold': diff_threshold, 'trigger_type': 'vision' if capture_image else 'manual' } detection_history.append(detection_result) update_history_display() progress.setValue(100) # 重置匹配线程的冷却时间 if template_matcher_thread: template_matcher_thread.last_trigger_time = 0 except Exception as e: logging.exception("印花检测失败") QMessageBox.critical(mainWindow, "检测错误", f"检测过程中发生错误: {str(e)}", QMessageBox.Ok) finally: progress.close() # ==================== 相机操作函数 ==================== def open_device(): global deviceList, nSelCamIndex, obj_cam_operation, isOpen, frame_monitor_thread, mainWindow if isOpen: QMessageBox.warning(mainWindow, "Error", '相机已打开!', QMessageBox.Ok) return MV_E_CALLORDER nSelCamIndex = mainWindow.ComboDevices.currentIndex() if nSelCamIndex < 0: QMessageBox.warning(mainWindow, "Error", '请选择相机!', QMessageBox.Ok) return MV_E_CALLORDER # 创建相机控制对象 cam = MvCamera() # 初始化相机操作对象 obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) ret = obj_cam_operation.open_device() if 0 != ret: strError = "打开设备失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) isOpen = False else: set_continue_mode() get_param() isOpen = True enable_controls() # 创建并启动帧监控线程 frame_monitor_thread = FrameMonitorThread(obj_cam_operation) frame_monitor_thread.frame_status.connect(mainWindow.statusBar().showMessage) frame_monitor_thread.start() def start_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow , "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) # 如果启用了自动检测,启动检测线程 if mainWindow.chkContinuousMatch.isChecked(): toggle_template_matching(True) def stop_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.Stop_grabbing() if ret != 0: strError = "停止取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = False enable_controls() # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() def close_device(): global isOpen, isGrabbing, obj_cam_operation, frame_monitor_thread, template_matcher_thread if frame_monitor_thread and frame_monitor_thread.isRunning(): frame_monitor_thread.stop() frame_monitor_thread.wait(2000) # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() template_matcher_thread.wait(2000) template_matcher_thread = None if isOpen and obj_cam_operation: obj_cam_operation.close_device() isOpen = False isGrabbing = False enable_controls() # ==================== 连续帧匹配检测器 ==================== class ContinuousFrameMatcher(QThread): frame_processed = pyqtSignal(np.ndarray, float, bool) # 处理后的帧, 匹配分数, 是否匹配 match_score_updated = pyqtSignal(float) # 匹配分数更新信号 match_success = pyqtSignal(np.ndarray, float) # 匹配成功信号 (帧, 匹配分数) trigger_activated = pyqtSignal(bool) # 新增信号:触发状态变化 def __init__(self, cam_operation, parent=None): super().__init__(parent) self.cam_operation = cam_operation self.running = True self.sample_template = None self.min_match_count = MIN_MATCH_COUNT self.match_threshold = MATCH_THRESHOLD self.sample_kp = None self.sample_des = None self.current_match_score = 0.0 self.last_match_time = 0 self.frame_counter = 0 self.consecutive_fail_count = 0 self.last_trigger_time = 0 # 上次触发时间 self.cool_down = 0.2 # 冷却时间(秒) self.trigger_threshold = 0.5 # 默认触发阈值 self.consecutive_match_count = 0 # 连续匹配帧数计数器 self.min_consecutive_matches = MIN_CONSECUTIVE_MATCHES # 最小连续匹配帧数 # 特征检测器 - 使用SIFT和ORB self.sift = cv2.SIFT_create() self.orb = cv2.ORB_create(nfeatures=2000) self.current_detector = 'sift' # 默认使用SIFT # 特征匹配器 - 使用FLANN提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=10) # 增加树数量 search_params = dict(checks=100) # 增加检查次数 self.flann = cv2.FlannBasedMatcher(index_params, search_params) # 性能监控 self.processing_times = deque(maxlen=100) self.frame_rates = deque(maxlen=100) # 黑白相机优化 self.clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) def calculate_match_score(self, good_matches): """基于匹配质量和数量的综合评分""" if not good_matches: return 0.0 # 质量评分(距离的倒数) quality_score = sum(1.0 / max(m.distance, 0.01) for m in good_matches) max_quality = len(good_matches) * 100.0 # 假设最小距离0.01 # 数量评分(匹配点比例) sample_points = len(self.sample_kp) quantity_score = len(good_matches) / max(sample_points, 1) # 综合评分(质量60% + 数量40%) match_score = (0.6 * (quality_score / max_quality) + 0.4 * quantity_score) return min(1.0, max(0.0, match_score)) def preprocess_image(self, image): """更保守的图像预处理""" if len(image.shape) == 2: # 已经是灰度图,直接返回 return image # 仅转换为灰度图 gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # 可选:轻度直方图均衡化 gray = cv2.equalizeHist(gray) return gray def set_sample(self, sample_img): """设置标准样本并提取特征""" # 保存样本图像 self.sample_img = sample_img # 预处理增强特征 processed_sample = self.preprocess_image(sample_img) # 提取样本特征点 if self.current_detector == 'orb': self.sample_k极光, self.sample_des = self.orb.detectAndCompute(processed_sample, None) else: self.sample_kp, self.sample_des = self.sift.detectAndCompute(processed_sample, None) if self.sample_des is None or len(self.sample_kp) < 100: logging.warning(f"样本特征点不足({len(self.sample_kp)}个),尝试使用ORB") # 尝试使用ORB self.sample_kp, self.sample_des = self.orb.detectAndCompute(processed_sample, None) if self.sample_des is not None and len(self.sample_kp) >= 50: self.current_detector = 'orb' logging.info(f"切换到ORB特征检测器,提取到{len(self.sample_kp)}个特征点") else: return False logging.info(f"样本特征提取成功({self.current_detector}): {len(self.sample_kp)}个关键点") return True def set_threshold(self, threshold): """更新匹配阈值""" self.match_threshold = max(0.0, min(1.0, threshold)) logging.info(f"更新匹配阈值: {self.match_threshold:.2f}") def set_trigger_threshold(self, threshold): """更新触发阈值""" self.trigger_threshold = max(0.0, min(1.0, threshold)) logging.info(f"更新触发阈值: {self.trigger_threshold:.2f}") def switch_detector(self, detector_name): """切换特征检测器""" self.current_detector = detector_name.lower() logging.info(f"切换特征检测器为: {self.current_detector}") def auto_adjust_trigger_threshold(self, brightness): """根据环境亮度动态调整触发阈值""" if brightness < 50: # 暗环境 return 0.4 # 降低阈值要求 elif brightness > 200: # 亮环境 return 0.6 # 提高阈值要求 else: return 0.5 # 正常阈值 def process_frame(self, frame): """处理帧:特征提取、匹配和可视化""" is_matched = False match_score = 0.0 processed_frame = frame.copy() # 检查是否已设置样本 if self.sample_kp is None or self.sample_des is None: return processed_frame, match_score, is_matched # 预处理当前帧 processed_frame = self.preprocess_image(frame) try: # 提取当前帧的特征点 if self.current_detector == 'orb': kp, des = self.orb.detectAndCompute(processed_frame, None) else: kp, des = self.sift.detectAndCompute(processed_frame, None) # 即使特征点不足也继续处理 if des is None or len(kp) < 5: # 更新匹配分数为0 self.current_match_score = 0.0 self.match_score_updated.emit(0.0) return processed_frame, 0.0, False # 匹配特征点 matches = self.flann.knnMatch(self.sample_des, des, k=2) # 应用Lowe's比率测试 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) # 使用匹配分数计算方法 match_score = self.calculate_match_score(good_matches) # 添加调试日志 logging.debug(f"匹配结果: 样本特征点={len(self.sample_kp)}, " f"当前帧特征点={len(kp)}, " f"良好匹配={len(good_matches)}, " f"匹配分数={match_score:.4f}") # 判断是否匹配成功(用于UI显示) if len(good_matches) >= self.min_match_count and match_score >= self.match_threshold: is_matched = True # 在图像上绘制匹配结果 if len(processed_frame.shape) == 2: processed_frame = cv2.cvtColor(processed_frame, cv2.COLOR_GRAY2BGR) # 修正变量名错误 processed_frame = cv2.drawMatches( self.sample_img, self.sample_kp, processed_frame, kp, good_matches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS ) # 在图像上显示匹配分数 cv2.putText(processed_frame, f"Match Score: {match_score:.2f}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # 更新当前匹配分数 self.current_match_score = match_score self.match_score_updated.emit(match_score) # 根据环境亮度动态调整阈值 brightness = np.mean(processed_frame) self.trigger_threshold = self.auto_adjust_trigger_threshold(brightness) # 连续匹配计数 if match_score >= self.trigger_threshold: self.consecutive_match_count += 1 else: self.consecutive_match_count = 0 # 检查是否达到触发条件 trigger_active = False current_time = time.time() if (self.consecutive_match_count >= self.min_consecutive_matches and (current_time - self.last_trigger_time) > self.cool_down): self.last_trigger_time = current_time trigger_active = True logging.info(f"连续{self.min_consecutive_matches}帧匹配达到触发阈值! 分数: {match_score:.2f}, 触发质量检测") # 发出匹配成功信号 (传递当前帧) self.match_success.emit(frame.copy(), match_score) # 发送触发状态信号 self.trigger_activated.emit(trigger_active) except Exception as e: logging.error(f"帧处理错误: {str(e)}") import traceback traceback.print_exc() return processed_frame, match_score, is_matched def run(self): """主处理循环 - 连续处理每一帧""" logging.info("连续帧匹配线程启动") self.last_match_time = time.time() self.consecutive_fail_count = 0 self.frame_counter = 0 while self.running: start_time = time.time() self.frame_counter += 1 # 检查相机状态 if not self.cam_operation or not self.cam_operation.is_grabbing: if self.consecutive_fail_count % 10 == 0: logging.debug("相机未取流,等待...") time.sleep(0.1) self.consecutive_fail_count += 1 continue # 获取当前帧 frame = self.cam_operation.get_current_frame() if frame is None: self.consecutive_fail_count += 1 if self.consecutive_fail_count % 10 == 0: logging.warning(f"连续{self.consecutive_fail_count}次获取帧失败") time.sleep(0.05) continue self.consecutive_fail_count = 0 try: # 每10帧切换一次检测器(测试用) if self.frame_counter % 10 == 0 and self.current_detector == 'sift': self.switch_detector('orb') elif self.frame_counter % 20 == 0 and self.current_detector == 'orb': self.switch_detector('sift') # 处理帧 processed_frame, match_score, is_matched = self.process_frame(frame) # 发送处理结果 self.frame_processed.emit(processed_frame, match_score, is_matched) except Exception as e: logging.error(f"帧处理错误: {str(e)}") # 控制处理频率 processing_time = time.time() - start_time sleep_time = max(0.01, MIN_FRAME_INTERVAL - processing_time) time.sleep(sleep_time) # 计算并记录帧率 if self.frame_counter % 30 == 0: fps = 30 / (time.time() - start_time + 0.001) logging.debug(f"处理帧率: {fps:.1f} FPS") logging.info("连续帧匹配线程退出") # ==================== 模板匹配控制函数 ==================== def toggle_template_matching(state): global template_matcher_thread, current_sample_path logging.debug(f"切换连续匹配状态: {state}") if state == Qt.Checked and isGrabbing: # 确保已设置样本 if not current_sample_path: logging.warning("尝试启动连续匹配但未设置样本") QMessageBox.warning(mainWindow, "错误", "请先设置标准样本", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if template_matcher_thread is None: logging.info("创建新的连续帧匹配线程") template_matcher_thread = ContinuousFrameMatcher(obj_cam_operation) template_matcher_thread.frame_processed.connect(update_frame_display) template_matcher_thread.match_score_updated.connect(update_match_score_display) template_matcher_thread.trigger_activated.connect( lambda active: mainWindow.update_trigger_indicator(active) ) # 正确连接匹配成功信号到质量检测函数 template_matcher_thread.match_success.connect( lambda frame, score: vision_controlled_check(frame, score) ) # 加载样本图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: logging.error("无法加载标准样本图像") QMessageBox.warning(mainWindow, "错误", "无法加载标准样本图像", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.set_sample(sample_img): logging.warning("标准样本特征不足") QMessageBox.warning(mainWindow, "错误", "标准样本特征不足", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.isRunning(): logging.info("启动连续帧匹配线程") template_matcher_thread.start() elif template_matcher_thread and template_matcher_thread.isRunning(): logging.info("停止连续帧匹配线程") template_matcher_thread.stop() # 重置匹配分数显示 update_match_score_display(0.0) # 重置帧显示 if obj_cam_operation and obj_cam_operation.is_frame_available(): frame = obj_cam_operation.get_current_frame() if frame is not None: display_frame = frame.copy() # 添加状态信息 cv2.putText(display_frame, "Continuous Matching Disabled", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) update_frame_display(display_frame, 0.0, False) # 添加类型转换函数 def numpy_to_qimage(np_array): """将 numpy 数组转换为 QImage""" if np_array is None: return QImage() height, width, channel = np_array.shape bytes_per_line = 3 * width # 确保数据是连续的 if not np_array.flags['C_CONTIGUOUS']: np_array = np.ascontiguousarray(np_array) # 转换 BGR 到 RGB rgb_image = cv2.cvtColor(np_array, cv2.COLOR_BGR2RGB) # 创建 QImage qimg = QImage( rgb_image.data, width, height, bytes_per_line, QImage.Format_RGB888 ) # 复制数据以避免内存问题 return qimg.copy() # 修改 update_frame_display 函数 def update_frame_display(frame, match_score, is_matched): """更新主显示窗口(线程安全)""" # 确保在GUI线程中执行 if QThread.currentThread() != QApplication.instance().thread(): # 转换为 QImage 再传递 qimg = numpy_to_qimage(frame) QMetaObject.invokeMethod( mainWindow, "updateDisplay", Qt.QueuedConnection, Q_ARG(QImage, qimg), Q_ARG(float, match_score), Q_ARG(bool, is_matched) ) return # 如果已经在主线程,直接调用主窗口的更新方法 mainWindow.updateDisplay(frame, match_score, is_matched) def update_match_score_display(score): """更新匹配分数显示""" # 将分数转换为百分比显示 score_percent = score * 100 mainWindow.lblMatchScoreValue.setText(f"{score_percent:.1f}%") # 根据分数设置颜色 if score > 0.8: # 高于80%显示绿色 color = "green" elif score > 0.6: # 60%-80%显示黄色 color = "orange" else: # 低于60%显示红色 color = "red" mainWindow.lblMatchScoreValue.setStyleSheet(f"color: {color}; font-weight: bold;") def update_diff_display(diff_ratio, is_qualified): mainWindow.lblCurrentDiff.setText(f"当前差异度: {diff_ratio*100:.2f}%") if is_qualified: mainWindow.lblDiffStatus.setText("状态: 合格") mainWindow.lblDiffStatus.setStyleSheet("color: green; font-size: 12px;") else: mainWindow.lblDiffStatus.setText("状态: 不合格") mainWindow.lblDiffStatus.setStyleSheet("color: red; font-size: 12px;") def update_diff_threshold(value): mainWindow.lblDiffValue.setText(f"{value}%") def update_sample_display(): global current_sample_path if current_sample_path: mainWindow.lblSamplePath.setText(f"当前样本: {os.path.basename(current_sample_path)}") mainWindow.lblSamplePath.setToolTip(current_sample_path) mainWindow.bnPreviewSample.setEnabled(True) else: mainWindow.lblSamplePath.setText("当前样本: 未设置样本") mainWindow.bnPreviewSample.setEnabled(False) def update_history_display(): global detection_history mainWindow.cbHistory.clear() for i, result in enumerate(detection_history[-10:]): timestamp = result['timestamp'].strftime("%H:%M:%S") status = "合格" if result['qualified'] else "不合格" ratio = f"{result['diff_ratio']*100:.2f}%" trigger = "视觉" if result['trigger_type'] == 'vision' else "手动" mainWindow.cbHistory.addItem(f"[{trigger} {timestamp}] {status} - 差异: {ratio}") def update_match_threshold(value): """更新匹配阈值显示并应用到匹配器""" global template_matcher_thread # 更新UI显示 if mainWindow: mainWindow.lblThresholdValue.setText(f"{value}%") # 如果匹配线程存在,更新其匹配阈值 if template_matcher_thread: # 转换为0-1范围的浮点数 threshold = value / 100.0 template_matcher_thread.set_threshold(threshold) logging.debug(f"更新匹配阈值: {threshold:.2f}") # 新增函数:保存当前帧 def save_current_frame(): if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return frame = obj_cam_operation.get_current_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"capture_{timestamp}" # 正确写法: file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 if not os.path.splitext(file_path)[1]: if "PNG" in selected_filter: file_path += ".png" elif "BMP" in selected_filter: file_path += ".bmp" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" # 保存图像 try: if cv2.imwrite(file_path, frame): QMessageBox.information(mainWindow, "成功", f"当前帧已保存至:\n{file_path}", QMessageBox.Ok) else: raise Exception("OpenCV保存失败") except Exception as e: QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) # ==================== 主窗口类 ==================== class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("布料印花检测系统 - 连续匹配版") self.resize(1200, 800) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 设备枚举区域 device_layout = QHBoxLayout() self.ComboDevices = QComboBox() self.bnEnum = QPushButton("枚举设备") self.bnOpen = QPushButton("打开设备") self.bnClose = QPushButton("关闭设备") device_layout.addWidget(self.ComboDevices) device_layout.addWidget(self.bnEnum) device_layout.addWidget(self.bnOpen) device_layout.addWidget(self.bnClose) main_layout.addLayout(device_layout) # 取流控制组 self.groupGrab = QGroupBox("取流控制") grab_layout = QHBoxLayout(self.groupGrab) self.bnStart = QPushButton("开始取流") self.bnStop = QPushButton("停止取流") self.radioContinueMode = QRadioButton("连续模式") self.radioTriggerMode = QRadioButton("触发模式") self.bnSoftwareTrigger = QPushButton("软触发") grab_layout.addWidget(self.bnStart) grab_layout.addWidget(self.bnStop) grab_layout.addWidget(self.radioContinueMode) grab_layout.addWidget(self.radioTriggerMode) grab_layout.addWidget(self.bnSoftwareTrigger) main_layout.addWidget(self.groupGrab) # 参数设置组 self.paramgroup = QGroupBox("相机参数") param_layout = QGridLayout(self.paramgroup) self.edtExposureTime = QLineEdit() self.edtGain = QLineEdit() self.edtFrameRate = QLineEdit() self.bnGetParam = QPushButton("获取参数") self.bnSetParam = QPushButton("设置参数") self.bnSaveImage = QPushButton("保存图像") param_layout.addWidget(QLabel("曝光时间:"), 0, 0) param_layout.addWidget(self.edtExposureTime, 0, 1) param_layout.addWidget(self.bnGetParam, 0, 2) param_layout.addWidget(QLabel("增益:"), 1, 0) param_layout.addWidget(self.edtGain, 1, 1) param_layout.addWidget(self.bnSetParam, 1, 2) param_layout.addWidget(QLabel("帧率:"), 2, 0) param_layout.addWidget(self.edtFrameRate, 2, 1) param_layout.addWidget(self.bnSaveImage, 2, 2) main_layout.addWidget(self.paramgroup) # 图像显示区域 self.widgetDisplay = QLabel() self.widgetDisplay.setMinimumSize(640, 480) self.widgetDisplay.setStyleSheet("background-color: black;") self.widgetDisplay.setAlignment(Qt.AlignCenter) self.widgetDisplay.setText("相机预览区域") main_layout.addWidget(self.widgetDisplay, 1) # 创建自定义UI组件 self.setup_custom_ui() # 添加阈值自适应定时器 self.threshold_timer = QTimer() self.threshold_timer.timeout.connect(self.auto_adjust_threshold) self.threshold_timer.start(2000) # 每2秒调整一次 # 触发计数器 self.trigger_count = 0 def auto_adjust_threshold(self): """根据环境亮度自动调整匹配阈值""" if not obj_cam_operation or not isGrabbing: return # 获取当前帧 frame = obj_cam_operation.get_current_frame() if frame is None: return # 处理不同通道数的图像 if len(frame.shape) == 2 or frame.shape[2] == 1: # 已经是灰度图 gray = frame elif frame.shape[2] == 3: # 三通道彩色图 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) elif frame.shape[2] == 4: # 四通道图(带alpha) gray = cv2.cvtColor(frame, cv2.COLOR_BGRA2GRAY) else: # 其他通道数,无法处理 logging.warning(f"无法处理的图像格式: {frame.shape}") return # 计算平均亮度 try: brightness = np.mean(gray) except Exception as e: logging.error(f"计算亮度失败: {str(e)}") return # 根据亮度动态调整阈值 (亮度低时降低阈值要求) if brightness < 50: # 暗环境 new_threshold = 40 # 40% elif brightness > 200: # 亮环境 new_threshold = 65 # 65% else: # 正常环境 new_threshold = 55 # 55% # 更新UI self.sliderThreshold.setValue(new_threshold) self.lblThresholdValue.setText(f"{new_threshold}%") # 更新匹配器阈值 update_match_threshold(new_threshold) # 状态栏显示调整信息 self.statusBar().showMessage(f"亮度: {brightness:.1f}, 自动调整阈值至: {new_threshold}%", 3000) def update_trigger_indicator(self, active): """更新触发指示灯状态""" if active: self.triggerIndicator.setStyleSheet("background-color: green; border-radius: 10px;") else: self.triggerIndicator.setStyleSheet("background-color: gray; border-radius: 10px;") def update_trigger_threshold(self, value): """更新触发阈值显示并应用到匹配器""" self.lblTriggerValue.setText(f"{value}%") threshold = value / 100.0 if template_matcher_thread: template_matcher_thread.set_trigger_threshold(threshold) def increment_trigger_count(self): """增加触发计数器""" self.trigger_count += 1 self.lblTriggerCount.setText(f"触发次数: {self.trigger_count}") @pyqtSlot(QImage, float, bool) def updateDisplay(self, qimg, match_score, is_matched): """线程安全的显示更新方法(只接收 QImage)""" if qimg.isNull(): return # 创建QPixmap并缩放 pixmap = QPixmap.fromImage(qimg) scaled_pixmap = pixmap.scaled( self.widgetDisplay.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 self.widgetDisplay.setPixmap(scaled_pixmap) self.widgetDisplay.setAlignment(Qt.AlignCenter) def closeEvent(self, event): logging.info("主窗口关闭,执行清理...") close_device() event.accept() def setup_custom_ui(self): # 工具栏 toolbar = self.addToolBar("检测工具") self.bnCheckPrint = QPushButton("手动检测") toolbar.addWidget(self.bnCheckPrint) # 添加分隔标签 toolbar.addWidget(QLabel("图像保存:")) self.bnSaveCurrentFrame = QPushButton("保存当前帧") toolbar.addWidget(self.bnSaveCurrentFrame) self.bnSaveSample = QPushButton("保存标准样本") toolbar.addWidget(self.bnSaveSample) self.bnPreviewSample = QPushButton("预览样本") toolbar.addWidget(self.bnPreviewSample) # 添加触发指示灯 self.triggerIndicator = QLabel() self.triggerIndicator.setFixedSize(20, 20) self.triggerIndicator.setStyleSheet("background-color: gray; border-radius: 10px;") toolbar.addWidget(QLabel("触发状态:")) toolbar.addWidget(self.triggerIndicator) # 添加触发计数器 self.lblTriggerCount = QLabel("触发次数: 0") toolbar.addWidget(self.lblTriggerCount) # 历史记录 toolbar.addWidget(QLabel("历史记录:")) self.cbHistory = QComboBox() self.cbHistory.setMinimumWidth(300) toolbar.addWidget(self.cbHistory) # 状态栏样本路径 self.lblSamplePath = QLabel("当前样本: 未设置样本") self.statusBar().addPermanentWidget(self.lblSamplePath) # 右侧面板 right_panel = QWidget() right_layout = QVBoxLayout(right_panel) right_layout.setContentsMargins(10, 10, 10, 10) # 差异度调整组 diff_group = QGroupBox("差异度调整") diff_layout = QVBoxLayout(diff_group) self.lblDiffThreshold = QLabel("差异度阈值 (0-100%):") self.sliderDiffThreshold = QSlider(Qt.Horizontal) self.sliderDiffThreshold.setRange(0, 100) self.sliderDiffThreshold.setValue(5) self.lblDiffValue = QLabel("5%") self.lblCurrentDiff = QLabel("当前差异度: -") self.lblCurrentDiff.setStyleSheet("font-size: 14px; font-weight: bold;") self.lblDiffStatus = QLabel("状态: 未检测") self.lblDiffStatus.setStyleSheet("font-size: 12px;") diff_layout.addWidget(self.lblDiffThreshold) diff_layout.addWidget(self.sliderDiffThreshold) diff_layout.addWidget(self.lblDiffValue) diff_layout.addWidget(self.lblCurrentDiff) diff_layout.addWidget(self.lblDiffStatus) right_layout.addWidget(diff_group) # ===== 连续匹配面板 ===== match_group = QGroupBox("连续帧匹配") match_layout = QVBoxLayout(match_group) # 样本设置 sample_layout = QHBoxLayout() self.bnSetSample = QPushButton("设置标准样本") self.bnPreviewSample = QPushButton("预览样本") self.lblSampleStatus = QLabel("状态: 未设置样本") sample_layout.addWidget(self.bnSetSample) sample_layout.addWidget(self.bnPreviewSample) sample_layout.addWidget(self.lblSampleStatus) match_layout.addLayout(sample_layout) # 匹配参数 param_layout = QHBoxLayout() self.lblMatchThreshold = QLabel("匹配阈值:") self.sliderThreshold = QSlider(Qt.Horizontal) self.sliderThreshold.setRange(50, 100) self.sliderThreshold.setValue(75) # 降低默认阈值 self.lblThresholdValue = QLabel("75%") param_layout.addWidget(self.lblMatchThreshold) param_layout.addWidget(self.sliderThreshold) param_layout.addWidget(self.lblThresholdValue) match_layout.addLayout(param_layout) # 触发阈值调整 trigger_threshold_layout = QHBoxLayout() self.lblTriggerThreshold = QLabel("触发阈值(%):") self.sliderTriggerThreshold = QSlider(Qt.Horizontal) self.sliderTriggerThreshold.setRange(0, 100) self.sliderTriggerThreshold.setValue(50) # 默认50% self.lblTriggerValue = QLabel("50%") trigger_threshold_layout.addWidget(self.lblTriggerThreshold) trigger_threshold_layout.addWidget(self.sliderTriggerThreshold) trigger_threshold_layout.addWidget(self.lblTriggerValue) match_layout.addLayout(trigger_threshold_layout) # 匹配分数显示 match_score_layout = QHBoxLayout() self.lblMatchScore = QLabel("实时匹配分数:") self.lblMatchScoreValue = QLabel("0.0%") self.lblMatchScoreValue.setStyleSheet("font-weight: bold;") match_score_layout.addWidget(self.lblMatchScore) match_score_layout.addWidget(self.lblMatchScoreValue) match_layout.addLayout(match_score_layout) # 特征检测器选择 detector_layout = QHBoxLayout() self.lblDetector = QLabel("特征检测器:") self.cmbDetector = QComboBox() self.cmbDetector.addItems(["SIFT", "ORB"]) self.cmbDetector.setCurrentIndex(0) detector_layout.addWidget(self.lblDetector) detector_layout.addWidget(self.cmbDetector) match_layout.addLayout(detector_layout) # 连续匹配开关 self.chkContinuousMatch = QCheckBox("启用连续帧匹配") self.chkContinuousMatch.setChecked(False) match_layout.addWidget(self.chkContinuousMatch) right_layout.addWidget(match_group) right_layout.addStretch(1) # 添加调试按钮 self.bnDebug = QPushButton("调试匹配") toolbar.addWidget(self.bnDebug) # 连接信号 self.bnDebug.clicked.connect(self.debug_matching) # 停靠窗口 dock = QDockWidget("检测控制面板", self) dock.setWidget(right_panel) dock.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) self.addDockWidget(Qt.RightDockWidgetArea, dock) def change_detector(self, detector_name): """切换特征检测器""" if template_matcher_thread: template_matcher_thread.switch_detector(detector_name.lower()) QMessageBox.information(self, "检测器切换", f"已切换到 {detector_name} 特征检测器", QMessageBox.Ok) def debug_matching(self): """保存当前帧和匹配结果用于调试""" # ... (保持不变) # ===== 辅助函数 ===== def ToHexStr(num): if not isinstance(num, int): try: num = int(num) except: return f"<非整数:{type(num)}>" chaDic = {10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f'} hexStr = "" if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hexStr = chaDic.get(digit, str(digit)) + hexStr num //= 16 hexStr = chaDic.get(num, str(num)) + hexStr return "0x" + hexStr def enum_devices(): global deviceList, obj_cam_operation n_layer_type = ( MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE ) # 创建设备列表 deviceList = MV_CC_DEVICE_INFO_LIST() # 枚举设备 ret = MvCamera.MV_CC_EnumDevices(n_layer_type, deviceList) if ret != MV_OK: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) QMessageBox.warning(mainWindow, "错误", error_msg, QMessageBox.Ok) return ret if deviceList.nDeviceNum == 0: QMessageBox.warning(mainWindow, "提示", "未找到任何设备", QMessageBox.Ok) return MV_OK logging.info(f"找到 {deviceList.nDeviceNum} 个设备") # 处理设备信息 devList = [] for i in range(deviceList.nDeviceNum): # 获取设备信息 mvcc_dev_info = ctypes.cast( deviceList.pDeviceInfo[i], ctypes.POINTER(MV_CC_DEVICE_INFO) ).contents # 根据设备类型提取信息 if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: st_gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo ip_addr = ( f"{(st_gige_info.nCurrentIp >> 24) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 16) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 8) & 0xFF}." f"{st_gige_info.nCurrentIp & 0xFF}" ) # 修复:将c_ubyte_Array_16转换为字节串再解码 user_defined_bytes = bytes(st_gige_info.chUserDefinedName) dev_name = f"GigE: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} ({ip_addr})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: st_usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo serial = bytes(st_usb_info.chSerialNumber).decode('ascii', 'ignore').rstrip('\x00') # 修复:同样处理用户自定义名称 user_defined_bytes = bytes(st_usb_info.chUserDefinedName) dev_name = f"USB: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} (SN: {serial})") else: devList.append(f"[{i}] 未知设备类型: {mvcc_dev_info.nTLayerType}") # 更新UI mainWindow.ComboDevices.clear() mainWindow.ComboDevices.addItems(devList) if devList: mainWindow.ComboDevices.setCurrentIndex(0) mainWindow.statusBar().showMessage(f"找到 {deviceList.nDeviceNum} 个设备", 300) return MV_OK def set_continue_mode(): ret = obj_cam_operation.set_trigger_mode(False) if ret != 0: strError = "设置连续模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(True) mainWindow.radioTriggerMode.setChecked(False) mainWindow.bnSoftwareTrigger.setEnabled(False) def set_software_trigger_mode(): ret = obj_cam_operation.set_trigger_mode(True) if ret != 0: strError = "设置触发模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(False) mainWindow.radioTriggerMode.setChecked(True) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing) def trigger_once(): ret = obj_cam_operation.trigger_once() if ret != 0: strError = "软触发失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 尝试捕获当前帧 frame = obj_cam_operation.capture_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return # 确保图像有效 if frame.size == 0 or frame.shape[0] == 0 or frame.shape[1] == 0: QMessageBox.warning(mainWindow, "无效图像", "捕获的图像无效,请检查相机设置!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() # 创建目录(如果不存在) directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return # 保存图像 try: # 使用OpenCV保存图像 if not cv2.imwrite(file_path, frame): raise Exception("OpenCV保存失败") # 更新状态 current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) # 显示成功消息 QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) # 更新样本状态 mainWindow.lblSampleStatus.setText("状态: 样本已设置") mainWindow.lblSampleStatus.setStyleSheet("color: green;") except Exception as e: logging.error(f"保存图像失败: {str(e)}") QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return try: # 直接使用OpenCV加载图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: raise Exception("无法加载图像") # 显示图像 cv2.namedWindow("标准样本预览", cv2.WINDOW_NORMAL) cv2.resizeWindow("标准样本预览", 800, 600) cv2.imshow("标准样本预览", sample_img) cv2.waitkey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def is_float(str): try: float(str) return True except ValueError: return False def get_param(): try: ret = obj_cam_operation.get_parameters() if ret != MV_OK: strError = "获取参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) else: mainWindow.edtExposureTime.setText("{0:.2f}".format(obj_cam_operation.exposure_time)) mainWindow.edtGain.setText("{0:.2f}".format(obj_cam_operation.gain)) mainWindow.edtFrameRate.setText("{0:.2f}".format(obj_cam_operation.frame_rate)) except Exception as e: error_msg = f"获取参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def set_param(): frame_rate = mainWindow.edtFrameRate.text() exposure = mainWindow.edtExposureTime.text() gain = mainWindow.edtGain.text() if not (is_float(frame_rate) and is_float(exposure) and is_float(gain)): strError = "设置参数失败: 参数必须是有效的浮点数" QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.set_param( frame_rate=float(frame_rate), exposure_time=float(exposure), gain=float(gain) ) if ret != MV_OK: strError = "设置参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) except Exception as e: error_msg = f"设置参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def enable_controls(): global isGrabbing, isOpen mainWindow.groupGrab.setEnabled(isOpen) mainWindow.paramgroup.setEnabled(isOpen) mainWindow.bnOpen.setEnabled(not isOpen) mainWindow.bnClose.setEnabled(isOpen) mainWindow.bnStart.setEnabled(isOpen and (not isGrabbing)) mainWindow.bnStop.setEnabled(isOpen and isGrabbing) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing and mainWindow.radioTriggerMode.isChecked()) mainWindow.bnSaveImage.setEnabled(isOpen and isGrabbing) mainWindow.bnCheckPrint.setEnabled(isOpen and isGrabbing) mainWindow.bnSaveSample.setEnabled(isOpen and isGrabbing) mainWindow.bnPreviewSample.setEnabled(bool(current_sample_path)) mainWindow.bnSaveCurrentFrame.setEnabled(isOpen and isGrabbing) # 连续匹配控制 mainWindow.chkContinuousMatch.setEnabled(bool(current_sample_path) and isGrabbing) class FrameMonitorThread(QThread): frame_status = pyqtSignal(str) # 用于发送状态消息的信号 def __init__(self, cam_operation): super().__init__() self.cam_operation = cam_operation self.running = True self.frame_count = 0 self.last_time = time.time() def run(self): """监控相机帧状态的主循环""" while self.running: try: if self.cam_operation and self.cam_operation.is_grabbing: # 获取帧统计信息 frame_info = self.get_frame_info() if frame_info: fps = frame_info.get('fps', 0) dropped = frame_info.get('dropped', 0) status = f"FPS: {fps:.1f} | 丢帧: {dropped}" self.frame_status.emit(status) else: self.frame_status.emit("取流中...") else: self.frame_status.emit("相机未取流") except Exception as e: self.frame_status.emit(f"监控错误: {str(e)}") # 每500ms检查一次 QThread.msleep(500) def stop(self): """停止监控线程""" self.running = False self.wait(1000) # 等待线程结束 def calculate_fps(self): """计算当前帧率""" current_time = time.time() elapsed = current_time - self.last_time if elapsed > 0: fps = self.frame_count / elapsed self.frame_count = 0 self.last_time = current_time return fps return 0 def get_frame_info(self): """获取帧信息""" try: # 更新帧计数 self.frame_count += 1 # 返回帧信息 return { 'fps': self.calculate_fps(), 'dropped': 0 # 实际应用中需要从相机获取真实丢帧数 } except Exception as e: logging.error(f"获取帧信息失败: {str(e)}") return None # ===== 主程序入口 ===== if __name__ == "__main__": # 配置日志系统 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("cloth_inspection_continuous.log"), logging.StreamHandler() ] ) logging.info("布料印花检测系统(连续匹配版)启动") app = QApplication(sys.argv) mainWindow = MainWindow() # 信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_threshold) mainWindow.sliderTriggerThreshold.valueChanged.connect( mainWindow.update_trigger_threshold ) mainWindow.cmbDetector.currentTextChanged.connect( mainWindow.change_detector ) # 其他信号连接 mainWindow.sliderDiffThreshold.valueChanged.connect(update_diff_threshold) mainWindow.bnCheckPrint.clicked.connect(lambda: vision_controlled_check(None)) mainWindow.bnSaveSample.clicked.connect(save_sample_image) mainWindow.bnPreviewSample.clicked.connect(preview_sample) mainWindow.bnEnum.clicked.connect(enum_devices) mainWindow.bnOpen.clicked.connect(open_device) mainWindow.bnClose.clicked.connect(close_device) mainWindow.bnStart.clicked.connect(start_grabbing) mainWindow.bnStop.clicked.connect(stop_grabbing) mainWindow.bnSoftwareTrigger.clicked.connect(trigger_once) mainWindow.radioTriggerMode.clicked.connect(set_software_trigger_mode) mainWindow.radioContinueMode.clicked.connect(set_continue_mode) mainWindow.bnGetParam.clicked.connect(get_param) mainWindow.bnSetParam.clicked.connect(set_param) mainWindow.bnSaveImage.clicked.connect(save_current_frame) mainWindow.bnSaveCurrentFrame.clicked.connect(save_current_frame) # 连续匹配信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_score_display) mainWindow.chkContinuousMatch.stateChanged.connect(toggle_template_matching) # 连接触发计数信号 if template_matcher_thread: template_matcher_thread.match_success.connect(mainWindow.increment_trigger_count) # 设置默认触发阈值 mainWindow.sliderTriggerThreshold.setValue(40) # 40% mainWindow.update_trigger_threshold(40) mainWindow.show() app.exec_() close_device() sys.exit()

这个程序运行后我点了调试运行之后显示特征点都一样为什么并没有自动触发检测而且我这个以后肯定是要它自动匹配好,自动质量检测的 # -*- coding: utf-8 -*- import sys import os import cv2 import numpy as np import math import time import logging import threading from collections import deque from PyQt5.QtWidgets import ( QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QMessageBox, QLabel, QFileDialog, QToolBox, QComboBox, QStatusBar, QGroupBox, QSlider, QDockWidget, QProgressDialog, QLineEdit, QRadioButton, QGridLayout, QSpinBox, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox, QProgressBar, ) from PyQt5.QtCore import QRect, Qt, QSettings, QThread, pyqtSignal, QTimer, QMetaObject, pyqtSlot,Q_ARG from PyQt5.QtGui import QImage, QPixmap from CamOperation_class import CameraOperation from MvCameraControl_class import * import ctypes from ctypes import cast, POINTER from datetime import datetime import skimage import platform from CameraParams_header import ( MV_GIGE_DEVICE, MV_USB_DEVICE, MV_GENTL_CAMERALINK_DEVICE, MV_GENTL_CXP_DEVICE, MV_GENTL_XOF_DEVICE ) # ===== 全局配置 ===== # 模板匹配参数 MATCH_THRESHOLD = 0.75 # 降低匹配置信度阈值以提高灵敏度 MIN_MATCH_COUNT = 10 # 最小匹配特征点数量 MIN_FRAME_INTERVAL = 0.1 # 最小检测间隔(秒) # ===== 全局变量 ===== current_sample_path = "" detection_history = [] isGrabbing = False isOpen = False obj_cam_operation = None frame_monitor_thread = None template_matcher_thread = None MV_OK = 0 MV_E_CALLORDER = -2147483647 # ==================== 优化后的质量检测算法 ==================== def enhanced_check_print_quality(sample_image_path, test_image, threshold=0.05): # 不再使用传感器数据调整阈值 adjusted_threshold = threshold try: sample_img_data = np.fromfile(sample_image_path, dtype=np.uint8) sample_image = cv2.imdecode(sample_img_data, cv2.IMREAD_GRAYSCALE) if sample_image is None: logging.error(f"无法解码样本图像: {sample_image_path}") return None, None, None except Exception as e: logging.exception(f"样本图像读取异常: {str(e)}") return None, None, None if len(test_image.shape) == 3: test_image_gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY) else: test_image_gray = test_image.copy() sample_image = cv2.GaussianBlur(sample_image, (5, 5), 0) test_image_gray = cv2.GaussianBlur(test_image_gray, (5, 5), 0) try: # 使用更鲁棒的SIFT特征检测器 sift = cv2.SIFT_create() keypoints1, descriptors1 = sift.detectAndCompute(sample_image, None) keypoints2, descriptors2 = sift.detectAndCompute(test_image_gray, None) if descriptors1 is None or descriptors2 is None: logging.warning("无法提取特征描述符,跳过配准") aligned_sample = sample_image else: # 使用FLANN匹配器提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(descriptors1, descriptors2, k=2) # 应用Lowe's比率测试筛选优质匹配 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) if len(good_matches) > MIN_MATCH_COUNT: src_pts = np.float32([keypoints1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if H is not None: aligned_sample = cv2.warpPerspective( sample_image, H, (test_image_gray.shape[1], test_image_gray.shape[0]) ) logging.info("图像配准成功,使用配准后样本") else: aligned_sample = sample_image logging.warning("无法计算单应性矩阵,使用原始样本") else: aligned_sample = sample_image logging.warning(f"特征点匹配不足({len(good_matches)}/{MIN_MATCH_COUNT}),跳过图像配准") except Exception as e: logging.error(f"图像配准失败: {str(e)}") aligned_sample = sample_image try: if aligned_sample.shape != test_image_gray.shape: test_image_gray = cv2.resize(test_image_gray, (aligned_sample.shape[1], aligned_sample.shape[0])) except Exception as e: logging.error(f"图像调整大小失败: {str(e)}") return None, None, None try: from skimage.metrics import structural_similarity as compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True, data_range=255 ) except ImportError: from skimage.measure import compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True ) except Exception as e: logging.error(f"SSIM计算失败: {str(e)}") abs_diff = cv2.absdiff(aligned_sample, test_image_gray) ssim_diff = abs_diff.astype(np.float32) / 255.0 ssim_score = 1.0 - np.mean(ssim_diff) ssim_diff = (1 - ssim_diff) * 255 abs_diff = cv2.absdiff(aligned_sample, test_image_gray) combined_diff = cv2.addWeighted(ssim_diff.astype(np.uint8), 0.7, abs_diff, 0.3, 0) _, thresholded = cv2.threshold(combined_diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((3, 3), np.uint8) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, kernel) threshold极光 = cv2.morphologyEx(thresholded, cv2.MORPH_CLOSE, kernel) diff_pixels = np.count_nonzero(thresholded) total_pixels = aligned_sample.size diff_ratio = diff_pixels / total_pixels is_qualified = diff_ratio <= adjusted_threshold marked_image = cv2.cvtColor(test_image_gray, cv2.COLOR_GRAY2BGR) marked_image[thresholded == 255] = [0, 0, 255] # 放大缺陷标记 scale_factor = 2.0 # 放大2倍 marked_image = cv2.resize(marked_image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR) labels = skimage.measure.label(thresholded) properties = skimage.measure.regionprops(labels) for prop in properties: if prop.area > 50: y, x = prop.centroid # 根据放大比例调整坐标 x_scaled = int(x * scale_factor) y_scaled = int(y * scale_factor) cv2.putText(marked_image, f"Defect", (x_scaled, y_scaled), cv2.FONT_HERSHEY_SIMPLEX, 0.5 * scale_factor, (0, 255, 255), int(scale_factor)) return is_qualified, diff_ratio, marked_image # ==================== 视觉触发的质量检测流程 ==================== def vision_controlled_check(capture_image=None, match_score=0.0): """修改为接受图像帧和匹配分数""" global current_sample_path, detection_history logging.info("视觉触发质量检测启动") # 如果没有提供图像,使用当前帧 if capture_image is None: frame = obj_cam_operation.get_current_frame() else: frame = capture_image if frame is None: QMessageBox.warning(mainWindow, "错误", "无法获取当前帧图像!", QMessageBox.Ok) return progress = QProgressDialog("正在检测...", "取消", 0, 100, mainWindow) progress.setWindowModality(Qt.WindowModal) progress.setValue(10) try: diff_threshold = mainWindow.sliderDiffThreshold.value() / 100.0 logging.info(f"使用差异度阈值: {diff_threshold}") progress.setValue(30) is_qualified, diff_ratio, marked_image = enhanced_check_print_quality( current_sample_path, frame, threshold=diff_threshold ) progress.setValue(70) if is_qualified is None: QMessageBox.critical(mainWindow, "检测错误", "检测失败,请极光日志", QMessageBox.Ok) return logging.info(f"检测结果: 合格={is_qualified}, 差异={diff_ratio}") progress.setValue(90) update_diff_display(diff_ratio, is_qualified) result_text = f"印花是否合格: {'合格' if is_qualified else '不合格'}\n差异占比: {diff_ratio*100:.2f}%\n阈值: {diff_threshold*100:.2f}%" QMessageBox.information(mainWindow, "检测结果", result_text, QMessageBox.Ok) if marked_image is not None: # 创建可调整大小的窗口 cv2.namedWindow("缺陷标记结果", cv2.WINDOW_NORMAL) cv2.resizeWindow("缺陷标记结果", 800, 600) # 初始大小 cv2.imshow("缺陷标记结果", marked_image) cv2.waitKey(0) cv2.destroyAllWindows() detection_result = { 'timestamp': datetime.now(), 'qualified': is_qualified, 'diff_ratio': diff_ratio, 'threshold': diff_threshold, 'trigger_type': 'vision' if capture_image else 'manual' } detection_history.append(detection_result) update_history_display() progress.setValue(100) except Exception as e: logging.exception("印花检测失败") QMessageBox.critical(mainWindow, "检测错误", f"检测过程中发生错误: {str(e)}", QMessageBox.Ok) finally: progress.close() # ==================== 相机操作函数 ==================== def open_device(): global deviceList, nSelCamIndex, obj_cam_operation, isOpen, frame_monitor_thread, mainWindow if isOpen: QMessageBox.warning(mainWindow, "Error", '相机已打开!', QMessageBox.Ok) return MV_E_CALLORDER nSelCamIndex = mainWindow.ComboDevices.currentIndex() if nSelCamIndex < 0: QMessageBox.warning(mainWindow, "Error", '请选择相机!', QMessageBox.Ok) return MV_E_CALLORDER # 创建相机控制对象 cam = MvCamera() # 初始化相机操作对象 obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) ret = obj_cam_operation.open_device() if 0 != ret: strError = "打开设备失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) isOpen = False else: set_continue_mode() get_param() isOpen = True enable_controls() # 创建并启动帧监控线程 frame_monitor_thread = FrameMonitorThread(obj_cam_operation) frame_monitor_thread.frame_status.connect(mainWindow.statusBar().showMessage) frame_monitor_thread.start() def start_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) # 如果启用了自动检测,启动检测线程 if mainWindow.chkContinuousMatch.isChecked(): toggle_template_matching(True) def stop_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.Stop_grabbing() if ret != 0: strError = "停止取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = False enable_controls() # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() def close_device(): global isOpen, isGrabbing, obj_cam_operation, frame_monitor_thread, template_matcher_thread if frame_monitor_thread and frame_monitor_thread.isRunning(): frame_monitor_thread.stop() frame_monitor_thread.wait(2000) # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() template_matcher_thread.wait(2000) template_matcher_thread = None if isOpen and obj_cam_operation: obj_cam_operation.close_device() isOpen = False isGrabbing = False enable_controls() # ==================== 连续帧匹配检测器 ==================== class ContinuousFrameMatcher(QThread): frame_processed = pyqtSignal(np.ndarray, float, bool) # 处理后的帧, 匹配分数, 是否匹配 match_score_updated = pyqtSignal(float) # 匹配分数更新信号 match_success = pyqtSignal(np.ndarray, float) # 匹配成功信号 (帧, 匹配分数) trigger_activated = pyqtSignal(bool) # 新增信号:触发状态变化 def __init__(self, cam_operation, parent=None): super().__init__(parent) self.cam_operation = cam_operation self.running = True self.sample_template = None self.min_match_count = MIN_MATCH_COUNT self.match_threshold = MATCH_THRESHOLD self.sample_kp = None self.sample_des = None self.current_match_score = 0.0 self.last_match_time = 0 self.frame_counter = 0 self.consecutive_fail_count = 0 self.last_trigger_time = 0 # 上次触发时间 self.cool_down = 0.2 # 冷却时间(秒) self.trigger_threshold = 0.5 # 默认触发阈值 # 特征检测器 - 使用SIFT self.sift = cv2.SIFT_create() # 特征匹配器 - 使用FLANN提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) self.flann = cv2.FlannBasedMatcher(index_params, search_params) # 性能监控 self.processing_times = deque(maxlen=100) self.frame_rates = deque(maxlen=100) # 黑白相机优化 self.clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) def calculate_match_score(self, good_matches): """更合理的匹配分数计算方法""" if not good_matches: return 0.0 # 使用匹配质量(距离的倒数)计算分数 total_quality = 0.0 for match in good_matches: # 避免除零错误 distance = max(match.distance, 1e-5) total_quality += 1.0 / distance # 归一化处理 max_quality = len(good_matches) * (1.0 / 0.01) # 假设最小距离0.01 match_score = min(1.0, max(0.0, total_quality / max_quality)) return match_score def preprocess_image(self, image): """更保守的图像预处理""" if len(image.shape) == 2: # 已经是灰度图,直接返回 return image # 仅转换为灰度图 gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # 可选:轻度直方图均衡化 gray = cv2.equalizeHist(gray) return gray def set_sample(self, sample_img): """设置标准样本并提取特征""" # 保存样本图像 self.sample_img = sample_img # 预处理增强特征 processed_sample = self.preprocess_image(sample_img) # 提取样本特征点 self.sample_kp, self.sample_des = self.sift.detectAndCompute(processed_sample, None) if self.sample_des is None or len(self.sample_kp) < 100: # 增加最小特征点要求 logging.warning(f"样本特征点不足({len(self.sample_kp)}个),需要至少100个") return False logging.info(f"样本特征提取成功: {len(self.sample_kp)}个关键点") return True def set_threshold(self, threshold): """更新匹配阈值""" self.match_threshold = max(0.0, min(1.0, threshold)) logging.info(f"更新匹配阈值: {self.match_threshold:.2f}") def set_trigger_threshold(self, threshold): """更新触发阈值""" self.trigger_threshold = max(0.0, min(1.0, threshold)) logging.info(f"更新触发阈值: {self.trigger_threshold:.2f}") def process_frame(self, frame): """处理帧:特征提取、匹配和可视化""" is_matched = False match_score = 0.0 processed_frame = frame.copy() # 检查是否已设置样本 if self.sample_kp is None or self.sample_des is None: return processed_frame, match_score, is_matched # 预处理当前帧 processed_frame = self.preprocess_image(frame) # 转换为灰度图像用于特征提取 gray_frame = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2GRAY) try: # 提取当前帧的特征点 kp, des = self.sift.detectAndCompute(gray_frame, None) # 即使特征点不足也继续处理 if des is None or len(kp) < 5: # 更新匹配分数为0 self.current_match_score = 0.0 self.match_score_updated.emit(0.0) return processed_frame, 0.0, False # 匹配特征点 matches = self.flann.knnMatch(self.sample_des, des, k=2) # 应用Lowe's比率测试 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) # 使用更科学的匹配分数计算方法 match_score = self.calculate_match_score(good_matches) # 添加调试日志 logging.debug(f"匹配结果: 样本特征点={len(self.sample_kp)}, " f"当前帧特征点={len(kp)}, " f"良好匹配={len(good_matches)}, " f"匹配分数={match_score:.4f}") # 判断是否匹配成功(用于UI显示) if len(good_matches) >= self.min_match_count and match_score >= self.match_threshold: is_matched = True # 在图像上绘制匹配结果 if len(gray_frame.shape) == 2: processed_frame = cv2.cvtColor(gray_frame, cv2.COLOR_GRAY2BGR) # 绘制匹配点 processed_frame = cv2.drawMatches( self.sample_img, self.s极光, processed_frame, kp, good_matches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS ) # 在图像上显示匹配分数 cv2.putText(processed_frame, f"Match Score: {match_score:.2f}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # 更新当前匹配分数 self.current_match_score = match_score self.match_score_updated.emit(match_score) # 检查是否达到触发条件(50%以上)并且超过冷却时间 trigger_active = False current_time = time.time() if match_score >= self.trigger_threshold and (current_time - self.last_trigger_time) > self.cool_down: self.last_trigger_time = current_time trigger_active = True logging.info(f"匹配分数达到触发阈值! 分数: {match_score:.2f}, 触发质量检测") # 发出匹配成功信号 (传递当前帧) self.match_success.emit(frame.copy(), match_score) # 发送触发状态信号 self.trigger_activated.emit(trigger_active) except Exception as e: logging.error(f"帧处理错误: {str(e)}") return processed_frame, match_score, is_matched def run(self): """主处理循环 - 连续处理每一帧""" logging.info("连续帧匹配线程启动") self.last_match_time = time.time() self.consecutive_fail_count = 0 while self.running: start_time = time.time() # 检查相机状态 if not self.cam_operation or not self.cam_operation.is_grabbing: if self.consecutive_fail_count % 10 == 0: logging.debug("相机未取流,等待...") time.sleep(0.1) self.consecutive_fail_count += 1 continue # 获取当前帧 frame = self.cam_operation.get_current_frame() if frame is None: self.consecutive_fail_count += 1 if self.consecutive_fail_count % 10 == 0: logging.warning(f"连续{self.consecutive_fail_count}次获取帧失败") time.sleep(0.05) continue self.consecutive_fail_count = 0 try: # 处理帧 processed_frame, match_score, is_matched = self.process_frame(frame) # 发送处理结果 self.frame_processed.emit(processed_frame, match_score, is_matched) except Exception as e: logging.error(f"帧处理错误: {str(e)}") # 控制处理频率 processing_time = time.time() - start_time sleep_time = max(0.01, MIN_FRAME_INTERVAL - processing_time) time.sleep(sleep_time) logging.info("连续帧匹配线程退出") # ==================== 模板匹配控制函数 ==================== def toggle_template_matching(state): global template_matcher_thread, current_sample_path logging.debug(f"切换连续匹配状态: {state}") if state == Qt.Checked and isGrabbing: # 确保已设置样本 if not current_sample_path: logging.warning("尝试启动连续匹配但未设置样本") QMessageBox.warning(mainWindow, "错误", "请先设置标准样本", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if template_matcher_thread is None: logging.info("创建新的连续帧匹配线程") template_matcher_thread = ContinuousFrameMatcher(obj_cam_operation) template_matcher_thread.frame_processed.connect(update_frame_display) template_matcher_thread.match_score_updated.connect(update_match_score_display) template_matcher_thread.trigger_activated.connect( lambda active: mainWindow.update_trigger_indicator(active) ) # 正确连接匹配成功信号到质量检测函数 template_matcher_thread.match_success.connect( lambda frame, score: vision_controlled_check(frame, score) ) # 加载样本图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: logging.error("无法加载标准样本图像") QMessageBox.warning(mainWindow, "错误", "无法加载标准样本图像", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.set_sample(sample_img): logging.warning("标准样本特征不足") QMessageBox.warning(mainWindow, "错误", "标准样本特征不足", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.isRunning(): logging.info("启动连续帧匹配线程") template_matcher_thread.start() elif template_matcher_thread and template_matcher_thread.isRunning(): logging.info("停止连续帧匹配线程") template_matcher_thread.stop() # 重置匹配分数显示 update_match_score_display(0.0) # 重置帧显示 if obj_cam_operation and obj_cam_operation.is_frame_available(): frame = obj_cam_operation.get_current_frame() if frame is not None: display_frame = frame.copy() # 添加状态信息 cv2.putText(display_frame, "Continuous Matching Disabled", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) update_frame_display(display_frame, 0.0, False) # 添加类型转换函数 def numpy_to_qimage(np_array): """将 numpy 数组转换为 QImage""" if np_array is None: return QImage() height, width, channel = np_array.shape bytes_per_line = 3 * width # 确保数据是连续的 if not np_array.flags['C_CONTIGUOUS']: np_array = np.ascontiguousarray(np_array) # 转换 BGR 到 RGB rgb_image = cv2.cvtColor(np_array, cv2.COLOR_BGR2RGB) # 创建 QImage qimg = QImage( rgb_image.data, width, height, bytes_per_line, QImage.Format_RGB888 ) # 复制数据以避免内存问题 return qimg.copy() # 修改 update_frame_display 函数 def update_frame_display(frame, match_score, is_matched): """更新主显示窗口(线程安全)""" # 确保在GUI线程中执行 if QThread.currentThread() != QApplication.instance().thread(): # 转换为 QImage 再传递 qimg = numpy_to_qimage(frame) QMetaObject.invokeMethod( mainWindow, "updateDisplay", Qt.QueuedConnection, Q_ARG(QImage, qimg), Q_ARG(float, match_score), Q_ARG(bool, is_matched) ) return # 如果已经在主线程,直接调用主窗口的更新方法 mainWindow.updateDisplay(frame, match_score, is_matched) def update_match_score_display(score): """更新匹配分数显示""" # 将分数转换为百分比显示 score_percent = score * 100 mainWindow.lblMatchScoreValue.setText(f"{score_percent:.1f}%") # 根据分数设置颜色 if score > 0.8: # 高于80%显示绿色 color = "green" elif score > 0.6: # 60%-80%显示黄色 color = "orange" else: # 低于60%显示红色 color = "red" mainWindow.lblMatchScoreValue.setStyleSheet(f"color: {color}; font-weight: bold;") def update_diff_display(diff_ratio, is_qualified): mainWindow.lblCurrentDiff.setText(f"当前差异度: {diff_ratio*100:.2f}%") if is_qualified: mainWindow.lblDiffStatus.setText("状态: 合格") mainWindow.lblDiffStatus.setStyleSheet("color: green; font-size: 12px;") else: mainWindow.lblDiffStatus.setText("状态: 不合格") mainWindow.lblDiffStatus.setStyleSheet("color: red; font-size: 12px;") def update_diff_threshold(value): mainWindow.lblDiffValue.setText(f"{value}%") def update_sample_display(): global current_sample_path if current_sample_path: mainWindow.lblSamplePath.setText(f"当前样本: {os.path.basename(current_sample_path)}") mainWindow.lblSamplePath.setToolTip(current_sample_path) mainWindow.bnPreviewSample.setEnabled(True) else: mainWindow.lblSamplePath.setText("当前样本: 未设置样本") mainWindow.bnPreviewSample.setEnabled(False) def update_history_display(): global detection_history mainWindow.cbHistory.clear() for i, result in enumerate(detection_history[-10:]): timestamp = result['timestamp'].strftime("%H:%M:%S") status = "合格" if result['qualified'] else "不合格" ratio = f"{result['diff_ratio']*100:.2f}%" trigger = "视觉" if result['trigger_type'] == 'vision' else "手动" mainWindow.cbHistory.addItem(f"[{trigger} {timestamp}] {status} - 差异: {ratio}") def update_match_threshold(value): """更新匹配阈值显示并应用到匹配器""" global template_matcher_thread # 更新UI显示 if mainWindow: mainWindow.lblThresholdValue.setText(f"{value}%") # 如果匹配线程存在,更新其匹配阈值 if template_matcher_thread: # 转换为0-1范围的浮点数 threshold = value / 100.0 template_matcher_thread.set_threshold(threshold) logging.debug(f"更新匹配阈值: {threshold:.2f}") # 新增函数:保存当前帧 def save_current_frame(): if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return frame = obj_cam_operation.get_current_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"capture_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存当前帧", os.path.join(last_dir, default_filename), "PNG Files (*.png);;BMP Files (*.bmp);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 if not os.path.splitext(file_path)[1]: if "PNG" in selected_filter: file_path += ".png" elif "BMP" in selected_filter: file_path += ".bmp" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" # 保存图像 try: if cv2.imwrite(file_path, frame): QMessageBox.information(mainWindow, "成功", f"当前帧已保存至:\n{file_path}", QMessageBox.Ok) else: raise Exception("OpenCV保存失败") except Exception as e: QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) # ==================== 主窗口类 ==================== class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("布料印花检测系统 - 连续匹配版") self.resize(1200, 800) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 设备枚举区域 device_layout = QHBoxLayout() self.ComboDevices = QComboBox() self.bnEnum = QPushButton("枚举设备") self.bnOpen = QPushButton("打开设备") self.bnClose = QPushButton("关闭设备") device_layout.addWidget(self.ComboDevices) device_layout.addWidget(self.bnEnum) device_layout.addWidget(self.bnOpen) device_layout.addWidget(self.bnClose) main_layout.addLayout(device_layout) # 取流控制组 self.groupGrab = QGroupBox("取流控制") grab_layout = QHBoxLayout(self.groupGrab) self.bnStart = QPushButton("开始取流") self.bnStop = QPushButton("停止取流") self.radioContinueMode = QRadioButton("连续模式") self.radioTriggerMode = QRadioButton("触发模式") self.bnSoftwareTrigger = QPushButton("软触发") grab_layout.addWidget(self.bnStart) grab_layout.addWidget(self.bnStop) grab_layout.addWidget(self.radioContinueMode) grab_layout.addWidget(self.radioTriggerMode) grab_layout.addWidget(self.bnSoftwareTrigger) main_layout.addWidget(self.groupGrab) # 参数设置组 self.paramgroup = QGroupBox("相机参数") param_layout = QGridLayout(self.paramgroup) self.edtExposureTime = QLineEdit() self.edtGain = QLineEdit() self.edtFrameRate = QLineEdit() self.bnGetParam = QPushButton("获取参数") self.bnSetParam = QPushButton("设置参数") self.bnSaveImage = QPushButton("保存图像") param_layout.addWidget(QLabel("曝光时间:"), 0, 0) param_layout.addWidget(self.edtExposureTime, 0, 1) param_layout.addWidget(self.bnGetParam, 0, 2) param_layout.addWidget(QLabel("增益:"), 1, 0) param_layout.addWidget(self.edtGain, 1, 1) param_layout.addWidget(self.bnSetParam, 1, 2) param_layout.addWidget(QLabel("帧率:"), 2, 0) param_layout.addWidget(self.edtFrameRate, 2, 1) param_layout.addWidget(self.bnSaveImage, 2, 2) main_layout.addWidget(self.paramgroup) # 图像显示区域 self.widgetDisplay = QLabel() self.widgetDisplay.setMinimumSize(640, 480) self.widgetDisplay.setStyleSheet("background-color: black;") self.widgetDisplay.setAlignment(Qt.AlignCenter) self.widgetDisplay.setText("相机预览区域") main_layout.addWidget(self.widgetDisplay, 1) # 创建自定义UI组件 self.setup_custom_ui() # 添加阈值自适应定时器 self.threshold_timer = QTimer() self.threshold_timer.timeout.connect(self.auto_adjust_threshold) self.threshold_timer.start(2000) # 每2秒调整一次 def auto_adjust_threshold(self): """根据环境亮度自动调整匹配阈值""" if not obj_cam_operation or not isGrabbing: return # 获取当前帧 frame = obj_cam_operation.get_current_frame() if frame is None: return # 处理不同通道数的图像 if len(frame.shape) == 2 or frame.shape[2] == 1: # 已经是灰度图 gray = frame elif frame.shape[2] == 3: # 三通道彩色图 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) elif frame.shape[2] == 4: # 四通道图(带alpha) gray = cv2.cvtColor(frame, cv2.COLOR_BGRA2GRAY) else: # 其他通道数,无法处理 logging.warning(f"无法处理的图像格式: {frame.shape}") return # 计算平均亮度 try: brightness = np.mean(gray) except Exception as e: logging.error(f"计算亮度失败: {str(e)}") return # 根据亮度动态调整阈值 (亮度低时降低阈值要求) if brightness < 50: # 暗环境 new_threshold = 40 # 40% elif brightness > 200: # 亮环境 new_threshold = 65 # 65% else: # 正常环境 new_threshold = 55 # 55% # 更新UI self.sliderThreshold.setValue(new_threshold) self.lblThresholdValue.setText(f"{new_threshold}%") # 更新匹配器阈值 update_match_threshold(new_threshold) # 状态栏显示调整信息 self.statusBar().showMessage(f"亮度: {brightness:.1f}, 自动调整阈值至: {new_threshold}%", 3000) def update_trigger_indicator(self, active): """更新触发指示灯状态""" if active: self.triggerIndicator.setStyleSheet("background-color: green; border-radius: 10px;") else: self.triggerIndicator.setStyleSheet("background-color: gray; border-radius: 10px;") def update_trigger_threshold(self, value): """更新触发阈值显示并应用到匹配器""" self.lblTriggerValue.setText(f"{value}%") threshold = value / 100.0 if template_matcher_thread: template_matcher_thread.set_trigger_threshold(threshold) def setup_custom_ui(self): # 工具栏 toolbar = self.addToolBar("检测工具") self.bnCheckPrint = QPushButton("手动检测") toolbar.addWidget(self.bnCheckPrint) # 添加分隔标签 toolbar.addWidget(QLabel("图像保存:")) self.bnSaveCurrentFrame = QPushButton("保存当前帧") toolbar.addWidget(self.bnSaveCurrentFrame) self.bnSaveSample = QPushButton("保存标准样本") toolbar.addWidget(self.bnSaveSample) self.bnPreviewSample = QPushButton("预览样本") toolbar.addWidget(self.bnPreviewSample) # 添加触发指示灯 self.triggerIndicator = QLabel() self.triggerIndicator.setFixedSize(20, 20) self.triggerIndicator.setStyleSheet("background-color: gray; border-radius: 10px;") toolbar.addWidget(QLabel("触发状态:")) toolbar.addWidget(self.triggerIndicator) # 历史记录 toolbar.addWidget(QLabel("历史记录:")) self.cbHistory = QComboBox() self.cbHistory.setMinimumWidth(300) toolbar.addWidget(self.cbHistory) # 状态栏样本路径 self.lblSamplePath = QLabel("当前样本: 未设置样本") self.statusBar().addPermanentWidget(self.lblSamplePath) # 右侧面板 right_panel = QWidget() right_layout = QVBoxLayout(right_panel) right_layout.setContentsMargins(10, 10, 10, 10) # 差异度调整组 diff_group = QGroupBox("差异度调整") diff_layout = QVBoxLayout(diff_group) self.lblDiffThreshold = QLabel("差异度阈值 (0-100%):") self.sliderDiffThreshold = QSlider(Qt.Horizontal) self.sliderDiffThreshold.setRange(0, 100) self.sliderDiffThreshold.setValue(5) self.lblDiffValue = QLabel("5%") self.lblCurrentDiff = QLabel("当前差异度: -") self.lblCurrentDiff.setStyleSheet("font-size: 14px; font-weight: bold;") self.lblDiffStatus = QLabel("状态: 未检测") self.lblDiffStatus.setStyleSheet("font-size: 12px;") diff_layout.addWidget(self.lblDiffThreshold) diff_layout.addWidget(self.sliderDiffThreshold) diff_layout.addWidget(self.lblDiffValue) diff_layout.addWidget(self.lblCurrentDiff) diff_layout.addWidget(self.lblDiffStatus) right_layout.addWidget(diff_group) # ===== 连续匹配面板 ===== match_group = QGroupBox("连续帧匹配") match_layout = QVBoxLayout(match_group) # 样本设置 sample_layout = QHBoxLayout() self.bnSetSample = QPushButton("设置标准样本") self.bnPreviewSample = QPushButton("预览样本") self.lblSampleStatus = QLabel("状态: 未设置样本") sample_layout.addWidget(self.bnSetSample) sample_layout.addWidget(self.bnPreviewSample) sample_layout.addWidget(self.lblSampleStatus) match_layout.addLayout(sample_layout) # 匹配参数 param_layout = QHBoxLayout() self.lblMatchThreshold = QLabel("匹配阈值:") self.sliderThreshold = QSlider(Qt.Horizontal) self.sliderThreshold.setRange(50, 100) self.sliderThreshold.setValue(75) # 降低默认阈值 self.lblThresholdValue = QLabel("75%") param_layout.addWidget(self.lblMatchThreshold) param_layout.addWidget(self.sliderThreshold) param_layout.addWidget(self.lblThresholdValue) match_layout.addLayout(param_layout) # 触发阈值调整 trigger_threshold_layout = QHBoxLayout() self.lblTriggerThreshold = QLabel("触发阈值(%):") self.sliderTriggerThreshold = QSlider(Qt.Horizontal) self.sliderTriggerThreshold.setRange(0, 100) self.sliderTriggerThreshold.setValue(50) # 默认50% self.lblTriggerValue = QLabel("50%") trigger_threshold_layout.addWidget(self.lblTriggerThreshold) trigger_threshold_layout.addWidget(self.sliderTriggerThreshold) trigger_threshold_layout.addWidget(self.lblTriggerValue) match_layout.addLayout(trigger_threshold_layout) # 匹配分数显示 match_score_layout = QHBoxLayout() self.lblMatchScore = QLabel("实时匹配分数:") self.lblMatchScoreValue = QLabel("0.0%") self.lblMatchScoreValue.setStyleSheet("font-weight: bold;") match_score_layout.addWidget(self.lblMatchScore) match_score_layout.addWidget(self.lblMatchScoreValue) match_layout.addLayout(match_score_layout) # 连续匹配开关 self.chkContinuousMatch = QCheckBox("启用连续帧匹配") self.chkContinuousMatch.setChecked(False) match_layout.addWidget(self.chkContinuousMatch) right_layout.addWidget(match_group) right_layout.addStretch(1) # 添加调试按钮 self.bnDebug = QPushButton("调试匹配") toolbar.addWidget(self.bnDebug) # 连接信号 self.bnDebug.clicked.connect(self.debug_matching) # 停靠窗口 dock = QDockWidget("检测控制面板", self) dock.setWidget(right_panel) dock.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) self.addDockWidget(Qt.RightDockWidgetArea, dock) @pyqtSlot(QImage, float, bool) def updateDisplay(self, qimg, match_score, is_matched): """线程安全的显示更新方法(只接收 QImage)""" if qimg.isNull(): return # 创建QPixmap并缩放 pixmap = QPixmap.fromImage(qimg) scaled_pixmap = pixmap.scaled( self.widgetDisplay.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 self.widgetDisplay.setPixmap(scaled_pixmap) self.widgetDisplay.setAlignment(Qt.AlignCenter) def closeEvent(self, event): logging.info("主窗口关闭,执行清理...") close_device() event.accept() def debug_matching(self): """保存当前帧和匹配结果用于调试""" global isGrabbing, current_sample_path, obj_cam_operation # 检查必要状态 if not isGrabbing: QMessageBox.warning(self, "错误", "请先开始相机取流!") return if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(self, "错误", "请先设置有效的标准样本图像!") return try: # 获取当前帧 frame = obj_cam_operation.get_current_frame() if frame is None: QMessageBox.warning(self, "无有效图像", "未获取到有效帧") return # 创建调试目录 debug_dir = "debug_match" os.makedirs(debug_dir, exist_ok=True) # 保存当前帧 current_time = datetime.now().strftime("%Y%m%d_%H%M%S") frame_path = os.path.join(debug_dir, f"current_frame_{current_time}.png") cv2.imwrite(frame_path, frame) # 保存样本图像 sample_img = cv2.imread(current_sample_path) sample_path = os.path.join(debug_dir, f"sample_{os.path.basename(current_sample_path)}") cv2.imwrite(sample_path, sample_img) # 安全预处理:检查图像通道数 def safe_preprocess(img): """安全的图像预处理函数""" # 处理不同通道数的图像 if len(img.shape) == 2 or img.shape[2] == 1: # 已经是灰度图 return img elif img.shape[2] == 3: # 三通道彩色图 return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) elif img.shape[2] == 4: # 四通道图(带alpha) return cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY) else: # 其他通道数,返回原图 logging.warning(f"无法处理的图像格式: {img.shape}") return img # 应用安全预处理 if template_matcher_thread: processed_frame = template_matcher_thread.preprocess_image(frame) processed_sample = template_matcher_thread.preprocess_image(sample_img) else: processed_frame = safe_preprocess(frame) processed_sample = safe_preprocess(sample_img) # 提取特征点 sift = cv2.SIFT_create() kp1, des1 = sift.detectAndCompute(processed_sample, None) kp2, des2 = sift.detectAndCompute(processed_frame, None) # 绘制特征点 - 处理单通道图像 def draw_keypoints_safe(img, keypoints): """安全的特征点绘制函数""" if len(img.shape) == 2: # 单通道转三通道用于彩色绘制 return cv2.drawKeypoints( cv2.cvtColor(img, cv2.COLOR_GRAY2BGR), keypoints, None, color=(0, 255, 0) ) else: return cv2.drawKeypoints(img, keypoints, None, color=(0, 255, 0)) sample_with_kp = draw_keypoints_safe(processed_sample, kp1) frame_with_kp = draw_keypoints_safe(processed_frame, kp2) # 保存带特征点的图像 cv2.imwrite(os.path.join(debug_dir, "sample_keypoints.png"), sample_with_kp) cv2.imwrite(os.path.join(debug_dir, "frame_keypoints.png"), frame_with_kp) # 进行匹配(仅当有足够的特征点时) if des1 is not None and des2 is not None and len(des1) > 0 and len(des2) > 0: FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(des1, des2, k=2) good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) # 绘制匹配结果 match_img = cv2.drawMatches( sample_img, kp1, frame, kp2, good_matches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS ) # 保存匹配结果 cv2.imwrite(os.path.join(debug_dir, "matches.png"), match_img) QMessageBox.information(self, "调试完成", f"调试文件已保存到: {debug_dir}\n" f"样本特征点: {len(kp1)}\n" f"当前帧特征点: {len(kp2)}\n" f"良好匹配数: {len(good_matches)}") else: QMessageBox.warning(self, "特征不足", f"无法进行匹配:\n" f"样本特征点: {len(kp1) if kp1 else 0}\n" f"当前帧特征点: {len(kp2) if kp2 else 0}") except Exception as e: logging.exception("调试匹配失败") QMessageBox.critical(self, "调试错误", f"调试过程中发生错误:\n{str(e)}") # ===== 辅助函数 ===== def ToHexStr(num): if not isinstance(num, int): try: num = int(num) except: return f"<非整数:{type(num)}>" chaDic = {10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f'} hexStr = "" if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hexStr = chaDic.get(digit, str(digit)) + hexStr num //= 16 hexStr = chaDic.get(num, str(num)) + hexStr return "0x" + hexStr def enum_devices(): global deviceList, obj_cam_operation n_layer_type = ( MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE ) # 创建设备列表 deviceList = MV_CC_DEVICE_INFO_LIST() # 枚举设备 ret = MvCamera.MV_CC_EnumDevices(n_layer_type, deviceList) if ret != MV_OK: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) QMessageBox.warning(mainWindow, "错误", error_msg, QMessageBox.Ok) return ret if deviceList.nDeviceNum == 0: QMessageBox.warning(mainWindow, "提示", "未找到任何设备", QMessageBox.Ok) return MV_OK logging.info(f"找到 {deviceList.nDeviceNum} 个设备") # 处理设备信息 devList = [] for i in range(deviceList.nDeviceNum): # 获取设备信息 mvcc_dev_info = ctypes.cast( deviceList.pDeviceInfo[i], ctypes.POINTER(MV_CC_DEVICE_INFO) ).contents # 根据设备类型提取信息 if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: st_gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo ip_addr = ( f"{(st_gige_info.nCurrentIp >> 24) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 16) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 8) & 0xFF}." f"{st_gige_info.nCurrentIp & 0xFF}" ) # 修复:将c_ubyte_Array_16转换为字节串再解码 user_defined_bytes = bytes(st_gige_info.chUserDefinedName) dev_name = f"GigE: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} ({ip_addr})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: st_usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo serial = bytes(st_usb_info.chSerialNumber).decode('ascii', 'ignore').rstrip('\x00') # 修复:同样处理用户自定义名称 user_defined_bytes = bytes(st_usb_info.chUserDefinedName) dev_name = f"USB: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} (SN: {serial})") else: devList.append(f"[{i}] 未知设备类型: {mvcc_dev_info.nTLayerType}") # 更新UI mainWindow.ComboDevices.clear() mainWindow.ComboDevices.addItems(devList) if devList: mainWindow.ComboDevices.setCurrentIndex(0) mainWindow.statusBar().showMessage(f"找到 {deviceList.nDeviceNum} 个设备", 300) return MV_OK def set_continue_mode(): ret = obj_cam_operation.set_trigger_mode(False) if ret != 0: strError = "设置连续模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(True) mainWindow.radioTriggerMode.setChecked(False) mainWindow.bnSoftwareTrigger.setEnabled(False) def set_software_trigger_mode(): ret = obj_cam_operation.set_trigger_mode(True) if ret != 0: strError = "设置触发模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(False) mainWindow.radioTriggerMode.setChecked(True) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing) def trigger_once(): ret = obj_cam_operation.trigger_once() if ret != 0: strError = "软触发失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 尝试捕获当前帧 frame = obj_cam_operation.capture_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return # 确保图像有效 if frame.size == 0 or frame.shape[0] == 0 or frame.shape[1] == 0: QMessageBox.warning(mainWindow, "无效图像", "捕获的图像无效,请检查相机设置!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() # 创建目录(如果不存在) directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return # 保存图像 try: # 使用OpenCV保存图像 if not cv2.imwrite(file_path, frame): raise Exception("OpenCV保存失败") # 更新状态 current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) # 显示成功消息 QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) # 更新样本状态 mainWindow.lblSampleStatus.setText("状态: 样本已设置") mainWindow.lblSampleStatus.setStyleSheet("color: green;") except Exception as e: logging.error(f"保存图像失败: {str(e)}") QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return try: # 直接使用OpenCV加载图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: raise Exception("无法加载图像") # 显示图像 cv2.namedWindow("标准样本预览", cv2.WINDOW_NORMAL) cv2.resizeWindow("标准样本预览", 800, 600) cv2.imshow("标准样本预览", sample_img) cv2.waitKey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def is_float(str): try: float(str) return True except ValueError: return False def get_param(): try: ret = obj_cam_operation.get_parameters() if ret != MV_OK: strError = "获取参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) else: mainWindow.edtExposureTime.setText("{0:.2f}".format(obj_cam_operation.exposure_time)) mainWindow.edtGain.setText("{0:.2f}".format(obj_cam_operation.gain)) mainWindow.edtFrameRate.setText("{0:.2f}".format(obj_cam_operation.frame_rate)) except Exception as e: error_msg = f"获取参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def set_param(): frame_rate = mainWindow.edtFrameRate.text() exposure = mainWindow.edtExposureTime.text() gain = mainWindow.edtGain.text() if not (is_float(frame_rate) and is_float(exposure) and is_float(gain)): strError = "设置参数失败: 参数必须是有效的浮点数" QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.set_param( frame_rate=float(frame_rate), exposure_time=float(exposure), gain=float(gain) ) if ret != MV_OK: strError = "设置参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) except Exception as e: error_msg = f"设置参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def enable_controls(): global isGrabbing, isOpen mainWindow.groupGrab.setEnabled(isOpen) mainWindow.paramgroup.setEnabled(isOpen) mainWindow.bnOpen.setEnabled(not isOpen) mainWindow.bnClose.setEnabled(isOpen) mainWindow.bnStart.setEnabled(isOpen and (not isGrabbing)) mainWindow.bnStop.setEnabled(isOpen and isGrabbing) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing and mainWindow.radioTriggerMode.isChecked()) mainWindow.bnSaveImage.setEnabled(isOpen and isGrabbing) mainWindow.bnCheckPrint.setEnabled(isOpen and isGrabbing) mainWindow.bnSaveSample.setEnabled(isOpen and isGrabbing) mainWindow.bnPreviewSample.setEnabled(bool(current_sample_path)) mainWindow.bnSaveCurrentFrame.setEnabled(isOpen and isGrabbing) # 连续匹配控制 mainWindow.chkContinuousMatch.setEnabled(bool(current_sample_path) and isGrabbing) # ===== 相机帧监控线程 ===== class FrameMonitorThread(QThread): frame_status = pyqtSignal(str) # 用于发送状态消息的信号 def __init__(self, cam_operation): super().__init__() self.cam_operation = cam_operation self.running = True self.frame_count = 0 self.last_time = time.time() def run(self): """监控相机帧状态的主循环""" while self.running: try: if self.cam_operation and self.cam_operation.is_grabbing: # 获取帧统计信息 frame_info = self.get_frame_info() if frame_info: fps = frame_info.get('fps', 0) dropped = frame_info.get('dropped', 0) status = f"FPS: {fps:.1f} | 丢帧: {dropped}" self.frame_status.emit(status) else: self.frame_status.emit("取流中...") else: self.frame_status.emit("相机未取流") except Exception as e: self.frame_status.emit(f"监控错误: {str(e)}") # 每500ms检查一次 QThread.msleep(500) def stop(self): """停止监控线程""" self.running = False self.wait(1000) # 等待线程结束 def calculate_fps(self): """计算当前帧率""" current_time = time.time() elapsed = current_time - self.last_time if elapsed > 0: fps = self.frame_count / elapsed self.frame_count = 0 self.last_time = current_time return fps return 0 def get_frame_info(self): """获取帧信息""" try: # 更新帧计数 self.frame_count += 1 # 返回帧信息 return { 'fps': self.calculate_fps(), 'dropped': 0 # 实际应用中需要从相机获取真实丢帧数 } except Exception as e: logging.error(f"获取帧信息失败: {str(e)}") return None # ===== 主程序入口 ===== if __name__ == "__main__": # 配置日志系统 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("cloth_inspection_continuous.log"), logging.StreamHandler() ] ) logging.info("布料印花检测系统(连续匹配版)启动") app = QApplication(sys.argv) mainWindow = MainWindow() # 信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_threshold) mainWindow.sliderTriggerThreshold.valueChanged.connect( mainWindow.update_trigger_threshold ) # 其他信号连接 mainWindow.sliderDiffThreshold.valueChanged.connect(update_diff_threshold) mainWindow.bnCheckPrint.clicked.connect(lambda: vision_controlled_check(None)) mainWindow.bnSaveSample.clicked.connect(save_sample_image) mainWindow.bnPreviewSample.clicked.connect(preview_sample) mainWindow.bnEnum.clicked.connect(enum_devices) mainWindow.bnOpen.clicked.connect(open_device) mainWindow.bnClose.clicked.connect(close_device) mainWindow.bnStart.clicked.connect(start_grabbing) mainWindow.bnStop.clicked.connect(stop_grabbing) mainWindow.bnSoftwareTrigger.clicked.connect(trigger_once) mainWindow.radioTriggerMode.clicked.connect(set_software_trigger_mode) mainWindow.radioContinueMode.clicked.connect(set_continue_mode) mainWindow.bnGetParam.clicked.connect(get_param) mainWindow.bnSetParam.clicked.connect(set_param) mainWindow.bnSaveImage.clicked.connect(save_current_frame) mainWindow.bnSaveCurrentFrame.clicked.connect(save_current_frame) # 连续匹配信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_score_display) mainWindow.chkContinuousMatch.stateChanged.connect(toggle_template_matching) mainWindow.show() app.exec_() close_device() sys.exit()

import sys from collections import deque import matplotlib.pyplot as plt import networkx as nx import numpy as np plt.rcParams['font.sans-serif'] = ['SimHei'] # 解决中文显示问题 plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 class MinCostFlowSourceSinkVisual: def __init__(self, n, edges, source, sink, visualize=True): """ :param n: 节点数 :param edges: 边列表 [(u, v, lb, ub, cost)] :param source: 源点 :param sink: 汇点 :param visualize: 是否可视化 """ self.n = n self.source = source self.sink = sink self.original_edges = edges.copy() # 保存原始边 self.visualize = visualize self.fig, self.ax = plt.subplots(figsize=(14, 10)) self.fig.suptitle("有源汇上下界费用流算法动态可视化", fontsize=16) # 初始化超级源汇 self.super_source = n self.super_sink = n + 1 self.total_nodes = n + 2 # 计算每个节点的流量差 self.A = [0] * (n + 2) for u, v, lb, ub, cost in self.original_edges: self.A[u] -= lb self.A[v] += lb # 添加源汇之间无限容量的边 edges.append((sink, source, 0, float('inf'), 0)) # 创建最小费用流数据结构 self.graph = [[] for _ in range(self.total_nodes)] self.dist = [float('inf')] * self.total_nodes self.vis = [False] * self.total_nodes self.pre = [-1] * self.total_nodes self.edge_info = {} # 存储边信息 self.total_cost = 0 # 总费用 # 添加图中的边 self.edge_refs = [] for i, (u, v, lb, ub, cost) in enumerate(edges): cap = ub - lb # 添加边并记录信息 self.add_edge(u, v, cap, cost, (i, lb, ub, cost, f"e{i}")) # 仅原始边(不包括后添加的sink->source边)记录在edge_refs中 if i < len(edges) - 1: # 最后一条是后添加的sink->source边 self.edge_refs.append((u, v, len(self.graph[u]) - 1, lb)) # 添加超级源汇的边 self.total_flow = 0 for i in range(n + 2): # 包含所有节点 if self.A[i] > 0: self.add_edge(self.super_source, i, self.A[i], 0, (f"S→{i}", "super_source")) self.total_flow += self.A[i] elif self.A[i] < 0: self.add_edge(i, self.super_sink, -self.A[i], 0, (f"{i}→T", "super_sink")) # 初始化可视化 if self.visualize: self.initialize_visualization() def add_edge(self, u, v, cap, cost, info=None): """添加边并存储信息""" forward = [v, cap, cost, 0, info] # [目标, 容量, 费用, 流量, 信息] reverse = [u, 0, -cost, 0, None] # 反向边 forward[3] = reverse reverse[3] = forward self.graph[u].append(forward) self.graph[v].append(reverse) # 存储边信息用于可视化 if info: self.edge_info[(u, v)] = { 'capacity': cap, 'cost': cost, 'flow': 0, 'info': info } return forward def spfa(self, s, t): """SPFA算法寻找最小费用增广路径""" self.dist = [float('inf')] * self.total_nodes self.vis = [False] * self.total_nodes self.pre = [-1] * self.total_nodes self.dist[s] = 0 self.vis[s] = True queue = deque([s]) # 可视化:显示SPFA开始 if self.visualize: self.visualize_step(f"SPFA: 从超级源点S开始寻找最小费用路径") plt.pause(0.5) while queue: u = queue.popleft() self.vis[u] = False for idx, edge in enumerate(self.graph[u]): v, cap, cost, rev, info = edge if cap > 0 and self.dist[u] + cost < self.dist[v]: self.dist[v] = self.dist[u] + cost self.pre[v] = (u, idx) # 记录前驱节点和边索引 # 可视化:更新节点距离 if self.visualize: node_label = self.get_node_label(v) self.visualize_step(f"SPFA: 更新 {node_label} 距离: {self.dist[v]}") plt.pause(0.3) if not self.vis[v]: self.vis[v] = True queue.append(v) return self.dist[t] < float('inf') def min_cost_flow(self): """计算最小费用流并动态可视化""" total_flow = 0 iteration = 1 while self.spfa(self.super_source, self.super_sink): # 计算增广路径上的最小容量 flow = float('inf') cur = self.super_sink path_nodes = [] while cur != self.super_source: u, idx = self.pre[cur] edge = self.graph[u][idx] path_nodes.append(cur) flow = min(flow, edge[1]) cur = u path_nodes.append(self.super_source) path_nodes.reverse() # 可视化:显示找到的增广路径 if self.visualize: path_desc = "→".join([self.get_node_label(n) for n in path_nodes]) self.visualize_step(f"找到增广路径: {path_desc}\n流量: {flow}, 费用: {self.dist[self.super_sink]}") plt.pause(1.5) # 更新增广路径上的流量 cur = self.super_sink path_edges = [] while cur != self.super_source: u, idx = self.pre[cur] edge = self.graph[u][idx] rev_edge = edge[3] # 更新边流量 edge[1] -= flow rev_edge[1] += flow edge[4] = edge[4] or {} # 确保info存在 edge[4]['flow'] = edge[4].get('flow', 0) + flow # 更新费用 self.total_cost += flow * edge[2] # 记录路径边用于可视化 path_edges.append((u, cur)) # 更新可视化信息 if (u, cur) in self.edge_info: self.edge_info[(u, cur)]['flow'] += flow elif (cur, u) in self.edge_info: # 处理反向边 self.edge_info[(cur, u)]['flow'] -= flow cur = u # 可视化:显示流量更新 if self.visualize: self.visualize_step(f"沿路径更新流量: {flow}\n累计费用: {self.total_cost}") plt.pause(0.8) total_flow += flow iteration += 1 # 检查可行解 if total_flow != self.total_flow: if self.visualize: self.visualize_step(f"无可行解!\n需求流量: {self.total_flow}, 实际流量: {total_flow}") plt.pause(3.0) return None, None # 计算原图中每条边的实际流量 flows = [] for u, v, idx, lb in self.edge_refs: # 跳过最后添加的sink->source边 if u == self.sink and v == self.source: continue edge = self.graph[u][idx] actual_flow = lb + edge[1] # 实际流量 = 下界 + 残余网络中的剩余容量 flows.append(actual_flow) if self.visualize: self.visualize_final_flow(flows) plt.pause(5.0) return flows, self.total_cost def get_node_label(self, node): """获取节点标签""" if node == self.super_source: return "S" elif node == self.super_sink: return "T" elif node == self.source: return f"源点({node})" elif node == self.sink: return f"汇点({node})" else: return f"{node}" def get_edge_description(self, u, v): """获取边的描述信息""" if u == self.super_source: return f"S → {v}" elif v == self.super_sink: return f"{u} → T" elif u == self.source and v == self.sink: return f"{u}→{v} (源汇边)" elif (u, v) in self.edge_info: info = self.edge_info[(u, v)]['info'] if isinstance(info, tuple) and len(info) > 3: return f"{u} → {v} ({info[4]})" return f"{u} → {v}" def initialize_visualization(self): """初始化可视化布局""" self.G = nx.DiGraph() # 添加节点 for i in range(self.n): self.G.add_node(i, label=f"{i}") self.G.add_node(self.super_source, label="S") self.G.add_node(self.super_sink, label="T") # 添加边 for u in range(self.total_nodes): for edge in self.graph[u]: v, cap, cost, _, info = edge if cap > 0: # 只添加正向边 self.G.add_edge(u, v, capacity=cap, cost=cost, flow=0) # 创建环形布局 self.pos = {} # 普通节点布置在圆上 angles = np.linspace(0, 2 * np.pi, self.n, endpoint=False) for i in range(self.n): angle = angles[i] self.pos[i] = (np.cos(angle), np.sin(angle)) # 特殊节点位置 self.pos[self.source] = (0, 1.2) # 源点在上方 self.pos[self.sink] = (0, -1.2) # 汇点在下方 self.pos[self.super_source] = (-1.5, 0) # 超级源点在左侧 self.pos[self.super_sink] = (1.5, 0) # 超级汇点在右侧 # 初始绘图 self.ax.clear() # 节点颜色:普通节点-浅蓝,源汇点-浅绿,超级源汇-浅红 node_colors = [] for node in self.G.nodes(): if node == self.source or node == self.sink: node_colors.append('lightgreen') elif node == self.super_source or node == self.super_sink: node_colors.append('salmon') else: node_colors.append('lightblue') nx.draw_networkx_nodes(self.G, self.pos, node_size=800, node_color=node_colors) nx.draw_networkx_labels(self.G, self.pos, labels={n: d['label'] for n, d in self.G.nodes(data=True)}) # 绘制边 self.edge_collection = nx.draw_networkx_edges( self.G, self.pos, arrowstyle='->', arrowsize=20, edge_color='gray', width=1, ax=self.ax ) # 初始化边标签 self.edge_labels = {} for u, v in self.G.edges(): self.edge_labels[(u, v)] = self.ax.text(0, 0, "", fontsize=8, ha='center', va='center') self.ax.set_title("初始化网络", fontsize=14) self.ax.set_axis_off() plt.tight_layout() plt.pause(2.0) def visualize_step(self, message): """可视化当前步骤""" self.ax.clear() # 节点颜色 node_colors = [] for node in self.G.nodes(): if node == self.source or node == self.sink: node_colors.append('lightgreen') elif node == self.super_source or node == self.super_sink: node_colors.append('salmon') else: node_colors.append('lightblue') # 绘制节点 nx.draw_networkx_nodes(self.G, self.pos, node_size=800, node_color=node_colors) nx.draw_networkx_labels(self.G, self.pos, labels={n: d['label'] for n, d in self.G.nodes(data=True)}) # 绘制边并设置颜色和宽度 edge_colors = [] edge_widths = [] for u, v in self.G.edges(): # 获取当前边的状态 cap = self.G[u][v]['capacity'] flow = self.edge_info.get((u, v), {}).get('flow', 0) # 计算饱和度 saturation = flow / cap if cap > 0 else 0 # 使用颜色表示饱和度 edge_colors.append(plt.cm.RdYlGn(saturation)) # 使用宽度表示流量 edge_widths.append(1 + 3 * saturation) # 绘制边 nx.draw_networkx_edges( self.G, self.pos, arrowstyle='->', arrowsize=20, edge_color=edge_colors, width=edge_widths, ax=self.ax ) # 更新边标签 for (u, v), text in self.edge_labels.items(): # 获取边信息 cap = self.G[u][v]['capacity'] cost = self.G[u][v]['cost'] flow = self.edge_info.get((u, v), {}).get('flow', 0) # 特殊边处理 if u == self.super_source or v == self.super_sink: label = f"{flow}/{cap}\n费用:0" else: # 获取原始边信息 info = self.edge_info.get((u, v), {}).get('info', None) if info and isinstance(info, tuple): _, lb, ub, cost_val, name = info actual_flow = lb + flow label = f"{name}: {actual_flow}/{ub}\n费用:{cost_val}\n[{lb},{ub}]" else: label = f"{flow}/{cap}\n费用:{cost}" # 计算边的中点位置 x = (self.pos[u][0] + self.pos[v][0]) / 2 y = (self.pos[u][1] + self.pos[v][1]) / 2 # 更新文本位置和内容 text.set_position((x, y)) text.set_text(label) self.ax.add_artist(text) # 显示当前信息 self.ax.set_title(f"{message}\n总费用: {self.total_cost}", fontsize=14) self.ax.set_axis_off() plt.tight_layout() plt.draw() def visualize_final_flow(self, flows): """可视化最终可行流分配(仅显示原图边)""" self.ax.clear() # 创建仅包含原图节点和边的子图 H = nx.DiGraph() for i in range(self.n): H.add_node(i, label=f"{i}") # 添加原图边(排除最后添加的sink->source边) for i, (u, v, lb, ub, cost) in enumerate(self.original_edges): if i >= len(flows): continue H.add_edge(u, v, flow=flows[i], lb=lb, ub=ub, cost=cost, name=f"e{i}") # 使用原布局,但只保留原图节点的位置 pos = {k: v for k, v in self.pos.items() if k in H.nodes()} # 绘制节点 node_colors = ['lightgreen' if node == self.source or node == self.sink else 'lightblue' for node in H.nodes()] nx.draw_networkx_nodes(H, pos, node_size=800, node_color=node_colors) nx.draw_networkx_labels(H, pos) # 绘制边并设置颜色和宽度 edge_colors = [] edge_widths = [] for u, v in H.edges(): flow = H[u][v]['flow'] ub = H[u][v]['ub'] saturation = flow / ub edge_colors.append(plt.cm.RdYlGn(saturation)) edge_widths.append(1 + 3 * saturation) nx.draw_networkx_edges( H, pos, arrowstyle='->', arrowsize=20, edge_color=edge_colors, width=edge_widths, ax=self.ax ) # 添加边标签 edge_labels = {} for u, v in H.edges(): flow = H[u][v]['flow'] lb = H[u][v]['lb'] ub = H[u][v]['ub'] cost = H[u][v]['cost'] name = H[u][v]['name'] edge_labels[(u, v)] = f"{name}: {flow}\n费用:{cost}\n[{lb},{ub}]" nx.draw_networkx_edge_labels(H, pos, edge_labels=edge_labels, font_size=8) self.ax.set_title(f"最小费用流分配结果(总费用: {self.total_cost})", fontsize=14) self.ax.set_axis_off() plt.tight_layout() plt.draw() def min_cost_flow_visual(n, edges, source, sink): """有源汇上下界费用流求解与可视化""" # 创建可视化实例 mcf_visual = MinCostFlowSourceSinkVisual(n, edges, source, sink, visualize=True) # 计算最小费用流 flows, total_cost = mcf_visual.min_cost_flow() if flows is None: print("无可行流解") return None, None print("\n各边实际流量分配和费用:") for i, (u, v, lb, ub, cost) in enumerate(edges[:-1]): # 排除最后添加的sink->source边 print(f"边 {u}→{v} ({lb},{ub}): 流量={flows[i]}, 费用={cost}") # 计算源点到汇点的总流量 source_flow = sum(flows[i] for i, (u, v, _, _, _) in enumerate(edges) if u == source) sink_flow = sum(flows[i] for i, (u, v, _, _, _) in enumerate(edges) if v == sink) print(f"\n源点({source})总输出流量: {source_flow}") print(f"汇点({sink})总输入流量: {sink_flow}") print(f"总费用: {total_cost}") plt.show() # 保持窗口打开 return flows, total_cost if __name__ == "__main__": # 15节点有可行解的网络示例 - 简化版 print("=" * 50) print("15节点网络的有源汇上下界费用流计算 (保证有可行解)") # 简化设计:确保网络平衡 n = 15 edges = [ # 源点→核心节点 (u, v, lb, ub, cost) (0, 1, 5, 10, 2), (0, 2, 5, 10, 3), # 核心环状结构 (1, 2, 0, 5, 1), (2, 3, 2, 8, 4), (3, 4, 2, 8, 2), (4, 1, 0, 5, 1), # 核心→中间节点 (1, 5, 1, 4, 3), (2, 6, 1, 4, 2), (3, 7, 1, 4, 4), (4, 8, 1, 4, 3), # 中间层平衡结构 (5, 6, 0, 5, 1), (6, 7, 0, 5, 2), (7, 8, 0, 5, 1), (8, 5, 0, 5, 3), # 中间→汇点 (5, 14, 3, 6, 5), (6, 14, 3, 6, 4), (7, 14, 2, 5, 3), (8, 14, 2, 5, 2), # 连接外围节点 (1, 9, 0, 3, 2), (2, 10, 0, 3, 3), (3, 11, 0, 3, 1), (4, 12, 0, 3, 2), (9, 13, 0, 3, 4), (10, 13, 0, 3, 2), (11, 13, 0, 3, 3), (12, 13, 0, 3, 1), (13, 14, 0, 5, 2) # 汇点入口 ] # 设置源点和汇点 source = 0 # 节点0作为源点 sink = 14 # 节点14作为汇点 # 计算并可视化最小费用流 flows, total_cost = min_cost_flow_visual(n, edges, source, sink) C:\Users\25827\.conda\envs\torch\python.exe C:\Users\25827\Desktop\图论代码\有源汇上下界费用流.py ================================================== 15节点网络的有源汇上下界费用流计算 (保证有可行解) Traceback (most recent call last): File "C:\Users\25827\Desktop\图论代码\有源汇上下界费用流.py", line 516, in <module> flows, total_cost = min_cost_flow_visual(n, edges, source, sink) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\25827\Desktop\图论代码\有源汇上下界费用流.py", line 442, in min_cost_flow_visual flows, total_cost = mcf_visual.min_cost_flow() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\25827\Desktop\图论代码\有源汇上下界费用流.py", line 165, in min_cost_flow edge[4]['flow'] = edge[4].get('flow', 0) + flow ^^^^^^^^^^^ AttributeError: 'tuple' object has no attribute 'get' 进程已结束,退出代码为 1 给出修改后的完整代码

AttributeError Traceback (most recent call last) Cell In[8], line 8 5 t = np.linspace(0, 2 * np.pi, 1000) 7 # 创建图形 ----> 8 plt.figure(figsize=(12, 8)) 10 # 对每个i值进行计算和绘图 11 for i in range(3): 12 # 常量计算 File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\pyplot.py:1027, in figure(num, figsize, dpi, facecolor, edgecolor, frameon, FigureClass, clear, **kwargs) 1017 if len(allnums) == max_open_warning >= 1: 1018 _api.warn_external( 1019 f"More than {max_open_warning} figures have been opened. " 1020 f"Figures created through the pyplot interface " (...) 1024 f"Consider using matplotlib.pyplot.close().", 1025 RuntimeWarning) -> 1027 manager = new_figure_manager( 1028 num, figsize=figsize, dpi=dpi, 1029 facecolor=facecolor, edgecolor=edgecolor, frameon=frameon, 1030 FigureClass=FigureClass, **kwargs) 1031 fig = manager.canvas.figure 1032 if fig_label: File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\pyplot.py:550, in new_figure_manager(*args, **kwargs) 548 """Create a new figure manager instance.""" 549 _warn_if_gui_out_of_main_thread() --> 550 return _get_backend_mod().new_figure_manager(*args, **kwargs) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backend_bases.py:3507, in _Backend.new_figure_manager(cls, num, *args, **kwargs) 3505 fig_cls = kwargs.pop('FigureClass', Figure) 3506 fig = fig_cls(*args, **kwargs) -> 3507 return cls.new_figure_manager_given_figure(num, fig) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backend_bases.py:3512, in _Backend.new_figure_manager_given_figure(cls, num, figure) 3509 @classmethod 3510 def new_figure_manager_given_figure(cls, num, figure): 3511 """Create a new figure manager instance for the given figure.""" -> 3512 return cls.FigureCanvas.new_manager(figure, num) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backend_bases.py:1797, in FigureCanvasBase.new_manager(cls, figure, num) 1786 @classmethod 1787 def new_manager(cls, figure, num): 1788 """ 1789 Create a new figure manager for *figure*, using this canvas class. 1790 (...) 1795 FigureManager.create_with_canvas. 1796 """ -> 1797 return cls.manager_class.create_with_canvas(cls, figure, num) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backends\_backend_tk.py:504, in FigureManagerTk.create_with_canvas(cls, canvas_class, figure, num) 501 window.iconphoto(False, icon_img_large, icon_img) 503 canvas = canvas_class(figure, master=window) --> 504 manager = cls(canvas, num, window) 505 if mpl.is_interactive(): 506 manager.show() File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backends\_backend_tk.py:457, in FigureManagerTk.__init__(self, canvas, num, window) 455 def __init__(self, canvas, num, window): 456 self.window = window --> 457 super().__init__(canvas, num) 458 self.window.withdraw() 459 # packing toolbar first, because if space is getting low, last packed 460 # widget is getting shrunk first (-> the canvas) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backend_bases.py:2655, in FigureManagerBase.__init__(self, canvas, num) 2650 self.toolmanager = (ToolManager(canvas.figure) 2651 if mpl.rcParams['toolbar'] == 'toolmanager' 2652 else None) 2653 if (mpl.rcParams["toolbar"] == "toolbar2" 2654 and self._toolbar2_class): -> 2655 self.toolbar = self._toolbar2_class(self.canvas) 2656 elif (mpl.rcParams["toolbar"] == "toolmanager" 2657 and self._toolmanager_toolbar_class): 2658 self.toolbar = self._toolmanager_toolbar_class(self.toolmanager) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backends\_backend_tk.py:649, in NavigationToolbar2Tk.__init__(self, canvas, window, pack_toolbar) 644 self._message_label = tk.Label(master=self, font=self._label_font, 645 textvariable=self.message, 646 justify=tk.RIGHT) 647 self._message_label.pack(side=tk.RIGHT) --> 649 NavigationToolbar2.__init__(self, canvas) 650 if pack_toolbar: 651 self.pack(side=tk.BOTTOM, fill=tk.X) File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\backend_bases.py:2850, in NavigationToolbar2.__init__(self, canvas) 2848 self.canvas = canvas 2849 canvas.toolbar = self -> 2850 self._nav_stack = cbook._Stack() 2851 # This cursor will be set after the initial draw. 2852 self._last_cursor = tools.Cursors.POINTER File D:\Anaconda3\envs\Test3\lib\site-packages\matplotlib\_api\__init__.py:217, in caching_module_getattr.<locals>.__getattr__(name) 215 if name in props: 216 return props[name].__get__(instance) --> 217 raise AttributeError( 218 f"module {cls.__module__!r} has no attribute {name!r}") AttributeError: module 'matplotlib.cbook' has no attribute '_Stack'

import numpy as np import gurobipy as gp from gurobipy import GRB from scipy.spatial import distance from scipy import io import itertools import random import copy import time import matplotlib.pyplot as plt from collections import deque # ====================== # 仿真环境构建 # ====================== print("初始化仿真环境...") J = 8 # 服务区域内站点数量 m = 3 # 车队规模 c = 6 # 车载容量 N = 16 # 每趟列车的乘客发生量强度 batch_size = 10 # 总batch数量 service_time = 1 # 停站服务时间(分钟) headway = 10 # 车头时距间隔(分钟) M = 100 # 未被服务时的出行时间 t = 0 # 当前仿真时间 btw = np.zeros((m, 1)) # 车辆可用的时间 btw_vehicle = [] for i in range(m): vehicle = { 'record': [ [0, 0, 0, 0] # 初始化,第一行为时间、路径等信息 ] } btw_vehicle.append(vehicle) beta1, beta2, beta3 = 100, 10, 1 # 权重参数 metro_station = np.array([[0, 0]]) # 地铁站坐标 vehicle_speed = 30 # 公里/小时 servicearea_L, servicearea_W = 5, 3 # 服务区域尺寸 mat_data = io.loadmat('F:/LM程序/LM_stop8.mat') LM_stop = mat_data['LM_stop'] all_stop = np.vstack([metro_station, LM_stop]) # 合并所有站点 dist_matrix = distance.squareform(distance.pdist(all_stop)) # 计算欧氏距离 traveltime_matrix = dist_matrix / vehicle_speed * 60 # ====================== # 辅助函数 # ====================== def candidate_route_gen(max_stop, max_traveltime, traveltime_matrix, service_time, J): """生成满足约束的候选路径""" print(f"生成候选路径 (最多站点: {max_stop}, 最大行程时间: {max_traveltime}分钟)...") all_routes = [] for i in range(1, max_stop + 1): combinations = itertools.combinations(range(1, J + 1), i) for combo in combinations: permutations = itertools.permutations(combo) for perm in permutations: route = [0] + list(perm) + [0] all_routes.append(route) candidate_route = [] for route in all_routes: travel_time = 0 for j in range(len(route) - 1): from_node = route[j] to_node = route[j + 1] travel_time += traveltime_matrix[from_node][to_node] total_time = travel_time + (len(route) - 2) * service_time if total_time <= max_traveltime: candidate_route.append({ 'route': route, 'travel_time': total_time }) print(f"生成 {len(candidate_route)} 条候选路径") return candidate_route def parameter_gen(candidate_routes, J, M, service_time, traveltime_matrix): """生成模型所需参数矩阵""" print("生成模型参数...") K = len(candidate_routes) tk = [route['travel_time'] for route in candidate_routes] phi_jk = [] for j in range(1, J + 1): row = [] for route in candidate_routes: # 检查站点j是否在当前路径中 route_nodes = route['route'] row.append(1 if j in route_nodes else 0) phi_jk.append(row) t_jk = [[M] * K for _ in range(J)] for j_idx in range(J): j = j_idx + 1 for k, route in enumerate(candidate_routes): current_route = route['route'] if j not in current_route: continue idx = current_route.index(j) arrival_time = 0.0 for seg in range(idx): from_node = current_route[seg] to_node = current_route[seg + 1] arrival_time += traveltime_matrix[from_node, to_node] arrival_time += service_time * (idx - 1) t_jk[j_idx][k] = arrival_time return phi_jk, tk, t_jk def route_rank2(Wk, tk, Zjk, phi_jk, btw, t, headway): """对路径进行优先级排序""" btw = np.maximum(btw, t) valid_indices = np.where(Wk >= 1)[0] if len(valid_indices) == 0: return np.empty((0, 4), dtype=int) route_numbers = (valid_indices + 1).astype(int) S = np.zeros((len(route_numbers), 4), dtype=int) S[:, 0] = route_numbers S[:, 1] = Wk[valid_indices] S[:, 2] = [tk[i] for i in valid_indices] # 使用列表推导式获取正确的行程时间 all_permutations = list(itertools.permutations(route_numbers)) min_ft = float('inf') best_sequence = None for seq in all_permutations: current_btw = btw.copy() total_wait = 0 for route in seq: vid = np.argmin(current_btw) start_time = current_btw[vid].item() # 获取当前路径的行程时间 route_idx = route - 1 route_travel_time = tk[route_idx] current_btw[vid] += route_travel_time total_wait += np.sum(Zjk[:, route_idx]) * (start_time - t) idle_time = np.sum(np.maximum(t + headway - current_btw, 0)) ft = total_wait + idle_time if ft < min_ft: min_ft = ft best_sequence = seq if best_sequence is not None: priority_dict = {route: idx + 1 for idx, route in enumerate(best_sequence)} S[:, 3] = np.vectorize(priority_dict.get)(S[:, 0]) return S[S[:, 3].argsort()] else: return S # ====================== # 修正后的车辆调度函数 # ====================== def vehicle_dispatch(btw, t_jk, S, U, Zjk, t, headway, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke): """执行车辆调度""" U = U.copy() btw = btw.copy() for i in range(len(btw)): if btw[i] < t: vr = btw_vehicle[i] if len(vr['record']) == 0: vr['record'].append([t, t, 0, 0]) else: last_end = vr['record'][-1][1] vr['record'].append([last_end, t, 0, 0]) btw[i] = t for current_time in range(t, t + headway + 1): available = [i for i, bt in enumerate(btw) if current_time > bt] sorted_available = sorted(available, key=lambda x: btw[x]) if sorted_available and np.sum(U) > 0: for bus_idx in sorted_available: if np.sum(U) <= 0: break if S.size == 0: break route_info = S[0] total_trip[0] += 1 route_idx = route_info[0] - 1 # 路径索引 route_travel_time = route_info[2] # 路径行程时间 total_traveltime[0] += route_travel_time served_pax = Zjk[:, route_idx] totoal_ridingtime[0] += np.sum(served_pax * t_jk[:, route_idx]) waiting_time = btw[bus_idx] - t total_waitingtime[0] += np.sum(served_pax) * waiting_time # 更新乘客信息 for j in range(len(served_pax)): if served_pax[j] > 0: stop = j + 1 # 站点编号 pax_mask = (chengke[:, 2] == stop) & (chengke[:, 9] == 0) pax_candidates = np.where(pax_mask)[0] if len(pax_candidates) > 0: num_pax = min(served_pax[j], len(pax_candidates)) selected = pax_candidates[:num_pax] chengke[selected, 9] = 1 # 标记为已服务 chengke[selected, 4] = btw[bus_idx] # 上车时间 chengke[selected, 5] = chengke[selected, 4] - chengke[selected, 3] # 等待时间 chengke[selected, 6] = t_jk[j, route_idx] # 乘车时间 chengke[selected, 7] = route_info[0] # 路径ID chengke[selected, 8] = bus_idx + 1 # 车辆ID # 更新车辆记录 vr = btw_vehicle[bus_idx] if not vr['record']: vr['record'].append([ btw[bus_idx], btw[bus_idx] + route_travel_time, route_info[0], route_travel_time ]) else: last_end = vr['record'][-1][1] vr['record'].append([ last_end, last_end + route_travel_time, route_info[0], route_travel_time ]) # 更新车辆可用时间和需求 btw[bus_idx] += route_travel_time U = U - Zjk[:, route_idx] S = np.delete(S, 0, axis=0) # 移除已分配路径 if np.sum(U) <= 0: break # 处理未服务的乘客 if current_time == t + headway and np.sum(U) > 0: total_waitingtime[0] += np.sum(U) * headway return (btw, S, U, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke) def lastmile_model(phi_jk, tk, t_jk, U, beta1, beta2, beta3, K, J, c): """构建并求解混合整数规划模型""" print("构建并求解MIP模型...") try: model = gp.Model("LastMile") model.Params.OutputFlag = 0 model.Params.TimeLimit = 30 # 设置30秒时间限制 wk = model.addVars(K, vtype=GRB.INTEGER, name="wk") g = model.addVar(vtype=GRB.INTEGER, name="g") zjk = model.addVars(J, K, vtype=GRB.INTEGER, name="zjk") obj = beta1 * g obj += beta2 * gp.quicksum(tk[k] * wk[k] for k in range(K)) obj += beta3 * gp.quicksum(t_jk[j][k] * zjk[j, k] for j in range(J) for k in range(K)) model.setObjective(obj, GRB.MINIMIZE) # 约束1: 所有需求必须被满足 for j in range(J): model.addConstr( gp.quicksum(zjk[j, k] * phi_jk[j][k] for k in range(K)) == U[j], name=f"constr1_j{j}" ) # 约束2: 车辆容量约束 for k in range(K): model.addConstr( gp.quicksum(zjk[j, k] * phi_jk[j][k] for j in range(J)) <= c * wk[k], name=f"constr2_k{k}" ) # 约束3: 总行程数 model.addConstr( gp.quicksum(wk[k] for k in range(K)) == g, name="constr3_total_trips" ) # 约束4: 非负约束 model.addConstr(g >= 1, name="constr4_g_min") for k in range(K): model.addConstr(wk[k] >= 0, name=f"constr4_wk{k}_min") for j in range(J): for k in range(K): model.addConstr(zjk[j, k] >= 0, name=f"constr4_zjk{j}{k}_min") model.optimize() if model.status == GRB.OPTIMAL: Zjk = np.zeros((J, K), dtype=int) Wk = np.zeros(K, dtype=int) for j in range(J): for k in range(K): Zjk[j][k] = round(zjk[j, k].X) for k in range(K): Wk[k] = round(wk[k].X) G = round(g.X) return Zjk, Wk, G else: # 如果未找到最优解,使用启发式方法生成可行解 print("未找到最优解,使用启发式方法生成可行解...") return heuristic_solution(phi_jk, U, c, K, J) except gp.GurobiError as e: print(f"Gurobi错误: {e}") return heuristic_solution(phi_jk, U, c, K, J) def heuristic_solution(phi_jk, U, c, K, J): """启发式方法生成可行解""" print("使用启发式方法生成可行解...") Zjk = np.zeros((J, K), dtype=int) Wk = np.zeros(K, dtype=int) # 简单启发式:为每个站点分配车辆 remaining_demand = U.copy() k = 0 while np.sum(remaining_demand) > 0 and k < K: # 尝试覆盖尽可能多的站点 coverage = np.zeros(J, dtype=int) for j in range(J): if phi_jk[j][k] == 1 and remaining_demand[j] > 0: coverage[j] = 1 if np.sum(coverage) > 0: # 分配车辆 Wk[k] = 1 # 分配乘客 for j in range(J): if coverage[j] == 1: assign = min(remaining_demand[j], c) Zjk[j][k] = assign remaining_demand[j] -= assign k += 1 else: k += 1 G = np.sum(Wk) return Zjk, Wk, G # ====================== # 数据加载与预处理 # ====================== print("加载乘客分布数据...") passenger_distributionUN = io.loadmat('F:/LM程序/passenger_distribution_16UN.mat')['passenger_distributionUN'] passenger_distributionSH = io.loadmat('F:/LM程序/passenger_distribution_16SH.mat')['passenger_distributionSH'] passenger_distributionEH = io.loadmat('F:/LM程序/passenger_distribution_16EH.mat')['passenger_distributionEH'] ui = passenger_distributionEH # 选择分布类型 chengke = [] # 初始化乘客列表 for i in range(1, batch_size + 1): passenger_count_in_batch = 1 for j in range(1, J + 1): passenger_num = ui[i - 1, j - 1].item() if passenger_num > 0: for _ in range(int(passenger_num)): arrival_time = t + (i - 1) * headway passenger_record = [ i, # 批次编号 passenger_count_in_batch, # 批次内序号 j, # 下车站点 arrival_time, # 到达时间 *[0] * 6 # 初始化后6个字段 ] chengke.append(passenger_record) passenger_count_in_batch += 1 # ====================== # 候选路径生成 # ====================== candidate_route = candidate_route_gen( max_stop=3, max_traveltime=14, traveltime_matrix=traveltime_matrix, service_time=service_time, J=J ) K = len(candidate_route) phi_jk, tk, t_jk = parameter_gen(candidate_route, J, M, service_time, traveltime_matrix) # ====================== # 初始化记录变量 # ====================== total_trip = [0] total_traveltime = [0] total_waitingtime = [0] totoal_ridingtime = [0] chengke = np.array(chengke) t_jk = np.array(t_jk) btw = np.array(btw) tk = np.array(tk) btw_record = np.zeros((len(btw), batch_size + 1)) s = [{'route': None} for _ in range(batch_size + 100)] # 确保s是字典列表 pax_asg = [{'record': None} for _ in range(batch_size + 100)] # 确保pax_asg是字典列表 # ====================== # 主仿真循环 # ====================== print("开始主仿真循环...") for i in range(batch_size): print(f"\n处理批次 {i + 1}/{batch_size}...") if i == 0: U = ui[0, :].copy() else: U += ui[i, :] print(f"当前需求: {U}") # 求解模型 Zjk, Wk, G = lastmile_model(phi_jk, tk, t_jk, U, beta1, beta2, beta3, K, J, c) print(f"模型求解完成: 总行程数={G}, 路径分配={Wk}") # 路径排序 S = route_rank2(Wk, tk, Zjk, phi_jk, btw, t, headway) print(f"路径排序完成: 分配{len(S)}条路径") Temp_S = S.copy() if S.size > 0 else np.array([]) # 车辆调度 (btw, S, U, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke) = vehicle_dispatch( btw, t_jk, S, U, Zjk, t, headway, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke ) # 保存结果 if Temp_S.size > 0: s[i]["route"] = Temp_S pax_asg[i]['record'] = Zjk else: s[i]["route"] = np.array([]) pax_asg[i]['record'] = np.zeros((J, K)) # 更新时间和车辆状态 t += headway btw_record[:, i + 1] = btw.squeeze() print(f"批次完成, 剩余需求: {np.sum(U)}") # 处理剩余需求 print("\n处理剩余需求...") plus_trip = batch_size while np.sum(U) > 0 and plus_trip < batch_size + 10: # 添加安全限制 plus_trip += 1 print(f"额外批次 {plus_trip - batch_size}, 剩余需求: {np.sum(U)}") # 求解模型 Zjk, Wk, G = lastmile_model(phi_jk, tk, t_jk, U, beta1, beta2, beta3, K, J, c) print(f"模型求解完成: 总行程数={G}, 路径分配={Wk}") # 路径排序 S = route_rank2(Wk, tk, Zjk, phi_jk, btw, t, headway) print(f"路径排序完成: 分配{len(S)}条路径") Temp_S = S.copy() if S.size > 0 else np.array([]) # 车辆调度 (btw, S, U, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke) = vehicle_dispatch( btw, t_jk, S, U, Zjk, t, headway, total_trip, total_traveltime, total_waitingtime, totoal_ridingtime, btw_vehicle, chengke ) # 保存结果 if Temp_S.size > 0: s[plus_trip] = {"route": Temp_S} pax_asg[plus_trip] = {'record': Zjk} else: s[plus_trip] = {"route": np.array([])} pax_asg[plus_trip] = {'record': np.zeros((J, K))} # 更新时间 t += headway print(f"\n额外的运行周期:{plus_trip - batch_size}") total_pax = np.sum(ui) print(f'总的乘客数量为:{total_pax}') print(f'总的行程数量为:{total_trip[0]}') print(f'总的服务时间为:{total_traveltime[0]}') print(f'乘客总的乘车时间为:{totoal_ridingtime[0]}') print(f'乘客总的等待时间为:{total_waitingtime[0]}') if total_pax > 0: avg_riding = totoal_ridingtime[0] / total_pax avg_waiting = total_waitingtime[0] / total_pax print(f'乘客总的平均乘车时间为:{avg_riding:.2f}') print(f'乘客总的平均等待时间为:{avg_waiting:.2f}') else: print('乘客总数为零,无法计算平均值') # ====================== # 禁忌搜索优化器 # ====================== class TabuSearchOptimizer: def __init__(self, initial_solution, candidate_routes, travel_time_matrix, passenger_data, vehicle_capacity, headway, num_vehicles, max_iter=50, max_no_improve=10, tabu_tenure=7): """ 初始化禁忌搜索优化器 """ self.initial_solution = initial_solution self.candidate_routes = candidate_routes self.travel_time_matrix = travel_time_matrix self.passenger_data = passenger_data self.vehicle_capacity = vehicle_capacity self.headway = headway self.num_vehicles = num_vehicles self.max_iter = max_iter self.max_no_improve = max_no_improve self.tabu_tenure = tabu_tenure # 初始化数据结构 self.best_solution = self.initialize_solution(initial_solution) self.best_objective = self.evaluate_solution(self.best_solution) self.current_solution = copy.deepcopy(self.best_solution) self.current_objective = self.best_objective self.tabu_list = deque(maxlen=tabu_tenure) self.objective_history = [self.best_objective] self.improvement_history = [] def initialize_solution(self, solution): """确保解决方案使用列表而不是numpy数组""" initialized = [] for interval in solution: # 转换route为列表 if 'route' in interval and isinstance(interval['route'], np.ndarray): # 将numpy数组转换为列表 if interval['route'].size > 0: interval['route'] = interval['route'].tolist() else: interval['route'] = [] initialized.append(interval) return initialized def evaluate_solution(self, solution): """ 评估解决方案的目标函数值(总等待时间+乘车时间) """ total_waiting = 0 total_riding = 0 vehicle_available = np.zeros(self.num_vehicles) unserved_passengers = [] # 预处理乘客数据为结构化数组 passenger_array = np.array(self.passenger_data, dtype=object) # 处理每个时间间隔 for i, interval in enumerate(solution): interval_start = i * self.headway # 添加当前间隔到达的乘客 batch_mask = (passenger_array[:, 0] == i + 1) if np.any(batch_mask): batch_passengers = passenger_array[batch_mask].copy() batch_passengers = np.column_stack((batch_passengers, np.full(batch_passengers.shape[0], interval_start))) unserved_passengers.extend(batch_passengers.tolist()) # 处理当前间隔的路径 if 'route' in interval and interval['route']: routes = interval['route'] # 按优先级排序 sorted_routes = sorted(routes, key=lambda x: x[3] if len(x) > 3 else 0) for route in sorted_routes: route_idx = route[0] - 1 route_info = self.candidate_routes[route_idx] # 选择最早可用的车辆 vehicle_idx = np.argmin(vehicle_available) start_time = max(vehicle_available[vehicle_idx], interval_start) # 服务乘客 capacity_used = 0 passengers_to_remove = [] route_stops = set(route_info['route'][1:-1]) # 筛选符合条件的乘客 eligible_passengers = [] for idx, p in enumerate(unserved_passengers): if p[2] in route_stops: eligible_passengers.append((idx, p)) # 按到达时间排序 eligible_passengers.sort(key=lambda x: x[1][3]) # 服务乘客直到车辆满载 for idx, p in eligible_passengers: if capacity_used >= self.vehicle_capacity: break # 计算等待时间和乘车时间 waiting_time = start_time - p[3] from_node = 0 # 起点(地铁站) to_node = p[2] # 下车站点 riding_time = self.travel_time_matrix[from_node][to_node] total_waiting += waiting_time total_riding += riding_time capacity_used += 1 passengers_to_remove.append(idx) # 移除已服务乘客 for idx in sorted(passengers_to_remove, reverse=True): unserved_passengers.pop(idx) # 更新车辆可用时间 vehicle_available[vehicle_idx] = start_time + route_info['travel_time'] # 对未服务乘客的惩罚 last_time = len(solution) * self.headway for p in unserved_passengers: total_waiting += (last_time - p[3]) * 10 # 惩罚因子 return total_waiting + total_riding def generate_neighbors(self, solution, num_neighbors=10): """ 生成邻域解 """ neighbors = [] for _ in range(num_neighbors): neighbor = copy.deepcopy(solution) interval_idx = random.randint(0, len(solution) - 1) operation = random.choice(['replace', 'swap', 'add', 'remove']) # 替换操作 if operation == 'replace' and 'route' in neighbor[interval_idx] and neighbor[interval_idx]['route']: route_idx = random.randint(0, len(neighbor[interval_idx]['route']) - 1) new_route_idx = random.randint(0, len(self.candidate_routes) - 1) new_route = [ new_route_idx + 1, 1, self.candidate_routes[new_route_idx]['travel_time'], random.random() # 随机优先级 ] neighbor[interval_idx]['route'][route_idx] = new_route move = ('replace', interval_idx, route_idx, new_route_idx) neighbors.append((neighbor, move)) # 交换操作 elif operation == 'swap' and len(solution) > 1: interval_idx1 = random.randint(0, len(solution) - 1) interval_idx2 = random.randint(0, len(solution) - 1) if interval_idx1 != interval_idx2: if ('route' in neighbor[interval_idx1] and neighbor[interval_idx1]['route'] and 'route' in neighbor[interval_idx2] and neighbor[interval_idx2]['route']): route_idx1 = random.randint(0, len(neighbor[interval_idx1]['route']) - 1) route_idx2 = random.randint(0, len(neighbor[interval_idx2]['route']) - 1) # 交换路径 (neighbor[interval_idx1]['route'][route_idx1], neighbor[interval_idx2]['route'][route_idx2]) = ( neighbor[interval_idx2]['route'][route_idx2], neighbor[interval_idx1]['route'][route_idx1] ) move = ('swap', interval_idx1, interval_idx2, route_idx1, route_idx2) neighbors.append((neighbor, move)) # 添加操作 elif operation == 'add': new_route_idx = random.randint(0, len(self.candidate_routes) - 1) new_route = [ new_route_idx + 1, 1, self.candidate_routes[new_route_idx]['travel_time'], random.random() # 随机优先级 ] if 'route' not in neighbor[interval_idx]: neighbor[interval_idx]['route'] = [new_route] elif neighbor[interval_idx]['route'] is None: neighbor[interval_idx]['route'] = [new_route] else: neighbor[interval_idx]['route'].append(new_route) move = ('add', interval_idx, new_route_idx) neighbors.append((neighbor, move)) # 删除操作 elif operation == 'remove' and 'route' in neighbor[interval_idx] and neighbor[interval_idx]['route']: route_idx = random.randint(0, len(neighbor[interval_idx]['route']) - 1) removed_route = neighbor[interval_idx]['route'].pop(route_idx) move = ('remove', interval_idx, removed_route[0]) neighbors.append((neighbor, move)) return neighbors def is_tabu(self, move): """检查移动是否在禁忌表中""" for tabu_move in self.tabu_list: if move == tabu_move: return True return False def optimize(self): """执行禁忌搜索优化""" no_improve_count = 0 start_time = time.time() print(f"开始禁忌搜索优化,初始目标值: {self.best_objective:.2f}") print(f"{'迭代':<5} | {'当前目标值':<12} | {'历史最优':<12} | {'改进量':<10} | {'耗时(s)':<8}") print("-" * 60) for iteration in range(self.max_iter): iter_start = time.time() neighbors = self.generate_neighbors(self.current_solution, num_neighbors=20) best_neighbor = None best_neighbor_obj = float('inf') best_move = None # 评估邻域解 for neighbor, move in neighbors: if self.is_tabu(move): continue neighbor_obj = self.evaluate_solution(neighbor) if neighbor_obj < best_neighbor_obj: best_neighbor = neighbor best_neighbor_obj = neighbor_obj best_move = move # 更新当前解 if best_neighbor is not None: self.current_solution = best_neighbor self.current_objective = best_neighbor_obj self.tabu_list.append(best_move) # 更新历史最优解 if best_neighbor_obj < self.best_objective: improvement = self.best_objective - best_neighbor_obj self.improvement_history.append(improvement) self.best_solution = copy.deepcopy(best_neighbor) self.best_objective = best_neighbor_obj no_improve_count = 0 # 打印改进信息 iter_time = time.time() - iter_start print(f"{iteration + 1:<5} | {best_neighbor_obj:<12.2f} | {self.best_objective:<12.2f} | " f"+{improvement:<10.2f} | {iter_time:<8.2f}") else: no_improve_count += 1 else: no_improve_count += 1 self.objective_history.append(self.current_objective) # 提前终止条件 if no_improve_count >= self.max_no_improve: print(f"\n提前终止:连续 {no_improve_count} 次迭代无改进") break total_time = time.time() - start_time print("\n优化完成!") print(f"总迭代次数: {iteration + 1}") print(f"总耗时: {total_time:.2f}秒") print(f"初始目标值: {self.objective_history[0]:.2f}") print(f"最终目标值: {self.best_objective:.2f}") improvement_percent = ((self.objective_history[0] - self.best_objective) / self.objective_history[0]) * 100 print(f"改进幅度: {self.objective_history[0] - self.best_objective:.2f} ({improvement_percent:.2f}%)") return self.best_solution, self.best_objective def plot_optimization_progress(self): """绘制优化过程图""" plt.figure(figsize=(12, 6)) # 目标函数值变化 plt.subplot(1, 2, 1) plt.plot(self.objective_history, 'b-', linewidth=2) plt.xlabel('迭代次数') plt.ylabel('目标函数值') plt.title('目标函数优化过程') plt.grid(True) # 改进历史 if self.improvement_history: plt.subplot(1, 2, 2) plt.plot(self.improvement_history, 'go-', linewidth=2) plt.xlabel('改进次数') plt.ylabel('改进量') plt.title('每次改进的优化量') plt.grid(True) plt.tight_layout() plt.savefig('optimization_progress.png', dpi=300) plt.show() # ====================== # 执行禁忌搜索优化 # ====================== print("\n准备禁忌搜索优化...") # 准备初始解数据 initial_solution = [] for i in range(min(batch_size + plus_trip, len(s)): # 确保不越界 interval_data = { 'route': s[i].get('route', None), 'pax_asg': pax_asg[i].get('record', None) if i < len(pax_asg) else None } initial_solution.append(interval_data) # 创建禁忌搜索优化器 ts_optimizer = TabuSearchOptimizer( initial_solution=initial_solution, candidate_routes=candidate_route, travel_time_matrix=traveltime_matrix, passenger_data=chengke.tolist(), vehicle_capacity=c, headway=headway, num_vehicles=m, max_iter=50, max_no_improve=10, tabu_tenure=7 ) # 执行优化 best_solution, best_objective = ts_optimizer.optimize() ts_optimizer.plot_optimization_progress() # 输出最优解 print("\n最优解结构:") for i, interval in enumerate(best_solution): print(f"间隔 {i + 1}:") if 'route' in interval and interval['route']: for j, route in enumerate(interval['route']): print(f" 路径 {j + 1}: ID={route[0]}, 服务时间={route[2]}, 优先级={route[3]}") else: print(" 无路径") print("\n优化完成!") Traceback (most recent call last): File "F:\PycharmProjects\PythonProject1\taboo3.py", line 755, in <module> best_solution, best_objective = ts_optimizer.optimize() ^^^^^^^^^^^^^^^^^^^^^^^ File "F:\PycharmProjects\PythonProject1\taboo3.py", line 640, in optimize neighbors = self.generate_neighbors(self.current_solution, num_neighbors=20) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "F:\PycharmProjects\PythonProject1\taboo3.py", line 615, in generate_neighbors removed_route = neighbor[interval_idx]['route'].pop(route_idx) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AttributeError: 'numpy.ndarray' object has no attribute 'pop'. Did you mean: 'ptp'? 进程已结束,退出代码为 1

生成完成! 点数: 11 Traceback (most recent call last): File "D:\pycharm\Trial\CSDNexp001.py", line 228, in <module> main() File "D:\pycharm\Trial\CSDNexp001.py", line 183, in main preview_dxf_save(points, width, height) File "D:\pycharm\Trial\CSDNexp001.py", line 145, in preview_dxf_save fig, ax = plt.subplots(figsize=(12, 10)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 1775, in subplots fig = figure(**fig_kw) ^^^^^^^^^^^^^^^^ File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 1042, in figure manager = new_figure_manager( ^^^^^^^^^^^^^^^^^^^ File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 551, in new_figure_manager _warn_if_gui_out_of_main_thread() File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 528, in _warn_if_gui_out_of_main_thread canvas_class = cast(type[FigureCanvasBase], _get_backend_mod().FigureCanvas) ^^^^^^^^^^^^^^^^^^ File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 369, in _get_backend_mod switch_backend(rcParams._get("backend")) File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\matplotlib\pyplot.py", line 426, in switch_backend canvas_class = module.FigureCanvas ^^^^^^^^^^^^^^^^^^^ AttributeError: module 'backend_interagg' has no attribute 'FigureCanvas'. Did you mean: 'FigureCanvasAgg'? 进程已结束,退出代码 1

最新推荐

recommend-type

APA多步垂直泊车与全局路径规划MPC控制算法联合仿真,开源版持续迭代更新

APA多步垂直泊车系统的仿真研究,重点探讨了Carsim与Matlab在自动泊车仿真中的联合应用。文章首先介绍了Carsim场景及车辆配置文件,展示了如何模拟车辆在不同道路条件下的行驶轨迹和碰撞风险。接着讨论了Simulink文件中的纵向逻辑控制,包括动力分配和刹车控制等。随后阐述了MPC横向控制算法文件的作用,即通过预测未来的系统状态来优化车辆的横向移动和控制。最后,文章讲解了路径规划算法及其全局规划方法,强调了基于规则和启发式的路径规划策略。文中提到的所有模型均开源,便于研究人员参考和学习。 适合人群:从事自动驾驶技术研发的研究人员和技术爱好者。 使用场景及目标:适用于希望深入了解自动泊车仿真技术的研究人员,特别是那些关注路径规划和MPC控制算法的人群。目标是帮助他们掌握Carsim与Matlab联合仿真的具体实现方法,从而应用于实际项目中。 其他说明:本文不仅提供了详细的理论解释,还附带了完整的开源模型,方便读者进行实践操作和进一步研究。
recommend-type

基于S7-300与组态王的污水处理厂沉淀池-V型滤池自动控制系统解析

基于西门子S7-300 PLC和组态王软件的污水处理厂沉淀池-V型滤池综合控制系统的设计与实现。主要内容涵盖梯形图程序逻辑(如液位联锁控制、定时排泥、手动急停)、接线图与IO分配细节(如超声波液位计、故障信号、急停按钮等),以及组态王的画面设计(如反冲洗流程动画)。此外,还分享了一些实际应用中的经验和教训,如硬件接线注意事项、定期维护措施等。 适用人群:从事工业自动化领域的工程师和技术人员,尤其是对PLC编程和SCADA系统有研究的人士。 使用场景及目标:适用于需要理解和实施污水处理厂自动化控制系统的场合,帮助工程师掌握S7-300 PLC与组态王软件的具体应用方法,提高系统的可靠性和稳定性。 其他说明:文中提供的实例和经验有助于避免常见错误,提升项目的成功率。同时强调了硬件选型和日常维护的重要性,确保系统长期稳定运行。
recommend-type

Pansophica开源项目:智能Web搜索代理的探索

Pansophica开源项目是一个相对较新且具有创新性的智能Web搜索代理,它突破了传统搜索引擎的界限,提供了一种全新的交互方式。首先,我们来探讨“智能Web搜索代理”这一概念。智能Web搜索代理是一个软件程序或服务,它可以根据用户的查询自动执行Web搜索,并尝试根据用户的兴趣、历史搜索记录或其他输入来提供个性化的搜索结果。 Pansophica所代表的不仅仅是搜索结果的展示,它还强调了一个交互式的体验,在动态和交互式虚拟现实中呈现搜索结果。这种呈现方式与现有的搜索体验有着根本的不同。目前的搜索引擎,如Google、Bing和Baidu等,多以静态文本和链接列表的形式展示结果。而Pansophica通过提供一个虚拟现实环境,使得搜索者可以“扭转”视角,进行“飞行”探索,以及“弹网”来浏览不同的内容。这种多维度的交互方式使得信息的浏览变得更加快速和直观,有望改变用户与网络信息互动的方式。 接着,我们关注Pansophica的“开源”属性。所谓开源,指的是软件的源代码可以被公众获取,任何个人或组织都可以自由地使用、学习、修改和分发这些代码。开源软件通常由社区进行开发和维护,这样的模式鼓励了协作创新并减少了重复性劳动,因为全世界的开发者都可以贡献自己的力量。Pansophica项目作为开源软件,意味着其他开发者可以访问和使用其源代码,进一步改进和扩展其功能,甚至可以为Pansophica构建新的应用或服务。 最后,文件名称“Pansophica-src-1.3”表明了我们讨论的特定版本的Pansophica开源代码。数字“1.3”很可能指的是该版本号,表明这是Pansophica项目的第1.3个公开版本。这个版本号对于开发者来说很重要,因为它可以帮助他们追踪更新的进度、比较版本间的差异以及选择最适合其项目的版本。 综合以上信息,Pansophica开源项目不仅在搜索代理领域引入了创新的交互方式,而且通过开源的方式,提供了技术共享和协作的可能性。对于IT行业来说,开源项目是推动技术发展的重要力量,它激发了社区的活力,促进了知识的传播和技术的进步。通过构建一个开源社区,Pansophica可以吸引来自世界各地的开发者和研究人员,共同探索如何将搜索技术与交互式虚拟现实技术结合起来,为用户带来更加丰富和有效的信息检索体验。同时,Pansophica的开源性质也使其更容易适应未来技术的发展,随着社区的不断贡献和反馈,这一平台有望在功能和性能上得到持续的优化和增强。
recommend-type

跨平台内容提取无忧:coze工作流应对社交媒体挑战

# 1. 社交媒体内容提取的必要性与挑战 ## 1.1 内容提取的必要性 在数字时代,社交媒体内容是信息与数据流动的主干道,公司、个人和研究者都在寻求从这些信息中提取价值。无论是为了市场趋势分析、品牌监控、消费者洞察还是公共舆论监测,有效的社交媒体内容提取变得至关重要。这不仅有助于企业做出数据驱动的决策,也能使个人更好地理
recommend-type

vrrp主设备发送的免费arp

<think>好的,我现在需要回答用户关于VRRP主设备发送免费ARP的目的和工作原理的问题。首先,我得回忆一下VRRP的基本概念和工作机制。VRRP,虚拟路由冗余协议,主要用于在局域网中提供默认网关的冗余备份,确保网络的高可用性。主设备(Master)负责转发流量,而备用设备(Backup)则在主设备故障时接管工作。 用户的问题集中在主设备发送免费ARP的目的和机制上。根据引用[2],免费ARP用于通知下游设备虚拟MAC地址的变更。当主设备被选举出来后,它需要让局域网内的其他设备知道虚拟IP对应的MAC地址已经指向自己,这样流量才会被正确路由到主设备。免费ARP的作用应该就是更新这些设备的
recommend-type

为Ghost博客平台打造的Meteor流星包装使用指南

从给定文件信息中,我们可以提炼出以下IT知识点: ### 标题知识点:流星Ghost软件包 1. **流星Ghost软件包的用途**:流星Ghost软件包是专为Ghost博客平台设计的流星(Meteor)应用程序。流星是一个开源的全栈JavaScript平台,用于开发高性能和易于编写的Web应用程序。Ghost是一个开源博客平台,它提供了一个简单且专业的写作环境。 2. **软件包的作用**:流星Ghost软件包允许用户在流星平台上轻松集成Ghost博客。这样做的好处是可以利用流星的实时特性以及易于开发和部署的应用程序框架,同时还能享受到Ghost博客系统的便利和美观。 ### 描述知识点:流星Ghost软件包的使用方法 1. **软件包安装方式**:用户可以通过流星的命令行工具添加名为`mrt:ghost`的软件包。`mrt`是流星的一个命令行工具,用于添加、管理以及配置软件包。 2. **初始化Ghost服务器**:描述中提供了如何在服务器启动时运行Ghost的基本代码示例。这段代码使用了JavaScript的Promise异步操作,`ghost().then(function (ghostServer) {...})`这行代码表示当Ghost服务器初始化完成后,会在Promise的回调函数中提供一个Ghost服务器实例。 3. **配置Ghost博客**:在`then`方法中,首先会获取到Ghost服务器的配置对象`config`,用户可以在此处进行自定义设置,例如修改主题、配置等。 4. **启动Ghost服务器**:在配置完成之后,通过调用`ghostServer.start()`来启动Ghost服务,使其能够处理博客相关的请求。 5. **Web浏览器导航**:一旦流星服务器启动并运行,用户便可以通过Web浏览器访问Ghost博客平台。 ### 标签知识点:JavaScript 1. **JavaScript作为流星Ghost软件包的开发语言**:标签指出流星Ghost软件包是使用JavaScript语言开发的。JavaScript是一种在浏览器端广泛使用的脚本语言,它也是流星平台的基础编程语言。 2. **流星和Ghost共同使用的语言**:JavaScript同样也是Ghost博客平台的开发语言。这表明流星Ghost软件包可以无缝集成,因为底层技术栈相同。 ### 压缩包子文件的文件名称列表知识点:meteor-ghost-master 1. **版本控制和软件包结构**:文件名称`meteor-ghost-master`暗示了该软件包可能托管在像GitHub这样的版本控制系统上。文件名中的`master`通常指的是主分支或主版本。 2. **软件包的目录结构**:通过文件名称可以推断出该软件包可能拥有一个标准的流星软件包结构,包含了初始化、配置、运行等必要的模块和文件。 3. **软件包的维护状态**:由于文件名没有包含特定的版本号,我们无法直接得知软件包的最新更新情况。通常,软件包维护者会将最新的版本代码放在`master`分支上。 ### 总结 流星Ghost软件包提供了一个有效的解决方案,使得流星平台的开发者能够在他们的应用中添加Ghost博客功能。软件包的使用简便,通过流星的命令行工具安装,并通过JavaScript代码配置和启动Ghost服务。通过流星Ghost软件包,开发者能够享受流星的实时特性以及Ghost博客系统的便利性。此外,软件包的命名和结构也暗示了其维护和版本控制的模式,有助于开发者更好地理解如何使用和维护这一软件包。
recommend-type

抖音标题生成自动化:用coze工作流释放创意

# 1. 抖音标题生成自动化的重要性 随着社交媒体平台的崛起,内容的吸引力很大程度上取决于标题的创意与精准性。抖音作为一个日活亿级的短视频平台,高质量的标题能够有效提高视频的点击率,增加内容的传播。但是,人工撰写标题不仅耗时耗力,而且很难做到快速响应热点,自动化标题生成工具应运而生。coze工作流,作为一种实现自动化生成抖音标题的工具,其重要性不言而喻。它能够利用大数据分析和机器学习技术,提高标题的吸引
recommend-type

spss消费结构因子分析

<think>好的,我现在需要帮助用户在SPSS中进行消费结构的因子分析。首先,我要回忆一下因子分析的基本步骤和SPSS的操作流程。用户可能对SPSS不太熟悉,所以步骤要详细,同时需要结合他们提供的引用内容,特别是引用[2]中的适用条件和检验方法。 首先,用户提到了消费结构的数据,这可能包括多个变量,如食品、住房、交通等支出。因子分析适用于这种情况,可以降维并找出潜在因子。根据引用[2],需要检查样本量是否足够,变量间是否有相关性,以及KMO和Bartlett检验的结果。 接下来,我需要按照步骤组织回答:数据准备、适用性检验、因子提取、因子旋转、命名解释、计算得分。每个步骤都要简明扼要,说
recommend-type

OpenMediaVault的Docker映像:快速部署与管理指南

根据提供的文件信息,我们将详细讨论与标题和描述中提及的Docker、OpenMediaVault以及如何部署OpenMediaVault的Docker镜像相关的一系列知识点。 首先,Docker是一个开源的应用容器引擎,允许开发者打包应用及其依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口(类似 iPhone 的 app)。 OpenMediaVault是一个基于Debian的NAS(网络附加存储)解决方案。它专为家庭或小型办公室提供文件共享、网络附加存储以及打印服务。它提供了一个易用的Web界面,通过这个界面用户可以管理服务器配置、网络设置、用户权限、文件服务等。 在描述中提到了一些Docker命令行操作: 1. `git clone`:用于克隆仓库到本地,这里的仓库指的是“docker-images-openmedivault”。 2. `docker build -t omv`:这是一个构建Docker镜像的命令,其中`-t`参数用于标记镜像名称和标签,这里是标记为“omv”。 3. `docker run`:运行一个容器实例,`-t`参数用于分配一个伪终端,`-i`参数用于交互式操作,`-p 80:80`则是将容器的80端口映射到宿主机的80端口。 启动服务的部分涉及OpenMediaVault的配置和初始化: - ssh服务:用于远程登录到服务器的协议。 - php5-fpm:是PHP的一个FastCGI实现,用于加速PHP的运行。 - nginx:是一个高性能的HTTP和反向代理服务器,常用于优化静态内容的分发。 - openmediavault引擎:指的是OpenMediaVault的核心服务。 - rrdcached:用于收集和缓存性能数据,这些数据可以被rrdtool图形化工具读取。 - collectd:是一个守护进程,用于收集系统性能和提供各种存储方式和传输方式来存储所收集的数据。 为了访问服务,需要在浏览器中输入"http:// IP_OF_DOCKER",其中`IP_OF_DOCKER`指的是运行Docker容器的主机IP地址。 描述中还提到了一个步骤:“在System-> Network-> Interfaces中添加带有dhcp的eth0”,这指的是需要在OpenMediaVault的Web管理界面中配置网络接口。`eth0`是网络接口的名称,通常代表第一个以太网接口。DHCP(动态主机配置协议)是一种自动为网络中的设备分配IP地址的协议,这样设备就可以连接网络并开始通信,无需手动配置IP地址。 【压缩包子文件的文件名称列表】中的“docker-images-openmediavault-master”暗示了这是一个包含Docker镜像文件的代码仓库。通常,“master”分支是代码的主分支,包含了代码库中最新且通常是最稳定的版本。用户可以通过克隆该仓库到本地来获取所有相关的Dockerfile、配置脚本及依赖文件,以便能够自行构建和运行OpenMediaVault的Docker镜像。 综上所述,这些知识点涵盖了从基本的Docker概念、Docker命令行操作、OpenMediaVault服务启动和管理,到具体的网络配置及Docker仓库操作,都是进行Docker化OpenMediaVault部署的关键步骤。
recommend-type

小红书文案提取一步到位:coze工作流操作全攻略

# 1. coze工作流概述 工作流系统是企业信息化和数字化转型的核心组件之一,它通过自动化流程管理提升效率,确保业务流程的顺畅执行。coze工作流作为当前市场上较为先进的工作流解决方案,它不仅仅是一套软件工具,更是一个集成化的平台,旨在通过流程自动化和智能化提升企业运营效率。 coze工作流的引入不仅有助于标准化和优化企业的业务流程,还可以通过可配置的流程设计,满足不同部门的特定需求。在组织的业务流程中