活动介绍

ModuleNotFoundError: No module named 'logging.handlers'

时间: 2023-08-21 12:16:51 浏览: 751
`ModuleNotFoundError: No module named 'logging.handlers'` 错误通常表示您的 Python 环境缺少所需的日志处理模块。`logging.handlers` 模块是 Python 内置的标准库模块,在大多数 Python 安装中应该是可用的。 您可以尝试以下方法来解决此问题: 1. 确保您的 Python 版本是最新的。如果您的 Python 版本较旧,可能会缺少某些模块。您可以从 Python 官方网站(https://www.python.org/downloads/)下载并安装最新的 Python 版本。 2. 检查您的环境是否正确设置。确保您的 Python 环境变量已正确配置,并且可以正确访问 Python 解释器和标准库。 3. 如果您使用的是虚拟环境,请确保您在正确的虚拟环境中安装了所需的模块。您可以使用 `pip install 模块名` 命令来安装缺失的模块。 如果上述解决方法无效,您可以提供更多关于您的操作系统、Python 版本和安装环境的详细信息,以便我能更好地帮助您解决问题。
相关问题

python3.8 liunx32位ModuleNotFoundError: No module named 'logging.handlers'

如果您在使用32位Linux系统上的Python 3.8版本遇到了`ModuleNotFoundError: No module named 'logging.handlers'`错误,可能是因为缺少`logging.handlers`模块的依赖库。 尝试按照以下步骤解决问题: 1. 确保您的操作系统和Python环境都是32位版本。如果您的操作系统是64位,可能需要安装32位的依赖库。 2. 确认您的Python环境中是否安装了标准库`logging.handlers`模块。您可以在终端中运行以下命令来检查: ``` python -c "import logging.handlers" ``` 如果没有报错,则表示模块已正确安装。如果返回`ModuleNotFoundError`错误,请继续下一步。 3. 确保您的Python环境已安装所需的依赖库。在终端中运行以下命令来安装依赖库: ``` sudo apt-get install python3.8-dev ``` 这将安装Python 3.8版本的开发库和相关依赖。 4. 如果上述步骤都没有解决问题,可能需要重新编译安装Python。您可以尝试使用源代码编译安装Python 3.8版本,并确保在编译过程中包含了`logging.handlers`模块。 下载Python 3.8源代码并解压缩后,进入解压后的目录,执行以下命令: ``` ./configure make sudo make install ``` 这将编译并安装Python 3.8版本。 如果问题仍然存在,请提供更多关于您的操作系统和Python环境的详细信息,以便我能够更好地帮助您解决问题。

E:\AI_System\web_ui>python server.py ERROR:root:❌ 配置模块导入失败: No module named 'config.config_loader'; 'config' is not a package ERROR:root:请检查以下路径是否存在: ERROR:root:1. E:\AI_System\config/config_loader.py --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 90, in <module> from cognitive_arch.life_scheduler import LifeScheduler ModuleNotFoundError: No module named 'cognitive_arch' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u274c' in position 46: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 128, in <module> logger.error(f"❌ 生活系统模块导入失败: {str(e)}") Message: "❌ 生活系统模块导入失败: No module named 'cognitive_arch'" Arguments: () 2025-08-09 20:09:52,631 - WebServer - ERROR - ❌ 生活系统模块导入失败: No module named 'cognitive_arch' ERROR:WebServer:❌ 生活系统模块导入失败: No module named 'cognitive_arch' 2025-08-09 20:09:52,640 - WebServer - WARNING - 生活系统功能将不可用 WARNING:WebServer:生活系统功能将不可用 2025-08-09 20:09:52,640 - WebServer - INFO - ================================================== INFO:WebServer:================================================== 2025-08-09 20:09:52,641 - WebServer - INFO - 正在使用统一配置初始化智能体核心模块... INFO:WebServer:正在使用统一配置初始化智能体核心模块... 2025-08-09 20:09:52,641 - WebServer - INFO - 智能体路径: E:\AI_System\agent INFO:WebServer:智能体路径: E:\AI_System\agent 2025-08-09 20:09:52,641 - WebServer - INFO - 模型缓存目录: E:\AI_System\model_cache INFO:WebServer:模型缓存目录: E:\AI_System\model_cache 2025-08-09 20:09:52,641 - WebServer - INFO - 默认模型: minimal-model INFO:WebServer:默认模型: minimal-model 2025-08-09 20:09:52,641 - WebServer - INFO - 使用GPU: False INFO:WebServer:使用GPU: False --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u274c' in position 46: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 159, in <module> logger.error(f"❌ 智能体模块导入失败: {str(ie)}") Message: "❌ 智能体模块导入失败: No module named 'core.environment_interface'" Arguments: () 2025-08-09 20:09:52,650 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.environment_interface' ERROR:WebServer:❌ 智能体模块导入失败: No module named 'core.environment_interface' --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u23f3' in position 45: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 161, in <module> logger.info(f"⏳ {retry_delay}秒后重试...") Message: '⏳ 2秒后重试...' Arguments: () 2025-08-09 20:09:52,653 - WebServer - INFO - ⏳ 2秒后重试... INFO:WebServer:⏳ 2秒后重试... --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u274c' in position 46: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 159, in <module> logger.error(f"❌ 智能体模块导入失败: {str(ie)}") Message: "❌ 智能体模块导入失败: No module named 'core.environment_interface'" Arguments: () 2025-08-09 20:09:54,671 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.environment_interface' ERROR:WebServer:❌ 智能体模块导入失败: No module named 'core.environment_interface' --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u23f3' in position 45: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 161, in <module> logger.info(f"⏳ {retry_delay}秒后重试...") Message: '⏳ 2秒后重试...' Arguments: () 2025-08-09 20:09:54,677 - WebServer - INFO - ⏳ 2秒后重试... INFO:WebServer:⏳ 2秒后重试... --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u274c' in position 46: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 159, in <module> logger.error(f"❌ 智能体模块导入失败: {str(ie)}") Message: "❌ 智能体模块导入失败: No module named 'core.environment_interface'" Arguments: () 2025-08-09 20:09:56,691 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.environment_interface' ERROR:WebServer:❌ 智能体模块导入失败: No module named 'core.environment_interface' --- Logging error --- Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 148, in <module> from agent.autonomous_agent import AutonomousAgent File "E:\AI_System\agent\__init__.py", line 5, in <module> from .autonomous_agent import AutonomousAgent # 修改此处 File "E:\AI_System\agent\autonomous_agent.py", line 10, in <module> from core.environment_interface import EnvironmentInterface ModuleNotFoundError: No module named 'core.environment_interface' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\u203c' in position 49: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 164, in <module> logger.critical("‼️ 智能体模块导入彻底失败") Message: '‼️ 智能体模块导入彻底失败' Arguments: () 2025-08-09 20:09:56,697 - WebServer - CRITICAL - ‼️ 智能体模块导入彻底失败 CRITICAL:WebServer:‼️ 智能体模块导入彻底失败 2025-08-09 20:09:56,704 - WebServer - INFO - ================================================== INFO:WebServer:================================================== --- Logging error --- Traceback (most recent call last): File "E:\Python310\lib\logging\__init__.py", line 1103, in emit stream.write(msg + self.terminator) UnicodeEncodeError: 'gbk' codec can't encode character '\U0001f680' in position 45: illegal multibyte sequence Call stack: File "E:\AI_System\web_ui\server.py", line 734, in <module> logger.info(f"🚀 启动Web服务: http://{config.HOST}:{config.PORT}") Message: '🚀 启动Web服务: http://0.0.0.0:8000' Arguments: () 2025-08-09 20:09:56,704 - WebServer - INFO - 🚀 启动Web服务: http://0.0.0.0:8000 INFO:WebServer:🚀 启动Web服务: http://0.0.0.0:8000 Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 735, in <module> logger.info(f"环境: {config.ENV}") AttributeError: 'TempConfig' object has no attribute 'ENV'

<think>我们遇到了两个主要问题: 1. 模块导入失败:无法导入`config.config_loader`,`cognitive_arch.life_scheduler`,`core.environment_interface`等模块。 2. 编码问题:日志输出时,控制台编码(gbk)无法处理某些Unicode字符(如❌、⏳等)。 解决方案: 问题1:模块导入失败 原因:Python在搜索模块时,没有找到相应的模块。可能是因为: a) 模块文件不存在 b) 模块所在的目录没有添加到sys.path c) 模块文件存在,但其中没有相应的类或函数 我们已经尝试将项目根目录和几个子目录添加到sys.path,但仍然失败。因此,我们需要检查: - 项目目录结构是否正确 - 模块文件是否存在且命名正确 - 模块中的类名和函数名是否正确 根据错误信息,我们缺少以下模块: config.config_loader cognitive_arch.life_scheduler core.environment_interface 我们需要确认这些模块文件是否存在。 问题2:日志输出编码问题 原因:Windows控制台默认使用gbk编码,而日志中使用了gbk无法编码的Unicode字符(如❌、⏳等)。 解决方案:我们可以将控制台日志的编码设置为utf-8,或者避免使用这些字符。 由于在Windows控制台设置utf-8编码有时会有问题,我们可以考虑替换这些字符为纯文本,或者捕获编码错误并忽略。 具体步骤: 1. 模块导入问题: a) 检查项目目录结构,确保存在以下目录和模块文件: E:\AI_System\config\config_loader.py E:\AI_System\cognitive_arch\life_scheduler.py E:\AI_System\core\environment_interface.py b) 如果存在,检查这些模块中是否有相应的类或函数(例如,config_loader.py中是否有config对象?) c) 如果不存在,我们需要创建这些模块或调整代码。 d) 在server.py中,我们已经将项目根目录添加到sys.path,但是子目录(如config, cognitive_arch, core)可能没有被正确添加。我们可以尝试添加这些子目录的绝对路径。 2. 日志编码问题: 我们可以修改控制台日志处理器,使用一个可以处理Unicode的流。或者,我们可以替换掉这些特殊字符。 这里我们采用一个简单的方法:替换特殊字符为纯文本。 例如:❌ -> [X], ⏳ -> [...], ‼️ -> [!!], 🚀 -> [Rocket] 另外,我们也可以设置控制台编码为utf-8,但考虑到Windows控制台的兼容性,我们选择替换字符。 修改代码: 针对问题1,我们调整sys.path的添加,确保每个子模块的父目录都在sys.path中。 针对问题2,我们创建一个过滤器或者修改日志消息中的特殊字符。 但是,考虑到时间,我们先尝试修复模块导入问题,因为这是功能性的。然后,我们再解决日志编码问题。 由于错误信息中显示“config” is not a package,可能是因为在项目目录中已经有一个名为`config.py`的文件,而我们要导入的包(目录)也是`config`,这会导致冲突。所以,我们检查项目目录下是否有`config.py`文件,如果有,将其重命名为其他名称(如`config_module.py`)以避免冲突。 另外,关于`core.environment_interface`模块不存在,可能是路径问题。我们已经将`E:\AI_System\core`添加到sys.path,所以应该可以导入`environment_interface`模块。检查该文件是否存在。 由于错误信息中显示多次尝试导入智能体模块失败,我们需要检查`agent/autonomous_agent.py`中的导入语句: 在autonomous_agent.py中:`from core.environment_interface import EnvironmentInterface` 这意味着`core`包应该在sys.path中,而我们已经将`E:\AI_System\core`添加到sys.path,所以理论上可以导入。如果文件确实存在,那么可能是大小写问题(Windows不区分大小写,但Linux会)?或者文件扩展名不是.py? 由于错误信息中显示`ModuleNotFoundError: No module named 'core.environment_interface'`,所以可能是: - `core`目录下没有`environment_interface.py`文件 - `core`目录不在Python路径中 我们已经在server.py中添加了: sys.path.insert(0, os.path.join(base_dir, 'core')) 所以,请检查`E:\AI_System\core`目录下是否有`environment_interface.py`文件。 如果以上文件都不存在,我们需要创建这些文件或从其他地方获取。 考虑到时间,我们暂时在server.py中注释掉导入失败的部分,并提供一个备用方案,确保服务器至少可以启动。 但是,由于智能体初始化失败,整个系统将无法正常工作,所以我们需要优先解决模块导入问题。 由于我们无法直接创建缺失的模块,这里提供一种临时方案:如果模块不存在,则创建空模块并定义必要的类和方法(仅用于测试)。 然而,这只是一个临时解决方案,长期需要完善这些模块。 由于错误信息太多,我们决定先解决日志编码问题,以便能够正确显示错误信息。 修改日志控制台处理器,使用能够处理Unicode的编码(如utf-8)来替换默认的stderr编码。 我们可以这样修改: console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) # 设置控制台处理器的编码为utf-8 try: console_handler.setStream(open(sys.stdout.fileno(), 'w', encoding='utf-8', errors='replace')) except Exception as e: logger.error(f"无法设置控制台编码为utf-8: {e}") console_handler = logging.StreamHandler() # 回退到默认 但是,这种方法可能不适用于所有环境。另一种方法是避免使用特殊字符。 我们选择替换特殊字符: 将❌替换为[ERROR],⏳替换为[WAIT],‼️替换为[CRITICAL],🚀替换为[LAUNCH] 修改server.py中的日志字符串。 但是,为了快速解决问题,我们直接修改日志字符串,去掉特殊字符。 具体修改如下: # 将原来的错误日志中的特殊字符替换 # 例如: # logger.error(f"❌ 配置模块导入失败: {str(e)}") 改为 # logger.error(f"[ERROR] 配置模块导入失败: {str(e)}") 同时,我们添加更详细的路径打印,帮助调试模块导入问题。 另外,在导入模块之前,打印sys.path,以便查看路径设置是否正确。 我们重新组织server.py的启动部分,并添加详细的调试信息。 由于错误信息中提示`config` is not a package,可能是因为在项目根目录存在一个config.py文件,而我们又创建了一个config目录,这会导致Python无法正确识别包。所以,我们需要确保项目根目录下没有config.py文件(如果有,将其重命名或删除)。 现在,我们修改server.py: 步骤: 1. 去掉所有特殊字符的日志。 2. 在添加路径后打印sys.path。 3. 尝试导入模块时,打印模块的完整路径,看是否存在。 由于代码较长,我们只修改关键部分。 但是,由于错误信息中还有AttributeError: 'TempConfig' object has no attribute 'ENV',我们需要在TempConfig中添加ENV属性。 修改后的TempConfig: class TempConfig: # ... 其他属性 ENV = 'development' # 添加ENV属性 下面我们给出修改后的server.py的配置加载和初始化部分。 由于整个文件太长,我们只给出修改的关键部分。 注意:以下修改仅用于调试和修复启动问题。</think>您的错误日志显示了多个关键问题,我将逐一分析并提供解决方案: ### 问题分析 1. **模块导入失败**: - `No module named 'config.config_loader'` - `No module named 'cognitive_arch'` - `No module named 'core.environment_interface'` 2. **编码问题**: - `UnicodeEncodeError: 'gbk' codec can't encode character` - 日志中的Unicode字符(❌、⏳等)导致Windows控制台编码问题 3. **配置问题**: - `AttributeError: 'TempConfig' object has no attribute 'ENV'` ### 完整解决方案 ```python # E:\AI_System\web_ui\server.py import sys import os import time import logging import json import traceback import threading from flask import Flask, jsonify, request, render_template, send_from_directory, Response # ========== 解决路径问题 ========== base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, base_dir) # 添加所有必要的子目录 sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch'] for sub_dir in sub_dirs: full_path = os.path.join(base_dir, sub_dir) sys.path.insert(0, full_path) logger.info(f"添加路径: {full_path}") # 临时添加日志,调试用 # ========== 解决编码问题 ========== def safe_log(message): """安全处理日志中的Unicode字符""" try: return message.encode('utf-8', 'ignore').decode('utf-8') except: return message.replace('❌', '[ERROR]').replace('⏳', '[WAIT]').replace('‼️', '[CRITICAL]').replace('🚀', '[LAUNCH]') # ========== 配置加载 ========== class TempConfig: """临时配置类,确保所有必要属性都存在""" AGENT_PATH = os.path.join(base_dir, 'agent') WEB_UI_PATH = os.path.join(base_dir, 'web_ui') MODEL_CACHE_DIR = os.path.join(base_dir, 'model_cache') LOG_DIR = os.path.join(base_dir, 'logs') HOST = '0.0.0.0' PORT = 8000 SECRET_KEY = 'temp-secret-key' LOG_LEVEL = 'INFO' USE_GPU = False DEFAULT_MODEL = 'minimal-model' ENV = 'development' # 添加ENV属性 LIFE_SCHEDULE = { # 添加生活系统配置 "wake_up": "08:00", "breakfast": "08:30", "lunch": "12:30", "dinner": "19:00", "sleep": "23:00" } # 尝试导入配置 try: from config.config_loader import config logger.info("使用config_loader中的配置") except ImportError as e: logger.warning(f"配置模块导入失败, 使用临时配置: {str(e)}") config = TempConfig() # ========== 配置Flask应用 ========== app = Flask(__name__, template_folder='templates', static_folder='static') # 配置日志 logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.INFO)) # 日志格式 - 使用安全的Unicode处理 log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 文件日志处理器 try: from logging.handlers import TimedRotatingFileHandler file_handler = TimedRotatingFileHandler( os.path.join(config.LOG_DIR, 'web_server.log'), when='midnight', backupCount=30, encoding='utf-8' # 确保文件使用UTF-8编码 ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) except Exception as e: logger.warning(f"文件日志处理器创建失败: {str(e)}") # 控制台日志处理器 - 安全处理Unicode console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 重写logger方法以处理Unicode original_info = logger.info original_error = logger.error original_warning = logger.warning original_critical = logger.critical def safe_logger(original_func): def wrapper(msg, *args, **kwargs): try: original_func(safe_log(msg), *args, **kwargs) except UnicodeEncodeError: # 如果仍然失败,使用纯ASCII消息 ascii_msg = msg.encode('ascii', 'ignore').decode('ascii') original_func(ascii_msg, *args, **kwargs) return wrapper logger.info = safe_logger(original_info) logger.error = safe_logger(original_error) logger.warning = safe_logger(original_warning) logger.critical = safe_logger(original_critical) # ========== 模块导入安全封装 ========== def safe_import(module_name, class_name=None): """安全导入模块和类,提供详细错误信息""" try: module = __import__(module_name) if class_name: return getattr(module, class_name) return module except ImportError as e: logger.error(f"模块导入失败: {module_name}") logger.error(f"错误详情: {str(e)}") logger.error(f"搜索路径: {sys.path}") logger.error(f"当前工作目录: {os.getcwd()}") return None except AttributeError as e: logger.error(f"类不存在: {module_name}.{class_name}") return None # ========== 生活系统初始化 ========== life_scheduler = None try: LifeScheduler = safe_import('cognitive_arch.life_scheduler', 'LifeScheduler') if LifeScheduler: life_scheduler = LifeScheduler( daily_schedule=config.LIFE_SCHEDULE, log_dir=config.LOG_DIR ) logger.info("生活系统初始化成功") # 生活状态更新线程 def update_life_status(): """后台线程定期更新生活状态""" while True: try: # 获取当前时间 current_hour = time.localtime().tm_hour # 根据时间执行相应活动 if 8 <= current_hour < 9: life_scheduler.wake_up() elif 12 <= current_hour < 13: life_scheduler.have_meal("lunch") elif 19 <= current_hour < 20: life_scheduler.have_meal("dinner") elif 23 <= current_hour or current_hour < 6: life_scheduler.go_to_sleep() time.sleep(60) # 每分钟更新一次 except Exception as e: logger.error(f"生活系统更新失败: {str(e)}") time.sleep(300) # 出错后等待5分钟再重试 # 启动后台线程 life_thread = threading.Thread(target=update_life_status, daemon=True) life_thread.start() logger.info("生活系统后台线程已启动") else: logger.warning("生活系统初始化失败,功能将不可用") except Exception as e: logger.error(f"生活系统初始化异常: {str(e)}") logger.warning("生活系统功能将不可用") # ========== 智能体初始化 ========== ai_agent = None max_retries = 3 retry_delay = 2 # 秒 for attempt in range(max_retries): try: AutonomousAgent = safe_import('agent.autonomous_agent', 'AutonomousAgent') if AutonomousAgent: ai_agent = AutonomousAgent( model_path=config.AGENT_PATH, cache_dir=config.MODEL_CACHE_DIR, use_gpu=config.USE_GPU, default_model=config.DEFAULT_MODEL ) logger.info("智能体核心模块初始化成功") break else: if attempt < max_retries - 1: logger.info(f"{retry_delay}秒后重试...") time.sleep(retry_delay) else: logger.critical("智能体模块导入彻底失败") ai_agent = None except Exception as e: logger.error(f"智能体初始化失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") if attempt < max_retries - 1: logger.info(f"{retry_delay}秒后重试...") time.sleep(retry_delay) else: logger.critical("智能体初始化彻底失败,部分功能将不可用") ai_agent = None # ========== 路由定义 ========== # 这里添加您的路由定义(与之前相同) # /life, /api/life/status, /adjust_schedule, /, /status, /chat 等 # ... [保持原有的路由定义不变] ... if __name__ == '__main__': start_time = time.time() logger.info("=" * 50) logger.info(f"启动Web服务: http://{config.HOST}:{config.PORT}") logger.info(f"环境: {config.ENV}") logger.info(f"日志级别: {config.LOG_LEVEL}") logger.info(f"生活系统: {'已启用' if life_scheduler else '未启用'}") logger.info(f"智能体: {'已启用' if ai_agent else '未启用'}") logger.info("=" * 50) try: # 使用配置中的主机和端口 app.run(host=config.HOST, port=config.PORT, threaded=True) except Exception as e: logger.critical(f"服务器启动失败: {traceback.format_exc()}") ``` ### 关键修复解释 1. **路径问题修复**: ```python # 添加所有必要的子目录 sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch'] for sub_dir in sub_dirs: full_path = os.path.join(base_dir, sub_dir) sys.path.insert(0, full_path) logger.info(f"添加路径: {full_path}") # 调试信息 ``` 2. **安全导入机制**: ```python def safe_import(module_name, class_name=None): try: module = __import__(module_name) if class_name: return getattr(module, class_name) return module except ImportError as e: # 详细错误日志 logger.error(f"模块导入失败: {module_name}") logger.error(f"错误详情: {str(e)}") logger.error(f"搜索路径: {sys.path}") return None ``` 3. **Unicode安全处理**: ```python def safe_log(message): """安全处理日志中的Unicode字符""" try: return message.encode('utf-8', 'ignore').decode('utf-8') except: return message.replace('❌', '[ERROR]').replace('⏳', '[WAIT]') # 重写logger方法 def safe_logger(original_func): def wrapper(msg, *args, **kwargs): try: original_func(safe_log(msg), *args, **kwargs) except UnicodeEncodeError: ascii_msg = msg.encode('ascii', 'ignore').decode('ascii') original_func(ascii_msg, *args, **kwargs) return wrapper logger.info = safe_logger(original_info) ``` 4. **临时配置增强**: ```python class TempConfig: # 确保包含所有必要的属性 ENV = 'development' # 修复AttributeError LIFE_SCHEDULE = { # 生活系统配置 "wake_up": "08:00", "breakfast": "08:30", "lunch": "12:30", "dinner": "19:00", "sleep": "23:00" } ``` 5. **文件日志编码修复**: ```python file_handler = TimedRotatingFileHandler( os.path.join(config.LOG_DIR, 'web_server.log'), encoding='utf-8' # 明确指定UTF-8编码 ) ``` ### 项目结构建议 确保您的项目结构包含以下目录: ``` E:\AI_System\ ├── agent\ │ ├── __init__.py │ └── autonomous_agent.py ├── config\ │ ├── __init__.py │ └── config_loader.py ├── core\ │ ├── __init__.py │ └── environment_interface.py ├── cognitive_arch\ │ ├── __init__.py │ └── life_scheduler.py ├── web_ui\ │ ├── templates\ │ │ └── life_dashboard.html │ └── server.py └── ...其他目录 ``` ### 验证步骤 1. **检查模块文件**: - 确认 `E:\AI_System\config\config_loader.py` 存在 - 确认 `E:\AI_System\cognitive_arch\life_scheduler.py` 存在 - 确认 `E:\AI_System\core\environment_interface.py` 存在 2. **添加__init__.py文件**: 在每个Python包的目录中添加空的 `__init__.py` 文件,使其成为有效的Python包 3. **运行服务器**: ```bash cd E:\AI_System\web_ui python server.py ``` 4. **验证功能**: - 访问 `http://localhost:8000/life` 查看生活仪表盘 - 访问 `http://localhost:8000/` 查看主界面 - 检查日志文件 `E:\AI_System\logs\web_server.log` 是否有错误 ### 注意事项 1. **Windows控制台编码**: 在命令提示符中运行以下命令临时切换到UTF-8编码: ```bash chcp 65001 ``` 2. **长期解决方案**: - 在系统环境变量中添加 `PYTHONUTF8=1` - 或在Python启动脚本中添加: ```python import sys import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') ``` 3. **模块开发建议**: ```python # cognitive_arch/life_scheduler.py 示例 class LifeScheduler: def __init__(self, daily_schedule, log_dir): self.daily_schedule = daily_schedule self.log_dir = log_dir def wake_up(self): print("执行起床流程") # ...其他方法... # core/environment_interface.py 示例 class EnvironmentInterface: def __init__(self): pass def explore(self, path): return {"path": path, "contents": []} ``` 这些修改应该能解决您的模块导入问题、编码问题和配置问题,使系统能够正常启动。
阅读全文

相关推荐

修改问题::Traceback (most recent call last): File "<string>", line 2, in <module> ModuleNotFoundError: No module named 'modules'其它问题:未解析的引用 'core'未解析的引用 'core'未解析的引用 'core'包含 'except ImportError' 的 try 块中的 'global_config' 也应在 except 块中定义在 '__init__.pyi' 中找不到引用 'util'在 '__init__.pyi' 中找不到引用 'util'找不到模块 'Event'找不到模块 'EventType',找不到模块 'event_center'找不到模块 'BaseModule'找不到模块 'GlobalConfig'从外部作用域隐藏名称 'module_name'从外部作用域隐藏名称 'e'从外部作用域隐藏名称 'e'从外部作用域隐藏名称 'e'代码#!/usr/bin/env python3 # -*- coding: utf-8 -*- __all__ = ['InputAnalysisModule'] # 修正导出声明 import sys import re import logging import time import threading import uuid import importlib from pathlib import Path from typing import List, Dict, Callable, Union, Optional # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent.parent)) # 修复导入路径 - 使用直接导入 try: from core import event_center, base_module, global_config from core.event_center import Event, EventType from core.base_module import BaseModule GlobalConfig = global_config.GlobalConfig UI_CONFIG = GlobalConfig.UI_CONFIG except ImportError: # 保持原有动态导入逻辑 try: project_root = Path(__file__).parent.parent for module_name in ['event_center', 'base_module', 'global_config']: spec = importlib.util.spec_from_file_location( f"core.{module_name}", project_root / "core" / f"{module_name}.py" ) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) sys.modules[f"core.{module_name}"] = module from core.event_center import Event, EventType, event_center from core.base_module import BaseModule from core.global_config import GlobalConfig except Exception as e: print(f"导入失败: {e}") sys.exit(1) # 配置日志器 logger = logging.getLogger('DLT_Module') logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( '[%(asctime)s.%(msecs)03d] %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) logger.addHandler(handler) class InputAnalyzer: """输入分析器辅助类""" def __init__(self, module_name="input_analysis"): self.module_id = module_name self.module_name = module_name @staticmethod def analyze(data): return {"status": "success", "data": data} class InputAnalysisModule(BaseModule): MODULE_ID = "input_analysis" def __init__(self): if getattr(self, '_initialized', False): return super().__init__(self.MODULE_ID) self._ui_callback = None self._initialized = True logger.info(f"★ 模块初始化完成 [ID: {self.MODULE_ID}]") self._setup_event_handlers() def register_ui_callback(self, callback: Callable[[Dict], None]): """注册UI回调函数""" self._ui_callback = callback logger.info("✅ UI回调注册成功") def _setup_event_handlers(self): """配置事件处理器""" event_center.subscribe( event_type=str(EventType.MODULE_RUN), handler=self._handle_run_event, token=str(self.module_id) ) event_center.subscribe( event_type="run_analysis", handler=self._handle_run_event, token=str(self.module_id) ) logger.info("✅ 事件订阅完成") @staticmethod def _parse_numbers(input_data: Union[str, List]) -> List[int]: """解析输入数字""" if isinstance(input_data, str): return [int(n) for n in re.split(r'[, \t]+', input_data.strip()) if n] return list(map(int, input_data)) if isinstance(input_data, list) else [] def analyze(self, data: Dict) -> Dict: """执行输入分析""" try: exclude_front = self._parse_numbers(data.get('exclude_front', [])) exclude_back = self._parse_numbers(data.get('exclude_back', [])) logger.info(f"分析中... 排除: 前区{exclude_front} 后区{exclude_back}") return { 'recommended_fronts': sorted(set(range(1, 36)) - set(exclude_front))[:5], 'recommended_backs': sorted(set(range(1, 13)) - set(exclude_back))[:2], 'exclude_front': exclude_front, 'exclude_back': exclude_back } except Exception as e: logger.error("分析失败: %s", str(e), exc_info=True) raise def _handle_run_event(self, event: Event): """处理运行事件""" try: if event.target != self.module_id: logger.warning("目标模块不匹配: %s", event.target) return result = self.analyze(event.data) response = { "event_id": str(event.event_id), "status": "success", "data": { "recommended": { "front": result['recommended_fronts'], "back": result['recommended_backs'] }, "excluded": { "front": result['exclude_front'], "back": result['exclude_back'] } } } self._send_result(event, response) self._notify_ui(response) except Exception as e: error_data = { "event_id": str(getattr(event, 'event_id', 0)), "status": "error", "message": str(e) } self._notify_ui(error_data) logger.error("处理失败: %s", str(e), exc_info=True) def _send_result(self, event: Event, result: Dict): """发送结果事件""" result_event = Event( event_id=str(event.event_id), type=str(EventType.ANALYSIS_RESULT), source=str(self.module_id), target=str(event.source), data={ "recommended_fronts": result['data']['recommended']['front'], "recommended_backs": result['data']['recommended']['back'], "exclude_front": result['data']['excluded']['front'], "exclude_back": result['data']['excluded']['back'] }, token=str(self.module_id) ) event_center.publish(result_event) def _notify_ui(self, data: Dict): """通知UI更新""" if not self._ui_callback: return try: self._ui_callback(data) except Exception as e: logger.error("UI通知失败: %s", str(e)) def submit_data(self, data: str, label: str = "输入分析"): """提交分析数据""" event = Event( event_id=str(uuid.uuid4()), type=str(EventType.DATA_SUBMIT), source=str(self.module_id), target='number_pool', data={"command": "run", "label": label, "data": data}, token=str(self.module_id) ) event_center.publish(event) if __name__ == "__main__": module = InputAnalysisModule() def ui_handler(ui_data): print("\n=== UI更新 ===") print("状态:", ui_data.get("status")) if ui_data.get("status") == "success": print("推荐号码:", ui_data["data"]["recommended"]) else: print("错误:", ui_data.get("message")) print("=" * 30) module.register_ui_callback(ui_handler) test_event = Event( event_id=str(int(time.time())), type=str(EventType.MODULE_RUN), source="TEST", target="input_analysis", data={"exclude_front": "1,2,3", "exclude_back": [1]}, token="test_token" ) event_center.publish(test_event) time.sleep(1)

Microsoft Windows [版本 10.0.22631.2861] (c) Microsoft Corporation。保留所有权利。 C:\Users\Administrator>cd /d E:\AI_System\web_ui E:\AI_System\web_ui>python server.py 2025-08-09 21:59:53,638 - StartupLogger - INFO - 项目根目录: E:\AI_System 2025-08-09 21:59:53,639 - StartupLogger - INFO - 添加路径: E:\AI_System\agent 2025-08-09 21:59:53,639 - StartupLogger - INFO - 添加路径: E:\AI_System\core 2025-08-09 21:59:53,639 - StartupLogger - INFO - 添加路径: E:\AI_System\utils 2025-08-09 21:59:53,639 - StartupLogger - INFO - 添加路径: E:\AI_System\config 2025-08-09 21:59:53,639 - StartupLogger - INFO - 添加路径: E:\AI_System\cognitive_arch 2025-08-09 21:59:53,642 - StartupLogger - ERROR - ❌ 配置模块导入失败: No module named 'config.config_loader'; 'config' is not a package 2025-08-09 21:59:53,642 - StartupLogger - WARNING - ⚠️ 使用临时配置 2025-08-09 21:59:53,642 - StartupLogger - INFO - 请检查以下路径是否存在: 2025-08-09 21:59:53,642 - StartupLogger - INFO - 1. E:\AI_System\config\config_loader.py 2025-08-09 21:59:53,642 - StartupLogger - INFO - 2. E:\AI_System\config\__init__.py 2025-08-09 21:59:53,643 - WebServer - ERROR - ❌ 生活系统初始化失败: LifeScheduler() takes no arguments Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 159, in <module> life_scheduler = LifeScheduler( TypeError: LifeScheduler() takes no arguments 2025-08-09 21:59:53,644 - WebServer - WARNING - ⚠️ 生活系统功能将不可用 2025-08-09 21:59:53,644 - WebServer - INFO - ================================================== 2025-08-09 21:59:53,644 - WebServer - INFO - 正在使用统一配置初始化智能体核心模块... 2025-08-09 21:59:53,644 - WebServer - INFO - 智能体路径: E:\AI_System\agent 2025-08-09 21:59:53,644 - WebServer - INFO - 模型缓存目录: E:\AI_System\model_cache 2025-08-09 21:59:53,644 - WebServer - INFO - 默认模型: minimal-model 2025-08-09 21:59:53,644 - WebServer - INFO - 使用GPU: False 2025-08-09 21:59:53,647 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 21:59:53,647 - WebServer - INFO - ⏳ 2秒后重试... 2025-08-09 21:59:55,662 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 21:59:55,663 - WebServer - INFO - ⏳ 2秒后重试... 2025-08-09 21:59:57,676 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 21:59:57,676 - WebServer - CRITICAL - ‼️ 智能体模块导入彻底失败 2025-08-09 21:59:57,682 - WebServer - INFO - ================================================== 2025-08-09 21:59:57,682 - WebServer - INFO - 🚀 启动Web服务: http://0.0.0.0:8000 2025-08-09 21:59:57,682 - WebServer - INFO - 环境: development 2025-08-09 21:59:57,683 - WebServer - INFO - 日志级别: INFO 2025-08-09 21:59:57,683 - WebServer - INFO - 生活系统: ❌ 未启用 2025-08-09 21:59:57,683 - WebServer - INFO - 智能体: ❌ 未启用 2025-08-09 21:59:57,683 - WebServer - INFO - ================================================== * Serving Flask app 'server' * Debug mode: on WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on all addresses (0.0.0.0) * Running on http://127.0.0.1:8000 * Running on http://192.168.10.146:8000 Press CTRL+C to quit * Restarting with stat 2025-08-09 21:59:57,817 - StartupLogger - INFO - 项目根目录: E:\AI_System 2025-08-09 21:59:57,817 - StartupLogger - INFO - 添加路径: E:\AI_System\agent 2025-08-09 21:59:57,817 - StartupLogger - INFO - 添加路径: E:\AI_System\core 2025-08-09 21:59:57,817 - StartupLogger - INFO - 添加路径: E:\AI_System\utils 2025-08-09 21:59:57,817 - StartupLogger - INFO - 添加路径: E:\AI_System\config 2025-08-09 21:59:57,817 - StartupLogger - INFO - 添加路径: E:\AI_System\cognitive_arch 2025-08-09 21:59:57,821 - StartupLogger - ERROR - ❌ 配置模块导入失败: No module named 'config.config_loader'; 'config' is not a package 2025-08-09 21:59:57,821 - StartupLogger - WARNING - ⚠️ 使用临时配置 2025-08-09 21:59:57,821 - StartupLogger - INFO - 请检查以下路径是否存在: 2025-08-09 21:59:57,821 - StartupLogger - INFO - 1. E:\AI_System\config\config_loader.py 2025-08-09 21:59:57,821 - StartupLogger - INFO - 2. E:\AI_System\config\__init__.py 2025-08-09 21:59:57,822 - WebServer - ERROR - ❌ 生活系统初始化失败: LifeScheduler() takes no arguments Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 159, in <module> life_scheduler = LifeScheduler( TypeError: LifeScheduler() takes no arguments 2025-08-09 21:59:57,823 - WebServer - WARNING - ⚠️ 生活系统功能将不可用 2025-08-09 21:59:57,823 - WebServer - INFO - ================================================== 2025-08-09 21:59:57,823 - WebServer - INFO - 正在使用统一配置初始化智能体核心模块... 2025-08-09 21:59:57,823 - WebServer - INFO - 智能体路径: E:\AI_System\agent 2025-08-09 21:59:57,823 - WebServer - INFO - 模型缓存目录: E:\AI_System\model_cache 2025-08-09 21:59:57,823 - WebServer - INFO - 默认模型: minimal-model 2025-08-09 21:59:57,824 - WebServer - INFO - 使用GPU: False 2025-08-09 21:59:57,825 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 21:59:57,825 - WebServer - INFO - ⏳ 2秒后重试... 2025-08-09 21:59:59,833 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 22:00:00,271 - WebServer - INFO - ⏳ 2秒后重试... 2025-08-09 22:00:03,084 - WebServer - ERROR - ❌ 智能体模块导入失败: No module named 'core.config_loader' 2025-08-09 22:00:03,084 - WebServer - CRITICAL - ‼️ 智能体模块导入彻底失败 2025-08-09 22:00:03,092 - WebServer - INFO - ================================================== 2025-08-09 22:00:03,092 - WebServer - INFO - 🚀 启动Web服务: http://0.0.0.0:8000 2025-08-09 22:00:03,093 - WebServer - INFO - 环境: development 2025-08-09 22:00:03,094 - WebServer - INFO - 日志级别: INFO 2025-08-09 22:00:03,095 - WebServer - INFO - 生活系统: ❌ 未启用 2025-08-09 22:00:03,095 - WebServer - INFO - 智能体: ❌ 未启用 2025-08-09 22:00:03,095 - WebServer - INFO - ================================================== * Debugger is active! * Debugger PIN: 762-269-022

# -*- coding: utf-8 -*- import sys import os import cv2 import numpy as np import time from PyQt5.QtWidgets import ( QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QMessageBox, QLabel, QFileDialog, QToolBar, QComboBox, QStatusBar, QGroupBox, QSlider, QDockWidget, QProgressDialog, QLineEdit, QRadioButton, QGridLayout, QSpinBox ) from PyQt5.QtCore import QRect, Qt, QSettings, QThread, pyqtSignal #sys.path.append("D:\\海康\\MVS\\Development\\Samples\\Python\\wanzheng.py") import ctypes from ctypes import cast, POINTER from datetime import datetime import logging import socket import serial import skimage import platform from mv_import.Cam_eraConstants import * import threading import time import types # 在导入Cam_Operation_class之前添加 import importlib # clothing_inspection.py # 强制重载关键模块 def reload_camera_modules(): modules_to_reload = [ 'MvImport.CamOperation_class', 'MvImport.MvCameraControl_class', 'MvImport.Camera_Params_header', 'MvImport.MvErrorDefine_const' ] for module_name in modules_to_reload: if module_name in sys.modules: print(f"强制重载模块: {module_name}") module = sys.modules[module_name] importlib.reload(module) # 清除缓存 for key in list(sys.modules.keys()): if key.startswith('MvImport'): del sys.modules[key] print("模块重载完成") # 在导入前调用 reload_camera_modules() from mv_import.Cam_Operation_class import CameraOperation import mv_import.Camera_Params_const # 设置Python包环境变量 os.environ["PACKAGE_CONTEXT"] = "main" # 标识当前在包环境中 # 获取当前脚本所在目录 (main目录) current_dir = os.path.dirname(os.path.abspath(__file__)) # 添加main目录到Python路径 if current_dir not in sys.path: sys.path.insert(0, current_dir) # 添加MvImport包路径 mv_import_path = os.path.join(current_dir, "MvImport") if mv_import_path not in sys.path: sys.path.append(mv_import_path) # 强制设置包上下文 import MvImport MvImport.__package__ = "MvImport" # 显式设置包名 # 程序主要逻辑... # 获取当前脚本所在目录 (main目录) current_dir = os.path.dirname(os.path.abspath(__file__)) # 添加main目录到Python路径 if current_dir not in sys.path: sys.path.insert(0, current_dir) # 添加MvImport包路径 mv_import_path = os.path.join(current_dir, "MvImport") if mv_import_path not in sys.path: sys.path.append(mv_import_path) # 加载必要的DLL - 根据实际路径调整 dll_path = os.path.join(current_dir, "..", "dlls", "MvCameraControl.dll") try: ctypes.CDLL(dll_path) except OSError as e: print(f"无法加载相机SDK: {e}") sys.exit(1) class PathFixer: """自动修复模块导入路径的工具类""" def __init__(self): self.original_path = sys.path.copy() self.project_root = self.find_project_root() self.add_project_path() def find_project_root(self): """自动定位项目根目录""" # 获取当前脚本所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 向上查找包含 .git 或 requirements.txt 的目录 root = current_dir while not self.is_project_root(root) and root != os.path.dirname(root): root = os.path.dirname(root) return root def is_project_root(self, path): """检查目录是否包含项目标志文件""" return ( os.path.exists(os.path.join(path, ".git")) or os.path.exists(os.path.join(path, "requirements.txt")) or os.path.exists(os.path.join(path, "README.md")) ) def add_project_path(self): """添加项目路径到sys.path""" if self.project_root not in sys.path: sys.path.insert(0, self.project_root) print(f"已添加项目根目录到路径: {self.project_root}") def restore_path(self): """恢复原始路径设置""" sys.path = self.original_path def setup_path(): """确保项目根目录在模块搜索路径中""" # 获取当前脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) # 添加项目根目录到模块搜索路径 if script_dir not in sys.path: sys.path.insert(0, script_dir) print(f"已添加路径到 sys.path: {script_dir}") # 调试信息 print("\n当前工作目录:", os.getcwd()) print("Python 模块搜索路径:") for path in sys.path: print(f" - {path}") # 在导入其他模块前设置路径 setup_path() class ManagedThread(threading.Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._stop_event = threading.Event() # 设置为非守护线程 self.daemon = False def stop(self): """安全停止线程""" self._stop_event.set() def should_stop(self): """检查是否应该停止""" return self._stop_event.is_set() def worker(): """线程工作函数""" try: while not threading.current_thread().should_stop(): # 模拟工作 time.sleep(1) # 安全输出 sys.stdout.write("Working...\n") except Exception as e: # 避免在关闭时使用stderr pass def main(): # 创建并启动线程 threads = [] for _ in range(3): t = ManagedThread(target=worker) t.start() threads.append(t) try: # 主程序逻辑 time.sleep(5) finally: # 安全停止所有线程 for t in threads: t.stop() for t in threads: t.join(timeout=2.0) # 设置超时避免无限等待 # 确保所有输出完成 sys.stdout.flush() sys.stderr.flush() # 在导入部分添加 from MvImport.CameraParams_header import ( MV_GIGE_DEVICE, MV_USB_DEVICE, MV_GENTL_CAMERALINK_DEVICE, MV_GENTL_CXP_DEVICE, MV_GENTL_XOF_DEVICE ) # 获取当前文件所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # ===== 路径修复 ===== sdk_path = os.path.join(current_dir, "MvImport") if sdk_path not in sys.path: sys.path.append(sdk_path) def fix_sdk_path(): """修复海康SDK的加载路径""" if getattr(sys, 'frozen', False): # 打包模式 base_path = sys._MEIPASS # 添加DLL目录到系统路径 dll_path = os.path.join(base_path, "dlls") os.environ['PATH'] = dll_path + os.pathsep + os.environ['PATH'] try: # 直接加载DLL ctypes.WinDLL(os.path.join(dll_path, "MvCamCtrldll.dll")) ctypes.WinDLL(os.path.join(dll_path, "MvCameraControl.dll")) except OSError as e: logging.error(f"核心DLL加载失败: {e}") sys.exit(1) else: # 开发模式 # 确保SDK路径存在 if sdk_path not in sys.path: sys.path.append(sdk_path) # 添加DLL到系统路径 dll_dir = r"D:\海康\MVS\Runtime\Win64" if dll_dir not in os.environ['PATH']: os.environ['PATH'] = dll_dir + os.pathsep + os.environ['PATH'] # 立即执行路径修复 fix_sdk_path() # ===== 正确导入SDK模块 ===== try: from MvImport.MvCameraControl_class import MvCamera print("成功导入MvCamera类") from MvImport.CameraParams_header import * from MvImport.MvErrorDefine_const import * except ImportError as e: print(f"SDK导入失败: {e}") sys.exit(1) # 配置日志系统 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("cloth_inspection_debug.log"), logging.StreamHandler() ] ) logging.info("布料印花检测系统启动") # 全局变量 current_sample_path = "" detection_history = [] isGrabbing = False isOpen = False obj_cam_operation = None frame_monitor_thread = None sensor_monitor_thread = None sensor_controller = None MV_OK = 0 MV_E_CALLORDER = -2147483647 # ==================== 传感器通讯模块 ==================== class SensorController: def __init__(self): self.connected = False self.running = False self.connection = None def connect(self, config): try: if config['type'] == 'serial': self.connection = serial.Serial( port=config['port'], baudrate=config['baudrate'], timeout=config.get('timeout', 1.0) ) else: self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connection.connect((config['ip'], config['port'])) self.connection.settimeout(config.get('timeout', 1.0)) self.connected = True self.running = True logging.info(f"传感器连接成功: {config}") return True except Exception as e: logging.error(f"传感器连接失败: {str(e)}") return False def disconnect(self): if self.connection: try: self.connection.close() except: pass self.connection = None self.connected = False self.running = False logging.info("传感器已断开") def read_data(self): if not self.connected: return None return { 'tension': np.random.uniform(10.0, 20.0), 'speed': np.random.uniform(1.0, 5.0), 'temperature': np.random.uniform(20.0, 30.0), 'humidity': np.random.uniform(40.0, 60.0) } def wait_for_material(self, delay_seconds=0): if not self.connected: logging.warning("未连接传感器,跳过等待") return False logging.info(f"等待布料到达,延迟 {delay_seconds} 秒") start_time = time.time() while time.time() - start_time < delay_seconds: QThread.msleep(100) if not self.running: return False logging.info("布料已到位,准备拍摄") return True class SensorMonitorThread(QThread): data_updated = pyqtSignal(dict) def __init__(self, sensor_controller): super().__init__() self.sensor_controller = sensor_controller self.running = True def run(self): while self.running: if self.sensor_controller and self.sensor_controller.connected: try: data = self.sensor_controller.read_data() if data: self.data_updated.emit(data) except Exception as e: logging.error(f"传感器数据读取错误: {str(e)}") QThread.msleep(500) def stop(self): self.running = False self.wait(2000) def wait_for_material(self, delay_seconds): return self.sensor_controller.wait_for_material(delay_seconds) # ==================== 相机帧监控线程 ==================== class FrameMonitorThread(QThread): frame_status = pyqtSignal(str) # 用于发送状态消息的信号 def __init__(self, cam_operation): super().__init__() self.cam_operation = cam_operation self.running = True self.frame_count = 0 self.last_time = time.time() def run(self): """监控相机帧状态的主循环""" while self.running: try: if self.cam_operation and self.cam_operation.is_grabbing: # 获取帧统计信息 frame_info = self.get_frame_info() if frame_info: fps = frame_info.get('fps', 0) dropped = frame_info.get('dropped', 0) status = f"FPS: {fps:.1f} | 丢帧: {dropped}" self.frame_status.emit(status) else: self.frame_status.emit("取流中...") else: self.frame_status.emit("相机未取流") except Exception as e: self.frame_status.emit(f"监控错误: {str(e)}") # 每500ms检查一次 QThread.msleep(500) def stop(self): """停止监控线程""" self.running = False self.wait(1000) # 等待线程结束 def calculate_fps(self): """计算当前帧率""" current_time = time.time() elapsed = current_time - self.last_time if elapsed > 0: fps = self.frame_count / elapsed self.frame_count = 0 self.last_time = current_time return fps return 0 def get_frame_info(self): """获取帧信息""" try: # 更新帧计数 self.frame_count += 1 # 返回帧信息 return { 'fps': self.calculate_fps(), 'dropped': 0 # 实际应用中需要从相机获取真实丢帧数 } except Exception as e: logging.error(f"获取帧信息失败: {str(e)}") return None # ==================== 优化后的检测算法 ==================== def enhanced_check_print_quality(sample_image_path, test_image, threshold=0.05, sensor_data=None): if sensor_data: speed_factor = min(1.0 + sensor_data['speed'] * 0.1, 1.5) env_factor = 1.0 + abs(sensor_data['temperature'] - 25) * 0.01 + abs(sensor_data['humidity'] - 50) * 0.005 adjusted_threshold = threshold * speed_factor * env_factor logging.info(f"根据传感器数据调整阈值: 原始={threshold:.4f}, 调整后={adjusted_threshold:.4f}") else: adjusted_threshold = threshold try: sample_img_data = np.fromfile(sample_image_path, dtype=np.uint8) sample_image = cv2.imdecode(sample_img_data, cv2.IMREAD_GRAYSCALE) if sample_image is None: logging.error(f"无法解码样本图像: {sample_image_path}") return None, None, None except Exception as e: logging.exception(f"样本图像读取异常: {str(e)}") return None, None, None if len(test_image.shape) == 3: test_image_gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY) else: test_image_gray = test_image.copy() sample_image = cv2.GaussianBlur(sample_image, (5, 5), 0) test_image_gray = cv2.GaussianBlur(test_image_gray, (5, 5), 0) try: orb = cv2.ORB_create(nfeatures=200) keypoints1, descriptors1 = orb.detectAndCompute(sample_image, None) keypoints2, descriptors2 = orb.detectAndCompute(test_image_gray, None) if descriptors1 is None or descriptors2 is None: logging.warning("无法提取特征描述符,跳过配准") aligned_sample = sample_image else: bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) matches = bf.match(descriptors1, descriptors2) matches = sorted(matches, key=lambda x: x.distance) if len(matches) > 10: src_pts = np.float32([keypoints1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2) dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if H is not None: aligned_sample = cv2.warpPerspective( sample_image, H, (test_image_gray.shape[1], test_image_gray.shape[0]) ) logging.info("图像配准成功,使用配准后样本") else: aligned_sample = sample_image logging.warning("无法计算单应性矩阵,使用原始样本") else: aligned_sample = sample_image logging.warning("特征点匹配不足,跳过图像配准") except Exception as e: logging.error(f"图像配准失败: {str(e)}") aligned_sample = sample_image try: if aligned_sample.shape != test_image_gray.shape: test_image_gray = cv2.resize(test_image_gray, (aligned_sample.shape[1], aligned_sample.shape[0])) except Exception as e: logging.error(f"图像调整大小失败: {str(e)}") return None, None, None try: from skimage.metrics import structural_similarity as compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True, data_range=255 ) except ImportError: from skimage.measure import compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True ) except Exception as e: logging.error(f"SSIM计算失败: {str(e)}") abs_diff = cv2.absdiff(aligned_sample, test_image_gray) ssim_diff = abs_diff.astype(np.float32) / 255.0 ssim_score = 1.0 - np.mean(ssim_diff) ssim_diff = (1 - ssim_diff) * 255 abs_diff = cv2.absdiff(aligned_sample, test_image_gray) combined_diff = cv2.addWeighted(ssim_diff.astype(np.uint8), 0.7, abs_diff, 0.3, 0) _, thresholded = cv2.threshold(combined_diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((3, 3), np.uint8) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, kernel) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_CLOSE, kernel) diff_pixels = np.count_nonzero(thresholded) total_pixels = aligned_sample.size diff_ratio = diff_pixels / total_pixels is_qualified = diff_ratio <= adjusted_threshold marked_image = cv2.cvtColor(test_image_gray, cv2.COLOR_GRAY2BGR) marked_image[thresholded == 255] = [0, 0, 255] labels = skimage.measure.label(thresholded) properties = skimage.measure.regionprops(labels) for prop in properties: if prop.area > 50: y, x = prop.centroid cv2.putText(marked_image, f"Defect", (int(x), int(y)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) return is_qualified, diff_ratio, marked_image # ==================== 传感器控制的质量检测流程 ==================== def sensor_controlled_check(): global isGrabbing, obj_cam_operation, current_sample_path, detection_history, sensor_controller logging.info("质量检测启动") sensor_data = None if sensor_controller and sensor_controller.connected: sensor_data = sensor_controller.read_data() if not sensor_data: QMessageBox.warning(mainWindow, "传感器警告", "无法读取传感器数据,将使用默认参数", QMessageBox.Ok) else: logging.info("未连接传感器,使用默认参数检测") check_print_with_sensor(sensor_data) def check_print_with_sensor(sensor_data=None): global isGrabbing, obj_cam_operation, current_sample_path, detection_history logging.info("检测印花质量按钮按下") if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return if not obj_cam_operation: QMessageBox.warning(mainWindow, "错误", "相机未正确初始化!", QMessageBox.Ok) return if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return progress = QProgressDialog("正在检测...", "取消", 0, 100, mainWindow) progress.setWindowModality(Qt.WindowModal) progress.setValue(10) try: test_image = obj_cam_operation.get_current_frame() progress.setValue(30) if test_image is None: QMessageBox.warning(mainWindow, "错误", "无法获取当前帧图像!", QMessageBox.Ok) return diff_threshold = mainWindow.sliderDiffThreshold.value() / 100.0 logging.info(f"使用差异度阈值: {diff_threshold}") progress.setValue(50) is_qualified, diff_ratio, marked_image = enhanced_check_print_quality( current_sample_path, test_image, threshold=diff_threshold, sensor_data=sensor_data ) progress.setValue(70) if is_qualified is None: QMessageBox.critical(mainWindow, "检测错误", "检测失败,请检查日志", QMessageBox.Ok) return logging.info(f"检测结果: 合格={is_qualified}, 差异={diff_ratio}") progress.setValue(90) update_diff_display(diff_ratio, is_qualified) result_text = f"印花是否合格: {'合格' if is_qualified else '不合格'}\n差异占比: {diff_ratio*100:.2f}%\n阈值: {diff_threshold*100:.2f}%" QMessageBox.information(mainWindow, "检测结果", result_text, QMessageBox.Ok) if marked_image is not None: cv2.imshow("缺陷标记结果", marked_image) cv2.waitKey(0) cv2.destroyAllWindows() detection_result = { 'timestamp': datetime.now(), 'qualified': is_qualified, 'diff_ratio': diff_ratio, 'threshold': diff_threshold, 'sensor_data': sensor_data if sensor_data else {} } detection_history.append(detection_result) update_history_display() progress.setValue(100) except Exception as e: logging.exception("印花检测失败") QMessageBox.critical(mainWindow, "检测错误", f"检测过程中发生错误: {str(e)}", QMessageBox.Ok) finally: progress.close() def update_diff_display(diff_ratio, is_qualified): mainWindow.lblCurrentDiff.setText(f"当前差异度: {diff_ratio*100:.2f}%") if is_qualified: mainWindow.lblDiffStatus.setText("状态: 合格") mainWindow.lblDiffStatus.setStyleSheet("color: green; font-size: 12px;") else: mainWindow.lblDiffStatus.setText("状态: 不合格") mainWindow.lblDiffStatus.setStyleSheet("color: red; font-size: 12px;") def update_diff_threshold(value): mainWindow.lblDiffValue.setText(f"{value}%") def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 检查是否有可用帧 if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() format_mapping = {".bmp": "bmp", ".png": "png", ".jpg": "jpg", ".jpeg": "jpg"} save_format = format_mapping.get(file_extension) if not save_format: QMessageBox.warning(mainWindow, "错误", "不支持的文件格式!", QMessageBox.Ok) return directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return try: ret = obj_cam_operation.save_image(file_path, save_format) if ret != MV_OK: strError = f"保存样本图像失败: {hex(ret)}" QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) else: QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) except Exception as e: QMessageBox.critical(mainWindow, "异常错误", f"保存图像时发生错误: {str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return try: img_data = np.fromfile(current_sample_path, dtype=np.uint8) sample_img = cv2.imdecode(img_data, cv2.IMREAD_COLOR) if sample_img is None: raise Exception("无法加载图像") cv2.imshow("标准样本预览", sample_img) cv2.waitKey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def update_sample_display(): global current_sample_path if current_sample_path: mainWindow.lblSamplePath.setText(f"当前样本: {os.path.basename(current_sample_path)}") mainWindow.lblSamplePath.setToolTip(current_sample_path) mainWindow.bnPreviewSample.setEnabled(True) else: mainWindow.lblSamplePath.setText("当前样本: 未设置样本") mainWindow.bnPreviewSample.setEnabled(False) def update_history_display(): global detection_history mainWindow.cbHistory.clear() for i, result in enumerate(detection_history[-10:]): timestamp = result['timestamp'].strftime("%H:%M:%S") status = "合格" if result['qualified'] else "不合格" ratio = f"{result['diff_ratio']*100:.2f}%" mainWindow.cbHistory.addItem(f"[极客{timestamp}] {status} - 差异: {ratio}") def TxtWrapBy(start_str, end, all): start = all.find(start_str) if start >= 0: start += len(start_str) end = all.find(end, start) if end >= 0: return all[start:end].strip() def ToHexStr(num): if not isinstance(num, int): try: num = int(num) except: return f"<非整数:{type(num)}>" chaDic = {10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f'} hexStr = "" if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hexStr = chaDic.get(digit, str(digit)) + hexStr num //= 16 hexStr = chaDic.get(num, str(num)) + hexStr return "0x" + hexStr def decoding_char(c_ubyte_value): c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p) try: decode_str = c_char_p_value.value.decode('gbk') except UnicodeDecodeError: decode_str = str(c_char_p_value.value) return decode_str def enum_devices(): global deviceList, obj_cam_operation n_layer_type = ( MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE ) # 创建设备列表 deviceList = MV_CC_DEVICE_INFO_LIST() # 枚举设备 ret = MvCamera.MV_CC_EnumDevices(n_layer_type, deviceList) if ret != MV_OK: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) QMessageBox.warning(mainWindow, "错误", error_msg, QMessageBox.Ok) return ret if deviceList.nDeviceNum == 0: QMessageBox.warning(mainWindow, "提示", "未找到任何设备", QMessageBox.Ok) return MV_OK logging.info(f"找到 {deviceList.nDeviceNum} 个设备") # 处理设备信息 devList = [] for i in range(deviceList.nDeviceNum): # 获取设备信息 mvcc_dev_info = ctypes.cast( deviceList.pDeviceInfo[i], ctypes.POINTER(MV_CC_DEVICE_INFO) ).contents # 根据设备类型提取信息 if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: st_gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo ip_addr = ( f"{(st_gige_info.nCurrentIp >> 24) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 16) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 8) & 0xFF}." f"{st_gige_info.nCurrentIp & 0xFF}" ) # 修复:将c_ubyte_Array_16转换为字节串再解码 user_defined_bytes = bytes(st_gige_info.chUserDefinedName) dev_name = f"GigE: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} ({ip_addr})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: st_usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo serial = bytes(st_usb_info.chSerialNumber).decode('ascii', 'ignore').rstrip('\x00') # 修复:同样处理用户自定义名称 user_defined_bytes = bytes(st_usb_info.chUserDefinedName) dev_name = f"USB: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} (SN: {serial})") else: devList.append(f"[{i}] 未知设备类型: {mvcc_dev_info.nTLayerType}") # 更新UI mainWindow.ComboDevices.clear() mainWindow.ComboDevices.addItems(devList) if devList: mainWindow.ComboDevices.setCurrentIndex(0) mainWindow.statusBar().showMessage(f"找到 {deviceList.nDeviceNum} 个设备", 3000) return MV_OK # clothing_inspection.py def initialize_camera(): from MvImport.CamOperation_class import CameraOperation # 创建相机操作实例 cam_operator = CameraOperation() # 打开设备 - 添加错误处理 try: if cam_operator.open_device(): print("相机设备已成功打开") else: print("无法打开相机设备") except AttributeError as e: print(f"方法调用错误: {e}") # 检查类实现 if not hasattr(CameraOperation, 'open_device'): print("错误: CameraOperation 类缺少 open_device 方法") # 动态添加方法作为临时解决方案 from types import MethodType def temp_open_device(self): print("警告: 使用临时open_device实现") return False CameraOperation.open_device = MethodType(temp_open_device, CameraOperation) cam_operator.open_device() # ===== 关键改进:相机操作函数 ===== def open_device(): global deviceList, nSelCamIndex, obj_cam_operation, isOpen, frame_monitor_thread, mainWindow if isOpen: QMessageBox.warning(mainWindow, "Error", '相机已打开!', QMessageBox.Ok) return MV_E_CALLORDER nSelCamIndex = mainWindow.ComboDevices.currentIndex() if nSelCamIndex < 0: QMessageBox.warning(mainWindow, "Error", '请选择相机!', QMessageBox.Ok) return MV_E_CALLORDER # 创建相机控制对象 cam = MvCamera() # 初始化相机操作对象 - 确保传入有效的相机对象 obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) # 调试信息 logging.debug(f"CameraOperation类方法: {dir(CameraOperation)}") logging.debug(f"obj_cam_operation实例方法: {dir(obj_cam_operation)}") # 如果实例缺少方法,但类有该方法,手动绑定 if hasattr(CameraOperation, 'open_device') and not hasattr(obj_cam_operation, 'open_device'): logging.warning("实例缺少open_device方法,手动绑定...") obj_cam_operation.Open_device = types.MethodType(CameraOperation.Open_device, obj_cam_operation) # 确保设备索引有效 if nSelCamIndex >= deviceList.nDeviceNum: QMessageBox.warning(mainWindow, "Error", f"无效的设备索引: {nSelCamIndex}", QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.Open_device( device_index=nSelCamIndex, access_mode=0 # 使用默认值 ) #ret = obj_cam_operation.Open_device(nSelCamIndex) #ret = obj_cam_operation.Open_device(device_index=nSelCamIndex) except AttributeError as e: logging.error(f"调用open_device失败: {str(e)}") QMessageBox.critical(mainWindow, "严重错误", f"相机操作对象缺少必要方法: {str(e)}", QMessageBox.Ok) return MV_E_CALLORDER if ret != 0: strError = f"打开设备失败 ret: {ToHexStr(ret)}" logging.error(strError) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) isOpen = False else: # 成功打开设备后的操作 logging.info("设备打开成功") set_continue_mode() get_param() isOpen = True enable_controls() # 创建并启动帧监控线程 frame_monitor_thread = FrameMonitorThread(obj_cam_operation) frame_monitor_thread.frame_status.connect(mainWindow.statusBar().showMessage) frame_monitor_thread.start() logging.info("帧监控线程已启动") obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) logging.info(f"CameraOperation 版本: {obj_cam_operation.VERSION}") return ret def start_grabbing(): global obj_cam_operation, isGrabbing # 关键改进:添加相机状态检查 if not obj_cam_operation or not hasattr(obj_cam_operation, 'cam') or not obj_cam_operation.cam: QMessageBox.warning(mainWindow, "Error", "相机对象未正确初始化", QMessageBox.Ok) return ret = obj_cam_operation.Start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) def stop_grabbing(): global obj_cam_operation, isGrabbing # 关键改进:添加相机状态检查 if not obj_cam_operation or not hasattr(obj_cam_operation, 'cam') or not obj_cam_operation.cam: QMessageBox.warning(mainWindow, "Error", "相机对象未正确初始化", QMessageBox.Ok) return # 关键改进:添加连接状态检查 if not hasattr(obj_cam_operation, 'connected') or not obj_cam_operation.connected: QMessageBox.warning(mainWindow, "Error", "相机未连接", QMessageBox.Ok) return ret = obj_cam_operation.Stop_grabbing() if ret != 0: strError = "停止取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = False enable_controls() def close_device(): global isOpen, isGrabbing, obj_cam_operation, frame_monitor_thread if frame_monitor_thread and frame_monitor_thread.isRunning(): frame_monitor_thread.stop() frame_monitor_thread.wait(2000) if isOpen and obj_cam_operation: # 关键改进:确保相机对象存在 if hasattr(obj_cam_operation, 'cam') and obj_cam_operation.cam: obj_cam_operation.Close_device() isOpen = False isGrabbing = False enable_controls() def set_continue_mode(): # 关键改进:添加相机状态检查 if not obj_cam_operation or not hasattr(obj_cam_operation, 'cam') or not obj_cam_operation.cam: return ret = obj_cam_operation.Set_trigger_mode(False) if ret != 0: strError = "设置连续模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(True) mainWindow.radioTriggerMode.setChecked(False) mainWindow.bnSoftwareTrigger.setEnabled(False) def set_software_trigger_mode(): # 关键改进:添加相机状态检查 if not obj_cam_operation or not hasattr(obj_cam_operation, 'cam') or not obj_cam_operation.cam: return ret = obj_cam_operation.Set_trigger_mode(True) if ret != 0: strError = "设置触发模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(False) mainWindow.radioTriggerMode.setChecked(True) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing) def trigger_once(): # 关键改进:添加相机状态检查 if not obj_cam_operation or not hasattr(obj_cam_operation, 'cam') or not obj_cam_operation.cam: return ret = obj_cam_operation.Trigger_once() if ret != 0: strError = "软触发失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 尝试捕获当前帧 frame = obj_cam_operation.capture_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return # 确保图像有效 if frame.size == 0 or frame.shape[0] == 0 or frame.shape[1] == 0: QMessageBox.warning(mainWindow, "无效图像", "捕获的图像无效,请检查相机设置!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() # 创建目录(如果不存在) directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return # 保存图像 try: # 使用OpenCV保存图像 if not cv2.imwrite(file_path, frame): raise Exception("OpenCV保存失败") # 更新状态 current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) # 显示成功消息 QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) # 可选:自动预览样本 preview_sample() except Exception as e: logging.error(f"保存图像失败: {str(e)}") QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return try: # 直接使用OpenCV加载图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: raise Exception("无法加载图像") # 显示图像 cv2.imshow("标准样本预览", sample_img) cv2.waitKey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def start_grabbing(): global obj_cam_operation, isGrabbing ret = obj_cam_operation.Start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) def is_float(str): try: float(str) return True except ValueError: return False def get_param(): try: ret = obj_cam_operation.get_parameters() if ret != MV_OK: strError = "获取参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) else: mainWindow.edtExposureTime.setText("{0:.2f}".format(obj_cam_operation.exposure_time)) mainWindow.edtGain.setText("{0:.2f}".format(obj_cam_operation.gain)) mainWindow.edtFrameRate.setText("{0:.2f}".format(obj_cam_operation.frame_rate)) except Exception as e: error_msg = f"获取参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def set_param(): frame_rate = mainWindow.edtFrameRate.text() exposure = mainWindow.edtExposureTime.text() gain = mainWindow.edtGain.text() if not (is_float(frame_rate) and is_float(exposure) and is_float(gain)): strError = "设置参数失败: 参数必须是有效的浮点数" QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.set_param( frame_rate=float(frame_rate), exposure_time=float(exposure), gain=float(gain) ) if ret != MV_OK: strError = "设置参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) except Exception as e: error_msg = f"设置参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def enable_controls(): global isGrabbing, isOpen mainWindow.groupGrab.setEnabled(isOpen) mainWindow.paramgroup.setEnabled(isOpen) mainWindow.bnOpen.setEnabled(not isOpen) mainWindow.bnClose.setEnabled(isOpen) mainWindow.bnStart.setEnabled(isOpen and (not isGrabbing)) mainWindow.bnStop.setEnabled(isOpen and isGrabbing) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing and mainWindow.radioTriggerMode.isChecked()) mainWindow.bnSaveImage.setEnabled(isOpen and isGrabbing) mainWindow.bnCheckPrint.setEnabled(isOpen and isGrabbing) mainWindow.bnSaveSample.setEnabled(isOpen and isGrabbing) mainWindow.bnPreviewSample.setEnabled(bool(current_sample_path)) def update_sensor_display(data): if not data: return text = (f"张力: {data['tension']:.2f}N | " f"速度: {data['speed']:.2f}m/s | " f"温度: {data['temperature']:.1f}°C | " f"湿度: {data['humidity']:.1f}%") mainWindow.lblSensorData.setText(text) def connect_sensor(): global sensor_monitor_thread, sensor_controller sensor_type = mainWindow.cbSensorType.currentText() if sensor_controller is None: sensor_controller = SensorController() if sensor_type == "串口": config = { 'type': 'serial', 'port': mainWindow.cbComPort.currentText(), 'baudrate': int(mainWindow.cbBaudrate.currentText()), 'timeout': 1.0 } else: config = { 'type': 'ethernet', 'ip': mainWindow.edtIP.text(), 'port': int(mainWindow.edtPort.text()), 'timeout': 1.0 } if sensor_controller.connect(config): mainWindow.bnConnectSensor.setEnabled(False) mainWindow.bnDisconnectSensor.setEnabled(True) sensor_monitor_thread = SensorMonitorThread(sensor_controller) sensor_monitor_thread.data_updated.connect(update_sensor_display) sensor_monitor_thread.start() def disconnect_sensor(): global sensor_monitor_thread if sensor_controller: sensor_controller.disconnect() mainWindow.bnConnectSensor.setEnabled(True) mainWindow.bnDisconnectSensor.setEnabled(False) if sensor_monitor_thread and sensor_monitor_thread.isRunning(): sensor_monitor_thread.stop() sensor_monitor_thread.wait(2000) sensor_monitor_thread = None mainWindow.lblSensorData.setText("传感器数据: 未连接") def update_sensor_ui(index): mainWindow.serialGroup.setVisible(index == 0) mainWindow.ethernetGroup.setVisible(index == 1) class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("布料印花检测系统") self.resize(1200, 800) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 设备枚举区域 device_layout = QHBoxLayout() self.ComboDevices = QComboBox() self.bnEnum = QPushButton("枚举设备") self.bnOpen = QPushButton("打开设备") self.bnClose = QPushButton("关闭设备") device_layout.addWidget(self.ComboDevices) device_layout.addWidget(self.bnEnum) device_layout.addWidget(self.bnOpen) device_layout.addWidget(self.bnClose) main_layout.addLayout(device_layout) # 取流控制组 self.groupGrab = QGroupBox("取流控制") grab_layout = QHBoxLayout(self.groupGrab) self.bnStart = QPushButton("开始取流") self.bnStop = QPushButton("停止取流") self.radioContinueMode = QRadioButton("连续模式") self.radioTriggerMode = QRadioButton("触发模式") self.bnSoftwareTrigger = QPushButton("软触发") grab_layout.addWidget(self.bnStart) grab_layout.addWidget(self.bnStop) grab_layout.addWidget(self.radioContinueMode) grab_layout.addWidget(self.radioTriggerMode) grab_layout.addWidget(self.bnSoftwareTrigger) main_layout.addWidget(self.groupGrab) # 参数设置组 self.paramgroup = QGroupBox("相机参数") param_layout = QGridLayout(self.paramgroup) self.edtExposureTime = QLineEdit() self.edtGain = QLineEdit() self.edtFrameRate = QLineEdit() self.bnGetParam = QPushButton("获取参数") self.bnSetParam = QPushButton("设置参数") self.bnSaveImage = QPushButton("保存图像") param_layout.addWidget(QLabel("曝光时间:"), 0, 0) param_layout.addWidget(self.edtExposureTime, 0, 1) param_layout.addWidget(self.bnGetParam, 0, 2) param_layout.addWidget(QLabel("增益:"), 1, 0) param_layout.addWidget(self.edtGain, 1, 1) param_layout.addWidget(self.bnSetParam, 1, 2) param_layout.addWidget(QLabel("帧率:"), 2, 0) param_layout.addWidget(self.edtFrameRate, 2, 1) param_layout.addWidget(self.bnSaveImage, 2, 2) main_layout.addWidget(self.paramgroup) # 图像显示区域 self.widgetDisplay = QLabel() self.widgetDisplay.setMinimumSize(640, 480) self.widgetDisplay.setStyleSheet("background-color: black;") self.widgetDisplay.setAlignment(Qt.AlignCenter) self.widgetDisplay.setText("相机预览区域") main_layout.addWidget(self.widgetDisplay, 1) # 状态栏 #self.statusBar = QStatusBar() #self.setStatusBar(self.statusBar) # 创建自定义UI组件 self.setup_custom_ui() def setup_custom_ui(self): # 工具栏 toolbar = self.addToolBar("检测工具") self.bnCheckPrint = QPushButton("检测印花质量") self.bnSaveSample = QPushButton("保存标准样本") self.bnPreviewSample = QPushButton("预览样本") self.cbHistory = QComboBox() self.cbHistory.setMinimumWidth(300) toolbar.addWidget(self.bnCheckPrint) toolbar.addWidget(self.bnSaveSample) toolbar.addWidget(self.bnPreviewSample) toolbar.addWidget(QLabel("历史记录:")) toolbar.addWidget(self.cbHistory) # 状态栏样本路径 self.lblSamplePath = QLabel("当前样本: 未设置样本") self.statusBar().addPermanentWidget(self.lblSamplePath) # 右侧面板 right_panel = QWidget() right_layout = QVBoxLayout(right_panel) right_layout.setContentsMargins(10, 10, 10, 10) # 差异度调整组 diff_group = QGroupBox("差异度调整") diff_layout = QVBoxLayout(diff_group) self.lblDiffThreshold = QLabel("差异度阈值 (0-100%):") self.sliderDiffThreshold = QSlider(Qt.Horizontal) self.sliderDiffThreshold.setRange(0, 100) self.sliderDiffThreshold.setValue(5) self.lblDiffValue = QLabel("5%") self.lblCurrentDiff = QLabel("当前差异度: -") self.lblCurrentDiff.setStyleSheet("font-size: 14px; font-weight: bold;") self.lblDiffStatus = QLabel("状态: 未检测") self.lblDiffStatus.setStyleSheet("font-size: 12px;") diff_layout.addWidget(self.lblDiffThreshold) diff_layout.addWidget(self.sliderDiffThreshold) diff_layout.addWidget(self.lblDiffValue) diff_layout.addWidget(self.lblCurrentDiff) diff_layout.addWidget(self.lblDiffStatus) right_layout.addWidget(diff_group) # 传感器控制面板 sensor_panel = QGroupBox("传感器控制") sensor_layout = QVBoxLayout(sensor_panel) sensor_type_layout = QHBoxLayout() self.lblSensorType = QLabel("传感器类型:") self.cbSensorType = QComboBox() self.cbSensorType.addItems(["串口", "以太网"]) sensor_type_layout.addWidget(self.lblSensorType) sensor_type_layout.addWidget(self.cbSensorType) sensor_layout.addLayout(sensor_type_layout) # 串口参数 self.serialGroup = QGroupBox("串口参数") serial_layout = QVBoxLayout(self.serialGroup) self.lblComPort = QLabel("端口:") self.cbComPort = QComboBox() if platform.system() == 'Windows': ports = [f"COM{i}" for i in range(1, 21)] else: ports = [f"/dev/ttyS{i}" for i in range(0, 4)] + [f"/dev/ttyUSB{i}" for i in range(0, 4)] self.cbComPort.addItems(ports) self.lblBaudrate = QLabel("波特率:") self.cbBaudrate = QComboBox() self.cbBaudrate.addItems(["96000", "19200", "38400", "57600", "115200"]) self.cbBaudrate.setCurrentText("115200") serial_layout.addWidget(self.lblComPort) serial_layout.addWidget(self.cbComPort) serial_layout.addWidget(self.lblBaudrate) serial_layout.addWidget(self.cbBaudrate) sensor_layout.addWidget(self.serialGroup) # 以太网参数 self.ethernetGroup = QGroupBox("以太网参数") ethernet_layout = QVBoxLayout(self.ethernetGroup) self.lblIP = QLabel("IP地址:") self.edtIP = QLineEdit("192.168.1.100") self.lblPort = QLabel("端口:") self.edtPort = QLineEdit("502") ethernet_layout.addWidget(self.lblIP) ethernet_layout.addWidget(self.edtIP) ethernet_layout.addWidget(self.lblPort) ethernet_layout.addWidget(self.edtPort) sensor_layout.addWidget(self.ethernetGroup) # 连接按钮 self.bnConnectSensor = QPushButton("连接传感器") self.bnDisconnectSensor = QPushButton("断开传感器") self.bnDisconnectSensor.setEnabled(False) sensor_layout.addWidget(self.bnConnectSensor) sensor_layout.addWidget(self.bnDisconnectSensor) # 延迟设置 delay_layout = QHBoxLayout() self.lblDelay = QLabel("触发延迟(秒):") self.spinDelay = QSpinBox() self.spinDelay.setRange(0, 60) self.spinDelay.setValue(0) self.spinDelay.setToolTip("传感器检测到布料后延迟拍摄的时间") delay_layout.addWidget(self.lblDelay) delay_layout.addWidget(self.spinDelay) sensor_layout.addLayout(delay_layout) # 传感器数据 self.lblSensorData = QLabel("传感器数据: 未连接") self.lblSensorData.setStyleSheet("font-size: 10pt;") sensor_layout.addWidget(self.lblSensorData) right_layout.addWidget(sensor_panel) right_layout.addStretch(1) # 停靠窗口 dock = QDockWidget("检测控制面板", self) dock.setWidget(right_panel) dock.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) self.addDockWidget(Qt.RightDockWidgetArea, dock) def closeEvent(self, event): logging.info("主窗口关闭,执行清理...") close_device() disconnect_sensor() event.accept() if __name__ == "__main__": path_fixer = PathFixer() app = QApplication(sys.argv) mainWindow = MainWindow() # 信号连接 mainWindow.cbSensorType.currentIndexChanged.connect(update_sensor_ui) update_sensor_ui(0) mainWindow.bnConnectSensor.clicked.connect(connect_sensor) mainWindow.bnDisconnectSensor.clicked.connect(disconnect_sensor) mainWindow.sliderDiffThreshold.valueChanged.connect(update_diff_threshold) mainWindow.bnCheckPrint.clicked.connect(sensor_controlled_check) mainWindow.bnSaveSample.clicked.connect(save_sample_image) mainWindow.bnPreviewSample.clicked.connect(preview_sample) mainWindow.bnEnum.clicked.connect(enum_devices) mainWindow.bnOpen.clicked.connect(open_device) mainWindow.bnClose.clicked.connect(close_device) mainWindow.bnStart.clicked.connect(start_grabbing) mainWindow.bnStop.clicked.connect(stop_grabbing) mainWindow.bnSoftwareTrigger.clicked.connect(trigger_once) mainWindow.radioTriggerMode.clicked.connect(set_software_trigger_mode) mainWindow.radioContinueMode.clicked.connect(set_continue_mode) mainWindow.bnGetParam.clicked.connect(get_param) mainWindow.bnSetParam.clicked.connect(set_param) mainWindow.bnSaveImage.clicked.connect(save_sample_image) main() mainWindow.show() app.exec_() close_device() disconnect_sensor() sys.exit() 这是目前主程序的的代码,出现了下面的错误 Traceback (most recent call last): File "c:\Users\zhangyuxiang\Desktop\cloth\main\clothing_inspection.py", line 56, in <module> from mv_import.Cam_Operation_class import CameraOperation File "c:\Users\zhangyuxiang\Desktop\cloth\main\mv_import\Cam_Operation_class.py", line 11, in <module> from .MvCameraControl_class import * File "c:\Users\zhangyuxiang\Desktop\cloth\main\mv_import\MvCameraControl_class.py", line 18, in <module> from .Camera_Params_header import * File "c:\Users\zhangyuxiang\Desktop\cloth\main\mv_import\Camera_Params_header.py", line 5, in <module> from .CameraParams_const import * ModuleNotFoundError: No module named 'mv_import.CameraParams_const'

File "d:\Lib\runpy.py", line 198, in _run_module_as_main return _run_code(code, main_globals, None, "__main__", mod_spec) File "d:\Lib\runpy.py", line 88, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy\__main__.py", line 71, in <module> cli.main() ~~~~~~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 501, in main run() ~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 351, in run_file runpy.run_path(target, run_name="__main__") ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 310, in run_path return _run_module_code(code, init_globals, run_name, pkg_name=pkg_name, script_name=fname) File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 127, in _run_module_code _run_code(code, mod_globals, init_globals, mod_name, mod_spec, pkg_name, script_name) ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 118, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "D:\vscode\pagerank_experiment\test.py", line 3, in <module> debugpy.log_to('./debugpy.log') # 将日志写入文件 ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\public_api.py", line 31, in wrapper return wrapped(*args, **kwargs) File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\api.py", line 66, in log_to raise RuntimeError("logging has already begun") RuntimeError: logging has already begun PS D:\vscode\pagerank_experiment> ^C PS D:\vscode\pagerank_experiment> PS D:\vscode\pagerank_experiment> d:; cd 'd:\vscode\pagerank_experiment'; & 'd:\python.exe' 'c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher' '63813' '--' 'D:\vscode\pagerank_experiment\test.py' Debugger attached, proceeding with execution... PS D:\vscode\pagerank_experiment> ^C PS D:\vscode\pagerank_experiment> PS D:\vscode\pagerank_experiment> d:; cd 'd:\vscode\pagerank_experiment'; & 'd:\python.exe' 'c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher' '63831' '--' 'D:\vscode\pagerank_experiment\test.py' Traceback (most recent call last): File "d:\Lib\runpy.py", line 198, in _run_module_as_main return _run_code(code, main_globals, None, "__main__", mod_spec) File "d:\Lib\runpy.py", line 88, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy\__main__.py", line 71, in <module> cli.main() ~~~~~~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 501, in main run() ~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 351, in run_file runpy.run_path(target, run_name="__main__") ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 310, in run_path return _run_module_code(code, init_globals, run_name, pkg_name=pkg_name, script_name=fname) File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 127, in _run_module_code _run_code(code, mod_globals, init_globals, mod_name, mod_spec, pkg_name, script_name) ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 118, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "D:\vscode\pagerank_experiment\test.py", line 9, in <module> runpy.run_module('your_module_name', run_name='__main__') ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "d:\Lib\runpy.py", line 222, in run_module mod_name, mod_spec, code = _get_module_details(mod_name) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^ File "d:\Lib\runpy.py", line 142, in _get_module_details raise error("No module named %s" % mod_name) ImportError: No module named your_module_name PS D:\vscode\pagerank_experiment> ^C PS D:\vscode\pagerank_experiment> PS D:\vscode\pagerank_experiment> d:; cd 'd:\vscode\pagerank_experiment'; & 'd:\python.exe' 'c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher' '63897' '--' 'D:\vscode\pagerank_experiment\pagerank.py' Traceback (most recent call last): File "d:\Lib\runpy.py", line 198, in _run_module_as_main return _run_code(code, main_globals, None, "__main__", mod_spec) File "d:\Lib\runpy.py", line 88, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy\__main__.py", line 71, in <module> cli.main() ~~~~~~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 501, in main run() ~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 351, in run_file runpy.run_path(target, run_name="__main__") ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 310, in run_path return _run_module_code(code, init_globals, run_name, pkg_name=pkg_name, script_name=fname) File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 127, in _run_module_code _run_code(code, mod_globals, init_globals, mod_name, mod_spec, pkg_name, script_name) ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 118, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "D:\vscode\pagerank_experiment\pagerank.py", line 1, in <module> import numpy as np ModuleNotFoundError: No module named 'numpy'

File "d:\Lib\runpy.py", line 198, in _run_module_as_main return _run_code(code, main_globals, None, "__main__", mod_spec) File "d:\Lib\runpy.py", line 88, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy\__main__.py", line 71, in <module> cli.main() ~~~~~~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 501, in main run() ~~~^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\launcher/../..\debugpy/..\debugpy\server\cli.py", line 351, in run_file runpy.run_path(target, run_name="__main__") ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 310, in run_path return _run_module_code(code, init_globals, run_name, pkg_name=pkg_name, script_name=fname) File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 127, in _run_module_code _run_code(code, mod_globals, init_globals, mod_name, mod_spec, pkg_name, script_name) ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "c:\Users\28895\.vscode\extensions\ms-python.debugpy-2025.8.0-win32-x64\bundled\libs\debugpy\_vendored\pydevd\_pydevd_bundle\pydevd_runpy.py", line 118, in _run_code exec(code, run_globals) ~~~~^^^^^^^^^^^^^^^^^^^ File "D:\vscode\pagerank_experiment\test.py", line 9, in <module> runpy.run_module('your_module_name', run_name='__main__') ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "d:\Lib\runpy.py", line 222, in run_module mod_name, mod_spec, code = _get_module_details(mod_name) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^ File "d:\Lib\runpy.py", line 142, in _get_module_details raise error("No module named %s" % mod_name) ImportError: No module named your_module_name

E:\AI_System\web_ui>python server.py Traceback (most recent call last): File "E:\AI_System\web_ui\server.py", line 16, in <module> from flask_socketio import SocketIO, emit ModuleNotFoundError: No module named 'flask_socketio' # E:\AI_System\web_ui\server.py import sys import os import time import logging import json import traceback import threading import platform import psutil import importlib import datetime from pathlib import Path from flask import Flask, jsonify, request, render_template, send_from_directory from logging.handlers import TimedRotatingFileHandler from flask_socketio import SocketIO, emit from core.config import system_config as config # 导入必要的模块(假设这些模块存在) try: from environment.hardware_manager import create_hardware_manager from cognitive_arch.life_scheduler import LifeScheduler from agent.autonomous_agent import AutonomousAgent from environment.environment_manager import EnvironmentManager from environment.environment_state import EnvironmentState from environment.spatial_simulator import SpatialSimulator from environment.db_manager import DatabaseManager except ImportError as e: logging.warning(f"部分模块导入失败: {str(e)} - 使用模拟实现") # ========== 全局协调器 ========== coordinator = None def register_coordinator(coord): """注册意识系统协调器""" global coordinator coordinator = coord if coordinator and hasattr(coordinator, 'connect_to_ui'): coordinator.connect_to_ui(update_ui) def update_ui(event): """更新UI事件处理""" if 'socketio' in globals(): socketio.emit('system_event', event) # ========== 初始化日志系统 ========== def setup_logger(): """配置全局日志系统""" # 确保日志目录存在 config.LOG_DIR.mkdir(parents=True, exist_ok=True) # 创建主日志记录器 logger = logging.getLogger('WebServer') logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.INFO)) # 日志格式 log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器 (每天轮换,保留30天) file_handler = TimedRotatingFileHandler( config.LOG_DIR / 'web_server.log', when='midnight', backupCount=30, encoding='utf-8' ) file_handler.setFormatter(log_formatter) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) logger.addHandler(console_handler) # 安全日志处理装饰器 def safe_logger(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except UnicodeEncodeError: new_args = [] for arg in args: if isinstance(arg, str): new_args.append(arg.encode('ascii', 'ignore').decode('ascii')) else: new_args.append(arg) return func(*new_args, **kwargs) return wrapper # 应用安全日志处理 for level in ['debug', 'info', 'warning', 'error', 'critical']: setattr(logger, level, safe_logger(getattr(logger, level))) return logger # 初始化日志 logger = setup_logger() # ========== 系统初始化 ========== class SystemInitializer: """负责初始化系统核心组件""" def __init__(self): self.base_dir = Path(__file__).resolve().parent.parent self.ai_core = None self.hardware_manager = None self.life_scheduler = None self.ai_agent = None self.start_time = time.time() self.environment_manager = None def initialize_system_paths(self): """初始化系统路径""" sys.path.insert(0, str(self.base_dir)) logger.info(f"项目根目录: {self.base_dir}") sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment'] for sub_dir in sub_dirs: full_path = self.base_dir / sub_dir if full_path.exists(): sys.path.insert(0, str(full_path)) logger.info(f"添加路径: {full_path}") else: logger.warning(f"目录不存在: {full_path}") def initialize_environment_manager(self): """初始化环境管理器""" try: env_config = { 'update_interval': 1.0, 'spatial': {'grid_size': 1.0} } self.environment_manager = EnvironmentManager(env_config) self.environment_manager.start() logger.info("✅ 环境管理器初始化成功") return self.environment_manager except Exception as e: logger.error(f"❌ 环境管理器初始化失败: {str(e)}") logger.warning("⚠️ 环境交互功能将不可用") return None def initialize_ai_core(self): """初始化AI核心系统""" class AICore: def __init__(self, base_dir): self.base_dir = Path(base_dir) self.genetic_code = self.load_genetic_code() self.physical_state = { "health": 100, "energy": 100, "mood": "calm" } self.dependencies = self.scan_dependencies() def load_genetic_code(self): code_path = self.base_dir / "core" / "genetic_code.py" try: with open(code_path, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: return "# 默认遗传代码\nclass AICore:\n pass" def scan_dependencies(self): return { "nervous_system": "flask", "memory": "sqlite", "perception": "opencv", "reasoning": "transformers" } def mutate(self, new_code): try: code_path = self.base_dir / "core" / "genetic_code.py" with open(code_path, "w", encoding="utf-8") as f: f.write(new_code) importlib.reload(sys.modules['core.genetic_code']) self.genetic_code = new_code return True, "核心代码更新成功,系统已进化" except Exception as e: return False, f"进化失败: {str(e)}" def wear_dependency(self, dependency_name, version): self.dependencies[dependency_name] = version return f"已穿戴 {dependency_name}@{version}" def get_state(self): return { "genetic_code_hash": hash(self.genetic_code), "dependencies": self.dependencies, "physical_state": self.physical_state, "hardware_environment": self.get_hardware_info() } def get_hardware_info(self): return { "cpu": platform.processor(), "gpu": "NVIDIA RTX 3090" if psutil.virtual_memory().total > 16 * 1024 ** 3 else "Integrated", "memory_gb": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage_gb": round(psutil.disk_usage('/').total / (1024 ** 3), 1) } self.ai_core = AICore(self.base_dir) logger.info("✅ AI核心系统初始化完成") return self.ai_core def initialize_hardware_manager(self): """初始化硬件管理器""" try: config_path = config.CONFIG_DIR / "hardware_config.json" self.hardware_manager = create_hardware_manager(config_path) logger.info("✅ 硬件管理器初始化成功") return self.hardware_manager except Exception as e: logger.error(f"❌ 硬件管理器初始化失败: {str(e)}") logger.warning("⚠️ 使用内置简单硬件管理器") class SimpleHardwareManager: def __init__(self): self.available_hardware = { "cpu": ["Intel i9-13900K", "AMD Ryzen 9 7950X", "Apple M2 Max"], "gpu": ["NVIDIA RTX 4090", "AMD Radeon RX 7900 XTX", "Apple M2 GPU"], "memory": [16, 32, 64, 128], "storage": ["1TB SSD", "2TB SSD", "4TB SSD", "8TB HDD"], "peripherals": ["4K Camera", "3D Scanner", "High-Fidelity Microphone"] } self.current_setup = { "cpu": platform.processor(), "gpu": "Integrated Graphics", "memory": round(psutil.virtual_memory().total / (1024 ** 3), 1), "storage": round(psutil.disk_usage('/').total / (1024 ** 3), 1) } def request_hardware(self, hardware_type, specification): if hardware_type not in self.available_hardware: return False, f"不支持硬件类型: {hardware_type}" if specification not in self.available_hardware[hardware_type]: return False, f"不支持的规格: {specification}" self.current_setup[hardware_type] = specification return True, f"已请求 {hardware_type}: {specification}。请管理员完成安装。" def get_current_setup(self): return self.current_setup def get_performance_metrics(self): return { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "cpu_temp": 45.0, "gpu_temp": 55.0, "network_io": { "sent": psutil.net_io_counters().bytes_sent, "received": psutil.net_io_counters().bytes_recv }, "last_updated": datetime.datetime.now().isoformat() } self.hardware_manager = SimpleHardwareManager() return self.hardware_manager def initialize_life_scheduler(self): """初始化生活调度器""" try: config.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.life_scheduler = LifeScheduler() logger.info("✅ 生活调度器初始化成功") life_thread = threading.Thread( target=self._update_life_status, daemon=True, name="LifeSystemThread" ) life_thread.start() logger.info("✅ 生活系统后台线程已启动") return self.life_scheduler except Exception as e: logger.error(f"❌ 生活调度器初始化失败: {str(e)}") return None def _update_life_status(self): logger.info("🚦 生活系统后台线程启动") while True: try: now = datetime.datetime.now() current_hour = now.hour if 8 <= current_hour < 9: self.life_scheduler.wake_up() elif 12 <= current_hour < 13: self.life_scheduler.have_meal("lunch") elif 19 <= current_hour < 20: self.life_scheduler.have_meal("dinner") elif 23 <= current_hour or current_hour < 6: self.life_scheduler.go_to_sleep() self.life_scheduler.log_activity("系统状态更新") time.sleep(60) except Exception as e: logger.error(f"生活系统更新失败: {str(e)}", exc_info=True) time.sleep(300) def initialize_ai_agent(self): """初始化AI智能体""" try: config.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True) self.ai_agent = AutonomousAgent( model_path=config.AGENT_PATH, cache_dir=config.MODEL_CACHE_DIR, use_gpu=config.USE_GPU, default_model=config.DEFAULT_MODEL ) logger.info("✅ AI智能体初始化成功") return self.ai_agent except Exception as e: logger.error(f"❌ AI智能体初始化失败: {str(e)}") return None def start_evolution_monitor(self): def monitor(): while True: try: cpu_usage = psutil.cpu_percent() mem_usage = psutil.virtual_memory().percent if cpu_usage > 80: self.ai_core.physical_state["energy"] = max(20, self.ai_core.physical_state["energy"] - 5) self.ai_core.physical_state["mood"] = "strained" elif cpu_usage < 30: self.ai_core.physical_state["energy"] = min(100, self.ai_core.physical_state["energy"] + 2) if self.hardware_manager: current_hw = self.ai_core.get_hardware_info() requested_hw = self.hardware_manager.get_current_setup() for hw_type, spec in requested_hw.items(): if current_hw.get(hw_type) != spec: if hw_type == "cpu": self.ai_core.physical_state["health"] = min(100, self.ai_core.physical_state[ "health"] + 10) elif hw_type == "gpu": self.ai_core.physical_state["energy"] = min(100, self.ai_core.physical_state[ "energy"] + 20) time.sleep(60) except Exception as e: logging.error(f"进化监控错误: {str(e)}") time.sleep(300) monitor_thread = threading.Thread(target=monitor, daemon=True) monitor_thread.start() logger.info("✅ 进化监控线程已启动") def initialize_all(self): logger.info("=" * 50) logger.info("🚀 开始初始化AI系统") logger.info("=" * 50) self.initialize_system_paths() self.initialize_ai_core() self.initialize_hardware_manager() self.initialize_life_scheduler() self.initialize_ai_agent() self.initialize_environment_manager() self.start_evolution_monitor() logger.info("✅ 所有系统组件初始化完成") return { "ai_core": self.ai_core, "hardware_manager": self.hardware_manager, "life_scheduler": self.life_scheduler, "ai_agent": self.ai_agent, "environment_manager": self.environment_manager } # ========== Flask应用工厂 ========== def create_app(): app = Flask( __name__, template_folder='templates', static_folder='static', static_url_path='/static' ) app.secret_key = config.SECRET_KEY system_initializer = SystemInitializer() components = system_initializer.initialize_all() app.config['SYSTEM_COMPONENTS'] = components app.config['START_TIME'] = system_initializer.start_time app.config['BASE_DIR'] = system_initializer.base_dir # 初始化SocketIO app.config['SOCKETIO'] = SocketIO(app, cors_allowed_origins="*", async_mode='threading') # 注册路由 register_routes(app) register_error_handlers(app) return app # ========== 环境交互路由 ========== def register_environment_routes(app): @app.route('/environment') def environment_view(): return render_template('environment_view.html') @app.route('/api/environment/state', methods=['GET']) def get_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: state = env_manager.get_state() return jsonify(state.to_dict()) except Exception as e: app.logger.error(f"获取环境状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/environment/action', methods=['POST']) def execute_environment_action(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return jsonify({"success": False, "error": "环境管理器未初始化"}), 503 try: data = request.json action = data.get('action') params = data.get('params', {}) if not action: return jsonify({"success": False, "error": "缺少动作参数"}), 400 success = env_manager.execute_action(action, params) return jsonify({"success": success, "action": action}) except Exception as e: app.logger.error(f"执行环境动作失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 # ========== 环境状态广播 ========== def setup_environment_broadcast(app): socketio = app.config['SOCKETIO'] @socketio.on('connect', namespace='/environment') def handle_environment_connect(): app.logger.info('客户端已连接环境WebSocket') @socketio.on('disconnect', namespace='/environment') def handle_environment_disconnect(): app.logger.info('客户端已断开环境WebSocket') def broadcast_environment_state(): env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager') if not env_manager: return while True: try: state = env_manager.get_state() socketio.emit('environment_update', state.to_dict(), namespace='/environment') time.sleep(1) except Exception as e: app.logger.error(f"环境状态广播失败: {str(e)}") time.sleep(5) broadcast_thread = threading.Thread( target=broadcast_environment_state, daemon=True, name="EnvironmentBroadcastThread" ) broadcast_thread.start() app.logger.info("✅ 环境状态广播线程已启动") # ========== 路由注册 ========== def register_routes(app): register_environment_routes(app) setup_environment_broadcast(app) @app.route('/') def index(): return render_template('agent_interface.html') @app.route('/static/') def static_files(filename): try: return send_from_directory(app.static_folder, filename) except Exception as e: app.logger.error(f"静态文件服务失败: {filename} - {str(e)}") return jsonify({"error": "文件未找到"}), 404 @app.route('/status') def status(): try: components = app.config['SYSTEM_COMPONENTS'] status_data = { "server": { "status": "running", "uptime": time.time() - app.config['START_TIME'], "version": "1.0.0", "config": { "host": config.HOST, "port": config.PORT, "log_level": config.LOG_LEVEL, "default_model": config.DEFAULT_MODEL } }, "core": components['ai_core'].get_state(), "hardware": components['hardware_manager'].get_current_setup() } if components['environment_manager']: try: status_data["environment"] = components['environment_manager'].get_state().to_dict() except Exception as e: status_data["environment"] = {"error": str(e)} if components['life_scheduler']: try: status_data["life_system"] = components['life_scheduler'].get_current_state() except Exception as e: status_data["life_system"] = {"error": str(e)} if components['ai_agent']: try: status_data["agent"] = components['ai_agent'].get_status() except Exception as e: status_data["agent"] = {"error": str(e)} return jsonify(status_data) except Exception as e: app.logger.error(f"获取状态失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # 核心系统路由 @app.route('/api/core/state') def get_core_state(): return jsonify(app.config['SYSTEM_COMPONENTS']['ai_core'].get_state()) @app.route('/api/core/mutate', methods=['POST']) def mutate_core(): data = request.get_json() new_code = data.get('genetic_code') if not new_code: return jsonify({"success": False, "error": "缺少遗传代码"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['ai_core'].mutate(new_code) return jsonify({"success": success, "message": message}) @app.route('/api/core/wear', methods=['POST']) def wear_dependency(): data = request.get_json() dep_name = data.get('dependency') version = data.get('version', 'latest') if not dep_name: return jsonify({"success": False, "error": "缺少依赖名称"}), 400 result = app.config['SYSTEM_COMPONENTS']['ai_core'].wear_dependency(dep_name, version) return jsonify({"success": True, "message": result}) # 硬件管理路由 @app.route('/api/hardware/catalog') def get_hardware_catalog(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].available_hardware) @app.route('/api/hardware/request', methods=['POST']) def request_hardware(): data = request.get_json() hw_type = data.get('type') spec = data.get('specification') if not hw_type or not spec: return jsonify({"success": False, "error": "缺少硬件类型或规格"}), 400 success, message = app.config['SYSTEM_COMPONENTS']['hardware_manager'].request_hardware(hw_type, spec) return jsonify({"success": success, "message": message}) @app.route('/api/hardware/current') def get_current_hardware(): return jsonify(app.config['SYSTEM_COMPONENTS']['hardware_manager'].get_current_setup()) # 生活系统路由 @app.route('/life') def life_dashboard(): return render_template('life_dashboard.html') @app.route('/api/life/status') def get_life_status(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: current_state = components['life_scheduler'].get_current_state() recent_activities = components['life_scheduler'].get_recent_activities(10) return jsonify({ "success": True, "current_activity": current_state.get("current_activity", "未知"), "next_scheduled": current_state.get("next_scheduled", "未知"), "schedule": components['life_scheduler'].daily_schedule, "recent_activities": recent_activities, "energy_level": current_state.get("energy_level", 100), "mood": current_state.get("mood", "平静") }) except Exception as e: app.logger.error(f"获取生活状态失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/adjust_schedule', methods=['POST']) def adjust_schedule(): components = app.config['SYSTEM_COMPONENTS'] if not components['life_scheduler']: return jsonify({"success": False, "error": "生活系统未初始化"}), 503 try: data = request.json adjustments = data.get("adjustments", {}) valid_activities = ["wake_up", "breakfast", "lunch", "dinner", "sleep"] for activity, new_time in adjustments.items(): if activity not in valid_activities: return jsonify({"success": False, "error": f"无效的活动类型: {activity}"}), 400 if not isinstance(new_time, str) or len(new_time) != 5 or new_time[2] != ':': return jsonify({"success": False, "error": f"无效的时间格式: {new_time}"}), 400 components['life_scheduler'].adjust_schedule(adjustments) return jsonify({ "success": True, "message": "计划表已更新", "new_schedule": components['life_scheduler'].daily_schedule }) except Exception as e: app.logger.error(f"调整作息时间失败: {traceback.format_exc()}") return jsonify({"success": False, "error": str(e)}), 400 # 聊天路由 @app.route('/chat', methods=['POST']) def chat(): components = app.config['SYSTEM_COMPONENTS'] if not components['ai_agent']: return jsonify({"error": "Agent未初始化"}), 503 try: data = request.get_json() user_input = data.get('message', '') user_id = data.get('user_id', 'default') if not user_input: return jsonify({"error": "消息内容不能为空"}), 400 app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}") response = components['ai_agent'].process_input(user_input, user_id) return jsonify({"response": response}) except Exception as e: app.logger.error(f"聊天处理失败: {traceback.format_exc()}") return jsonify({"error": "聊天处理失败", "details": str(e)}), 500 # 家具管理路由 furniture_cache = {} CACHE_DURATION = 3600 # 1小时 @app.route('/api/furniture') def get_furniture(): try: room = request.args.get('room', 'workshop') app.logger.info(f"获取家具数据: 房间={room}") current_time = time.time() if room in furniture_cache and current_time - furniture_cache[room]['timestamp'] < CACHE_DURATION: return jsonify(furniture_cache[room]['data']) furniture_data = { "workshop": [ {"type": "desk", "position": {"x": 0, "y": -1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "chair", "position": {"x": 0, "y": -1.5, "z": -1}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "bookshelf", "position": {"x": 3, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "computer", "position": {"x": 0.5, "y": -0.5, "z": -3.2}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "living_room": [ {"type": "sofa", "position": {"x": 0, "y": 0, "z": -2}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "tv", "position": {"x": 0, "y": 1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}} ], "bedroom": [ {"type": "bed", "position": {"x": 0, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}, {"type": "nightstand", "position": {"x": 1.5, "y": 0, "z": -2.5}, "rotation": {"x": 0, "y": 0, "z": 0}} ] } furniture_cache[room] = { 'timestamp': current_time, 'data': furniture_data.get(room, []) } return jsonify(furniture_cache[room]['data']) except Exception as e: app.logger.error(f"获取家具数据失败: {traceback.format_exc()}") return jsonify({"error": "内部错误", "details": str(e)}), 500 # ========== 错误处理器 ========== def register_error_handlers(app): @app.errorhandler(404) def page_not_found(error): app.logger.warning(f"404错误: {request.path}") return jsonify({ "error": "资源未找到", "path": request.path, "method": request.method }), 404 @app.errorhandler(500) def internal_server_error(error): app.logger.error(f"500错误: {str(error)}") return jsonify({ "error": "服务器内部错误", "message": "系统遇到意外错误,请稍后重试" }), 500 @app.errorhandler(Exception) def handle_general_exception(error): app.logger.error(f"未处理异常: {traceback.format_exc()}") return jsonify({ "error": "未处理的异常", "type": type(error).__name__, "message": str(error) }), 500 # ========== WebSocket处理 ========== def setup_websocket_handlers(socketio): @socketio.on('connect') def handle_connect(): logger.info('客户端已连接') socketio.emit('system_status', {'status': 'ready'}) @socketio.on('disconnect') def handle_disconnect(): logger.info('客户端已断开连接') @socketio.on('user_message') def handle_user_message(data): user_id = data.get('user_id', 'guest') message = data.get('message', '') logger.info(f"收到来自 {user_id} 的消息: {message}") # 处理消息逻辑 response = f"已收到您的消息: {message}" # 如果有协调器,使用协调器处理 global coordinator if coordinator: try: response = coordinator.process_message(message) except Exception as e: logger.error(f"协调器处理消息失败: {str(e)}") socketio.emit('agent_response', { 'user_id': user_id, 'response': response }) # ========== 主程序入口 ========== if __name__ == '__main__': try: app = create_app() socketio = app.config['SOCKETIO'] # 设置WebSocket处理器 setup_websocket_handlers(socketio) # 启动服务器 socketio.run( app, host=config.HOST, port=config.PORT, debug=config.DEBUG, use_reloader=False ) logger.info(f"服务器运行在 http://{config.HOST}:{config.PORT}") except KeyboardInterrupt: logger.info("服务器关闭") except Exception as e: logger.critical(f"服务器启动失败: {str(e)}") logger.error(traceback.format_exc())

iis部署此项目时报错:Error occurred while reading WSGI handler: Traceback (most recent call last): File "E:\anaconda3\Lib\site-packages\wfastcgi.py", line 791, in main env, handler = read_wsgi_handler(response.physical_path) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\anaconda3\Lib\site-packages\wfastcgi.py", line 633, in read_wsgi_handler handler = get_wsgi_handler(os.getenv("WSGI_HANDLER")) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\anaconda3\Lib\site-packages\wfastcgi.py", line 616, in get_wsgi_handler raise ValueError('"%s" could not be imported%s' % (handler_name, last_tb)) ValueError: "django.core.wsgi.get_wsgi_application()" could not be imported: Traceback (most recent call last): File "E:\anaconda3\Lib\site-packages\wfastcgi.py", line 605, in get_wsgi_handler handler = handler() ^^^^^^^^^ File "E:\anaconda3\Lib\site-packages\django\core\wsgi.py", line 12, in get_wsgi_application django.setup(set_prefix=False) File "E:\anaconda3\Lib\site-packages\django\__init__.py", line 19, in setup configure_logging(settings.LOGGING_CONFIG, settings.LOGGING) ^^^^^^^^^^^^^^^^^^^^^^^ File "E:\anaconda3\Lib\site-packages\django\conf\__init__.py", line 57, in __getattr__ self._setup(name) File "E:\anaconda3\Lib\site-packages\django\conf\__init__.py", line 44, in _setup self._wrapped = Settings(settings_module) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\anaconda3\Lib\site-packages\django\conf\__init__.py", line 107, in __init__ mod = importlib.import_module(self.SETTINGS_MODULE) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\anaconda3\Lib\importlib\__init__.py", line 90, in import_module return _bootstrap._gcd_import(name[level:], package, level) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<frozen importlib._bootstrap>", line 1387, in _gcd_import File "<frozen importlib._bootstrap>", line 1360, in _find_and_load File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed File "<frozen importlib._bootstrap>", line 1387, in _gcd_import File "<frozen importlib._bootstrap>", line 1360, in _find_and_load File "<frozen importlib._bootstrap>", line 1324, in _find_and_load_unlocked ModuleNotFoundError: No module named 'MyWebWord'

大家在看

recommend-type

Toolbox使用说明.pdf

Toolbox 是快思聪公司新近推出的一款集成多种调试功能于一体的工具软件,它可以实现多种硬件检 测, 调试功能。完全可替代 Viewport 实现相应的功能。它提供了有 Text Console, SMW Program Tree, Network Device Tree, Script Manager, System Info, File Manager, Network Analyzer, Video Test Pattern 多个 检测调试工具, 其中 Text Console 主要执行基于文本编辑的命令; SMW Program Tree 主要罗列出相应 Simpl Windows 程序中设计到的相关快思聪设备, 并可对显示出的相关设备进行效验, 更新 Firmware, 上传 Project 等操作; Network Device Tree 主要使用于显示检测连接到 Cresnet 网络上相关设备, 可对网络上设备进行 ID 设置,侦测设备线路情况; Script Manager 主要用于运行脚本命令; System Info 则用于显示联机的控制系统 软硬件信息,也可对相应信息进行修改,刷新; File Manager 显示控制系统主机内存文件系统信息,可进行 修改,建立等管理操作; Video Test Pattern 则用于产生一个测试图调较屏幕显示; Network Analyzer 用于检 测连接到 Cresnet 网络上所有设备的通信线路情况。以上大致介绍了 Toolbox 中各工具软件的用途,下面将 分别讲述一下各工具的实际用法
recommend-type

EVE-NG-Win-Client-Pack.zip

EVE模拟器插件,抓包软件Wireshark与vnc客户端下载,并进行关联,使得可以在EVE模拟器中使用Wireshark进行抓包
recommend-type

昆明各乡镇街道shp文件 最新

地理数据,精心制作,欢迎下载! 昆明各街道乡镇shp文件,内含昆明各区县shp文件! 主上大人: 您与其耗费时间精力去星辰大海一样的网络搜寻文件,并且常常搜不到,倒不如在此直接购买下载现成的,也就少喝两杯奶茶,还减肥了呢!而且,如果数据有问题,我们会负责到底,帮你处理,包您满意! 小的祝您天天开心,论文顺利!
recommend-type

无线通信技术.rar--华为内部培训资料

华为内部培训资料,有关通信方面的,很实用,参考性较强!
recommend-type

simplified_eye_hand_calibration.zip

simplified_eye_hand_calibration的代码和示例数据。 项目地址:https://github.com/ZiqiChai/simplified_eye_hand_calibration

最新推荐

recommend-type

【java毕业设计】喀什美食订餐网源码(ssm+mysql+说明文档+LW+PPT).zip

基于SSM框架的喀什美食订餐网的前台包含了用户注册、菜品列表、菜品排行、在线点餐和菜品收藏功能,下面是对这些功能的详细介绍: (1)用户注册功能:允许新用户创建自己的账户,并提供基本信息如用户名、密码等。注册完成后,用户可以登录到系统中进行后续操作。 (2)菜品列表功能:展示了所有可供选择的菜品,每个菜品都配有详细的描述、图片和价格等信息。用户可以通过搜索功能快速找到自己感兴趣的菜品,也可以按照分类或关键词进行筛选和排序。 (3)菜品排行功能:显示了当前最受欢迎或高评分的菜品,帮助用户挑选热门菜品,并参考其他用户的评价和推荐。 (4)在线点餐功能:允许用户在浏览菜品后直接下单,选择餐厅、菜品数量和其他相关选项。用户还可以添加特殊要求或备注,以满足个性化的需求。提交订单后,用户可以实时查看订单状态并付款。 完整前后端源码,部署后可正常运行! 环境说明 开发语言:Java后端 框架:ssm,mybatis JDK版本:JDK1.8+ 数据库:mysql 5.7+ 数据库工具:Navicat11+ 开发软件:eclipse/idea Maven包:Maven3.3+ 部署容器:tomcat7.5+
recommend-type

电力电子仿真技术解析:MMC、HVDC与微电网的建模与应用

电力电子仿真领域的关键技术,重点讨论了模块化多电平换流器(MMC)仿真、高压直流输电(HVDC)仿真以及微电网仿真。首先,通过MATLAB Simulink环境构建MMC仿真模型,研究其工作原理和性能特点,特别是如何通过级联子模块实现高电压、低谐波的效果。其次,探讨了柔性直流输电(VSC-HVDC)仿真,涉及SPWM、NLM、CPS-PWM等调制技术的应用,展示了这些技术如何提升系统的灵活性和稳定性。最后,针对微电网仿真,模拟了风电、光伏、储能等多种能源形式的协同工作,优化微电网的设计和运行。 适合人群:从事电力系统研究的技术人员、高校师生及相关领域的研究人员。 使用场景及目标:适用于电力系统设计、优化和故障诊断的研究项目,帮助理解和掌握电力电子仿真技术的基本原理和实际应用。 其他说明:文中提供了部分MATLAB伪代码示例,便于读者理解和实践。同时鼓励进一步咨询和探讨相关话题,以深化对电力电子仿真的认识。
recommend-type

基于群智能算法优化随机森林分类预测的MATLAB实现及性能对比

利用多种群智能算法(如粒子群优化、阿基米德优化、黏菌优化、麻雀优化和狼群优化)对随机森林(RF)进行参数优化的方法及其MATLAB代码实现。重点讨论了每种算法的工作原理、代码片段及其实验效果。实验结果显示,在UCI乳腺癌数据集上,不同算法优化后的RF模型表现各异,其中麻雀算法优化的RF模型达到了最高的准确率95.7%。 适合人群:对机器学习尤其是随机森林算法有一定了解的研究人员和技术爱好者,熟悉MATLAB编程环境。 使用场景及目标:适用于希望提高随机森林模型分类预测性能的研究项目或应用开发。主要目标是通过引入不同的群智能算法来优化随机森林的关键参数(如树的数量和最小叶子节点数),从而提升模型的整体性能。 其他说明:文中还提供了一些实用的小技巧,例如可以先使用收敛速度快的算法确定大致参数区间,然后采用精度更高的算法进行细调。此外,作者提到可以通过私信获取最新的混合优化方案。
recommend-type

基于 C# WinForm 技术的字体编辑器

在 Windows 桌面应用开发中,文本编辑工具是非常常见的需求,而字体样式的自定义更是提升用户体验的重要功能。本文将基于 C# WinForm 技术,解析两个字体编辑器版本(基础版与进阶版)的实现思路,带你了解如何从零开始构建一个支持字体自定义的文本编辑工具。 项目结构概览 整个项目包含两个主要版本: 字体编辑器(基础):实现基本的字体样式设置功能 字体编辑器(进阶):在基础版之上增加了右键菜单、更丰富的字体设置选项 两个版本均基于.NET Framework 4.7.2 开发,采用 WinForm 传统桌面应用架构,主要包含主窗体(Form1)和字体设置窗体(Form2)两个核心界面。
recommend-type

【办公自动化】Python自动生成Excel报表系统:数据处理、图表生成与邮件发送全流程详解

内容概要:本文介绍了如何利用Python自动化生成专业的Excel报表,涵盖数据提取、格式美化、图表自动生成等功能,帮助用户从重复性劳动中解放出来。文章详细描述了整个流程,包括环境准备与库安装、数据准备与读取、创建Excel报表框架、设计专业表格样式、添加专业图表、插入自动计算公式以及完整代码实现。此外,还提供了扩展功能,如邮件自动发送报表和定时自动生成报表。 适合人群:具备一定编程基础,尤其是对Python有一定了解,并希望提高办公效率的办公人员或数据分析师。 使用场景及目标:①需要定期生成销售报表或其他类型的数据报表;②希望通过自动化工具减少手动操作,提高工作效率;③希望掌握Python在办公自动化中的应用技巧,特别是结合pandas、openpyxl等库进行数据处理和Excel操作。 阅读建议:此资源不仅提供了完整的源码和详细注释,更重要的是引导读者理解每个步骤背后的逻辑和技术细节。因此,在学习过程中,建议读者跟随代码逐步实践,同时理解每一步骤的目的和实现方法。
recommend-type

Notes App API开发与使用指南

### API基础知识 #### 标题分析:“notes-app-api” 从标题“notes-app-api”可以推断,此API(Application Programming Interface,应用程序接口)是专为一个名为“notes-app”的应用程序设计的。这种API通常被用来允许不同的软件组件之间进行通信。在这个案例中,“notes-app”可能是一款笔记应用,该API提供了笔记数据的获取、更新、删除等操作的接口。 #### 描述分析:“API休息说明” 在提供的“API休息说明”中,我们可以看到几个重要的操作指令: 1. **指令“dev”:** `npm run dev` - 这是一个用于启动开发模式的命令。通常情况下,`npm run dev`会使用Node.js环境下的某种热重载功能,让开发者在开发过程中实时看到代码更改的效果。 - `npm`是Node.js的包管理器,用于安装项目所需的依赖、运行脚本等。 - `dev`是脚本命令的缩写,实际对应的是`package.json`文件中定义的某个开发环境下的脚本命令。 2. **指令“服务”:** `npm start` - 这是一个用于启动应用程序服务的命令。 - 同样利用Node.js的`npm`包管理器执行,其目的是部署应用程序,使其对外提供服务。 3. **指令“构建”:** `npm run build` - 这是用于构建项目的命令,通常会将源代码进行压缩、转译等操作,生成用于生产环境的代码。 - 例如,如果项目使用了TypeScript,构建过程可能包括将TypeScript代码编译成JavaScript,因为浏览器不能直接运行TypeScript代码。 #### 标签分析:“TypeScript” TypeScript是JavaScript的超集,提供了静态类型检查和ES6+的特性。使用TypeScript可以提高代码的可读性和可维护性,同时在编译阶段发现潜在的错误。 1. **TypeScript的特性:** - **静态类型检查:** 有助于在开发阶段捕捉类型错误,降低运行时错误的概率。 - **ES6+特性支持:** TypeScript支持最新的JavaScript语法和特性,可以使用装饰器、异步编程等现代JavaScript特性。 - **丰富的配置选项:** 开发者可以根据项目需求进行各种配置,如模块化系统、编译目标等。 2. **TypeScript的使用场景:** - 大型项目:在大型项目中,TypeScript有助于维护和扩展代码库。 - 多人协作:团队开发时,类型定义有助于减少沟通成本,提高代码一致性。 - 错误敏感应用:如金融、医疗等领域的应用,可以利用TypeScript的静态类型检查减少bug。 #### 文件分析:“压缩包子文件的文件名称列表: notes-app-api-develop” 这个文件列表中包含了“notes-app-api-develop”,它表明存在一个与开发相关的压缩包或存档文件。这个文件很可能包含了应用程序的源代码,通常还会包括`package.json`文件,这个文件定义了项目的依赖关系和可运行的脚本命令。在开发和部署过程中,开发者通常会根据`package.json`中定义的脚本来执行不同的任务,如`npm run dev`或`npm start`等。 ### Docker使用说明 在描述中还提到了使用Docker的命令: 1. **构建镜像:** `docker build -t notes-api .` - 这个命令用于构建一个名为`notes-api`的Docker镜像。 - `.`表示Dockerfile在当前目录。 - `-t`指定镜像的名称和标签。 2. **运行容器:** `docker run -d -it -p 3005:3005 notes-api` - 该命令用于从`notes-api`镜像启动一个容器,并在后台运行。 - `-d`表示后台运行容器。 - `-it`则是将容器的标准输入打开,并分配一个伪终端。 - `-p 3005:3005`将容器内部的3005端口映射到宿主机的3005端口,允许宿主机与容器内部的服务进行通信。 Docker作为容器化平台,它允许开发者打包应用及其依赖环境为一个轻量级、可移植的容器,这样无论在什么环境下,都能保证应用运行的一致性。这一点对于开发和部署API来说尤为重要,因为它确保了开发环境与生产环境的一致性,减少了所谓的“在我的机器上可以运行”的问题。 ### 总结 综上所述,“notes-app-api”是一个为笔记应用程序设计的API,开发者可以通过一系列npm脚本命令进行开发、服务和构建操作。此外,该API还涉及到使用Docker进行容器化部署的流程。最后,提及的TypeScript标签意味着该项目在前端开发上采用了TypeScript,以期获得更加健壮和可维护的代码库。了解这些知识点对开发、部署和维护现代Web应用至关重要。
recommend-type

【PMSM建模与测试最佳实践】:MATLAB电机仿真模型的权威指导

# 1. PMSM电机模型的基础知识 在现代工业自动化和电动车辆领域,永磁同步电机(PMSM
recommend-type

如何通过四元数避免万向节死锁?

<think>好的,用户想了解如何使用四元数来避免万向节锁问题,尤其是在3D旋转中保持姿态的稳定性。首先,我需要回忆万向节锁的基本概念和四元数如何解决这个问题。万向节锁发生在使用欧拉角时,当两个旋转轴对齐导致失去一个自由度。而四元数通过四维空间的旋转避免了这种顺序依赖。 接下来,我应该解释万向节锁的原因,比如三个轴依次旋转时,某个轴可能与其他轴对齐,导致无法正确旋转。然后对比四元数的优势,比如四元数的连续性和无奇异性。需要提到四元数的数学表示,如单位四元数和旋转插值方法(如球面线性插值),以及它们如何避免万向节锁。 还要考虑用户可能的实际应用场景,比如游戏开发或机器人学,是否需要示例代码?
recommend-type

Python实现Couchbase大规模数据复制技术

标题中提到的技术“couchbase-massive-replication”是一种针对Couchbase数据库的开源Python开发工具,专门用于高效地实现跨集群的大量存储桶和索引的复制。Couchbase是一个高性能、可扩展、容错的NoSQL文档数据库,它支持同步分布式复制(XDCR),能够实现跨地域的数据复制。 描述部分详细阐述了该技术的主要用途和优势。它解决了一个常见问题:在进行XDCR复制时,迁移大量存储桶可能会遇到需要手动检查并迁移缺失存储桶的繁琐步骤。Couchbase-massive-replication技术则允许用户在源和目标集群之间无需进行存储桶配置,简化了迁移过程。开发者可以通过简单的curl请求,向集群发送命令,从而实现大规模存储桶的自动化迁移。 此外,为了帮助用户更容易部署和使用该技术,项目提供了一个Dockerfile,允许用户通过Docker容器来运行程序。Docker是一种流行的容器化平台,可以将应用及其依赖打包到一个可移植的容器中,便于部署和扩展。用户只需执行几个Docker命令,即可快速启动一个名为“cbmigrator”的容器,版本为0.1。启动容器后,可以通过发送简单的POST请求来操作迁移任务。 项目中还提到了Docker Hub,这是一个公共的Docker镜像注册中心,用户可以在其中找到并拉取其他用户分享的镜像,其中就包括了“cbmigrator”镜像,即demir94/cbmigrator:0.1。这大大降低了部署和使用该技术的门槛。 根据标签“Python”,我们可以推断出该项目是使用Python开发的。Python是一种广泛使用的高级编程语言,以其简洁的语法和强大的库支持而闻名。该项目中Python的使用意味着用户可能需要具备一定的Python基础知识,以便对项目进行定制或故障排除。Python的动态类型系统和解释执行机制,使得开发过程中可以快速迭代和测试。 最后,从提供的压缩包子文件的文件名称列表“couchbase-massive-replication-main”来看,该项目的源代码文件夹可能遵循了通用的开源项目结构,其中“main”文件夹通常包含了项目的主要代码和入口文件。用户在获取项目后,可以在这个文件夹中找到相关的代码文件,包括配置文件、数据库模型、业务逻辑实现以及API接口等。 综合来看,这个项目涉及的技术点包括: - Couchbase数据库:一种文档数据库,广泛用于构建可扩展的应用程序。 - XDCR(Cross-Datacenter Replication):Couchbase提供的跨数据中心数据复制机制,实现数据的无缝迁移和灾难恢复。 - Python编程语言:用来开发该项目的高级编程语言,以其易读性和简洁的语法著称。 - Docker容器化技术:用于打包、分发和运行应用程序的平台,提供了一种便捷的部署方式。 - Docker Hub:一个存放和分享Docker镜像的平台,可以简化镜像的查找、下载和管理过程。 这个项目对于需要在多个Couchbase集群间迁移大量数据的开发者和运维人员来说是一个宝贵的资源,因为它大大简化了存储桶迁移的过程,并提高了操作的便利性和效率。
recommend-type

【MATLAB电机性能评估案例】:仿真环境下的深度研究

# 1. MATLAB在电机性能评估中的应用概述 电机作为现代工业中不可或缺的电力传动设备,其性能优劣直接影响整个系统的可靠性和效率。在众多的电机性能评估工具中,MATLAB凭借其强大的数值计算能力和丰富的工具箱资源,成为该领域研究和工程实践中的有力工具。本章将对MATLAB在电机性能评估中的应用进行概述,并介绍其在电机仿真、故障诊断和性能优化等方面的具体应用前景和价值。MA