
# Python asyncio.get_event_loop().set_exception_handler

Date: 2025-07-28 08:12:27
The question asks how to use `set_exception_handler` on an asyncio event loop. As reference [2] describes, `loop.set_exception_handler()` installs a handler that the event loop calls whenever an exception goes uncaught inside it; reference [3] shows PHP's `set_exception_handler`, a different language but the same idea: register a callback for uncaught exceptions. The asyncio pattern is: (1) define a handler that takes two arguments, `loop` (the event loop) and `context` (a dict with the exception details); (2) install it with `loop.set_exception_handler(handler)`. If no handler is installed, the loop falls back to its default handler, which logs the error and keeps running.

### Python asyncio set_exception_handler usage guide

`loop.set_exception_handler()` sets a global exception handler on the event loop; it is invoked when an uncaught exception occurs in a task or callback. Usage and examples follow.

#### Basic syntax

```python
loop = asyncio.get_event_loop()  # inside a coroutine, prefer asyncio.get_running_loop()
loop.set_exception_handler(handler)
```

#### Parameters

- **`handler`**: the exception-handling function; it must accept two arguments:
  - `loop`: the current event loop object
  - `context`: a dict with the exception details; key fields:
    - `"message"`: error description
    - `"exception"` (optional): the exception object
    - `"future"` / `"task"`: the associated future or task

---

#### Complete example

```python
import asyncio
import gc

# Custom exception handler
def custom_handler(loop, context):
    # Extract the exception details
    msg = context.get("message", "Unknown error")
    exc = context.get("exception")
    task = context.get("task")

    # Build an error report
    error_msg = f"⚠️ Exception caught by the event loop:\n- message: {msg}"
    if exc:
        error_msg += f"\n- exception type: {type(exc).__name__}"
    if task:
        error_msg += f"\n- task: {task.get_name()}"

    print(error_msg)
    # Logging, alerting, etc. can be added here

async def faulty_task():
    await asyncio.sleep(1)
    raise ValueError("example error: data validation failed")

async def main():
    loop = asyncio.get_running_loop()
    # Install the custom exception handler
    loop.set_exception_handler(custom_handler)

    # Create a task that raises, but never await it: its exception is
    # "never retrieved", so the loop reports it to the handler when the
    # task object is garbage-collected.
    task = asyncio.create_task(faulty_task(), name="faulty_task")
    await asyncio.sleep(2)  # give the task time to fail
    del task                # drop the last reference
    gc.collect()            # not needed on CPython, but forces prompt collection elsewhere

asyncio.run(main())
```

#### Sample output

```
⚠️ Exception caught by the event loop:
- message: Task exception was never retrieved
- exception type: ValueError
- task: faulty_task
```

If you `await` the task instead, the `ValueError` propagates out of `main()` and through `asyncio.run()`, and the handler is not invoked.

---

### Key points

1. **Scope of handling**
   The handler only sees exceptions that go uncaught inside the event loop (for example, exceptions from tasks that are never `await`ed); exceptions already caught by `try/except` do not trigger it [^2].

2. **Difference from `try/except`**
   ```python
   async def safe_task():
       try:
           raise RuntimeError("internal error")
       except Exception:
           print("handled locally")  # the global handler is not triggered
   ```

3. **Restoring the default handler**
   ```python
   loop.set_exception_handler(None)  # fall back to the default handler
   ```

4. **More detail in debug mode**
   Enabling debug mode gives a more detailed stack trace:
   ```python
   loop.set_debug(True)  # shows where the exception was raised [^2]
   ```

---

### Typical use cases

A minimal sketch illustrating the first two items follows this list.

- Centralised logging of exceptions from async tasks
- Monitoring and alerting (e.g. forwarding exception details to a monitoring platform)
- Reacting to otherwise-unhandled exceptions so they do not destabilise the running event loop
- Custom error-report formats (e.g. hiding sensitive information in production)
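To make the logging and alerting use cases concrete, here is a minimal sketch, not taken from the answer above, that routes uncaught event-loop errors into the standard `logging` module and falls back to `loop.default_exception_handler()` when no exception object is attached. The logger name `aio-errors` and the failing `broken_callback` are illustrative assumptions.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("aio-errors")  # illustrative logger name

def logging_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
    """Send uncaught event-loop errors to the logging module."""
    exc = context.get("exception")
    msg = context.get("message", "Unhandled error in event loop")
    if exc is not None:
        # exc_info=exc attaches the full traceback to the log record
        logger.error("asyncio: %s", msg, exc_info=exc)
    else:
        # No exception object: defer to asyncio's built-in report
        loop.default_exception_handler(context)

def broken_callback() -> None:
    raise RuntimeError("boom")  # illustrative failure inside a plain callback

async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(logging_handler)
    # Exceptions raised in callbacks scheduled with call_soon() always
    # go straight to the loop's exception handler.
    loop.call_soon(broken_callback)
    await asyncio.sleep(0.1)

asyncio.run(main())
```

Because the loop delivers callback exceptions straight to the handler, `loop.call_soon()` with a failing callback is a deterministic way to exercise a custom handler in tests; replacing `logger.error` with a call to an alerting client would cover the monitoring use case.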


self.executors.values(): if executor.get_execution_status()["is_running"]: executor.pause_execution() self.window['-STATUS-'].update("执行已暂停") elif event == '-RESUME-EXECUTION-': for executor in self.executors.values(): if executor.get_execution_status()["is_paused"]: executor.resume_execution() self.window['-STATUS-'].update("执行已继续") elif event == '-STOP-EXECUTION-': for executor in self.executors.values(): executor.stop_execution() self.window['-STATUS-'].update("执行已停止") # 在 run 方法的事件处理循环中 elif event == '-GO-SHELL-': self.handle_go_shell() # 其他功能 elif event == 'ROUTER_MPU下发送': if self.save_config() and values["发送信息"]: for port_id, executor in self.executors.items(): if executor.is_connected: executor.send(values["发送信息"]) executor.read_until_prompt() elif event == 'Shell下发送': if self.save_config() and values["发送信息"]: for port_id, executor in self.executors.items(): if executor.is_connected: executor.send(values["发送信息"]) executor.read_until_prompt() elif event == '定时发送': if self.save_config() and values["发送信息"] and values["interval"]: try: interval = float(values["interval"]) if interval <= 0: raise ValueError("间隔必须大于0") loop_msg = values["发送信息"] threading.Thread( target=self.periodic_send, args=(loop_msg, interval), daemon=True ).start() except Exception as e: sg.popup(f"无效的间隔: {str(e)}") else: sg.popup("请输入发送信息和有效间隔") elif event == "停止定时发送": for executor in self.executors.values(): executor.stop_execution() elif event == '接收1s': for executor in self.executors.values(): if executor.is_connected: executor.read_until_prompt(timeout=1) elif event == 'dump寄存器': if values["start_addr"] and values["end_addr"]: for executor in self.executors.values(): if executor.is_connected: executor.send(f"dump {values['start_addr']} {values['end_addr']}") executor.read_until_prompt() elif event == '重新获取sdk.out': for executor in self.executors.values(): if executor.is_connected: executor.send("get_sdk.out") executor.read_until_prompt() elif event == '一键升级MT1': for executor in self.executors.values(): if executor.is_connected: executor.send("upgrade_mt1") executor.read_until_prompt() elif event == '一键升级MT2': for executor in self.executors.values(): if executor.is_connected: executor.send("upgrade_mt2") executor.read_until_prompt() elif event == '使用说明': self.show_help() self.running = False for executor in self.executors.values(): executor.disconnect() self.window.close() # 新增方法:执行自动操作 def perform_auto_operation(self, executor): """执行自动操作:检测引导字符串并执行Ctrl+T和watchdog命令""" # 尝试检测引导字符串 executor.detect_boot_string_and_auto_operation() # 如果未检测到,尝试多次检测 if not executor.boot_string_detected: for _ in range(10): # 最多尝试3次 time.sleep(1) executor.detect_boot_string_and_auto_operation() if executor.boot_string_detected: break # 更新状态 self.window['-STATUS-'].update(f"端口 {executor.port_id} 自动操作{'完成' if executor.auto_completed else '失败'}") def execute_project_upgrade(self, index, port_ids): """在指定端口上执行项目升级""" commands = self.project_manager.get_project_commands(index) if commands: # 重置进度显示 self.window['-CURRENT-COMMAND-'].update("") self.window['-COMMAND-PROGRESS-'].update("0/0") # 为每个端口启动升级线程 threads = [] for port_id in port_ids: executor = self.executors[port_id] thread = threading.Thread( target=self._execute_upgrade_on_port, args=(executor, commands, port_id), daemon=True ) thread.start() threads.append(thread) # 等待所有线程完成 for thread in threads: thread.join(timeout=300) # 5分钟超时 self.window['-STATUS-'].update("升级任务已完成") else: for port_id in port_ids: self.executors[port_id].log_queue.put("错误:项目没有配置命令") def 
_execute_upgrade_on_port(self, executor, commands, port_id): """在单个端口上执行升级命令""" self.window['-STATUS-'].update(f"端口 {port_id} 开始升级...") success = executor.execute_commands(commands) self.window['-STATUS-'].update(f"端口 {极id} 升级{'成功' if success else '失败'}") def periodic_send(self, message, interval): """周期性发送消息(优化版)""" while not self.executors[1].stop_event.is_set(): for executor in self.executors.values(): if executor.is_connected: executor.send(message) executor.read_until_prompt() time.sleep(interval) def show_help(self): """显示使用说明(多端口版)""" help_text = f""" === 远程单板连接工具使用说明 (多端口) === 1. 多端口连接管理: - 支持同时连接 {MAX_PORTS} 个端口 - 每个端口独立显示连接状态(红:未连接, 绿:已连接) - 每个端口独立保存日志文件 2. 自动操作功能: - 点击"自动"按钮执行特殊引导操作 - 检测"Press 'Ctrl+T' to skip boot"字符串 - 自动发送Ctrl+T组合键 - 5秒后发送"watchdog -close"命令 - 状态指示器: ○未执行 ●已检测 ✓已完成 3. 项目升级: - 选择项目后,会弹出端口选择对话框 - 可以为每个选择的端口设置独立的日志文件 - 日志文件保存在指定目录,文件名格式:项目名_portX_时间戳.log 4. 日志管理: - 默认日志目录:{self.default_data.get('log_dir', os.getcwd())} - 可以随时修改日志保存目录 - 每个端口连接会记录详细的操作日志 5. 命令执行控制: - 暂停执行:暂停所有端口的命令执行 - 继续执行:继续所有端口的命令执行 - 停止执行:停止所有端口的命令执行 6. 发送机制: - 命令发送到所有已连接的端口 - 每个端口独立处理响应 - 支持定时发送到所有端口 """ sg.popup(help_text, title="使用说明", font=("Arial", 11)) def create_template_file(self): """创建项目配置模板""" try: template = { "projects": [ { "name": "项目A", "commands": [ "ifconfig eth0 192.168.0.100", "ftpget -u user -p pass 192.168.0.1 /home/firmware.bin firmware.bin", "flash -w 0 0x0000000 0xa000000 0x2000000" ], "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, { "name": "项目B", "commands": [ "ifconfig eth0 192.168.0.101", "ping 192.168.0.1", "reboot" ], "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } ] } with open("projects_template.json", 'w') as f: json.dump(template, f, indent=4) return True except Exception as e: logger.error(f"创建模板文件失败: {e}") return False # ============== 主程序入口 ============== if __name__ == "__main__": # 启动应用 try: app = RemoteControlApp() app.run() except Exception as e: logger.exception("应用程序崩溃") sg.popup_error(f"应用程序发生致命错误: {str(e)}") 请解析这份代码中的“自动”操作,如何做到抓到字符串,并且模拟键盘敲击Ctrl+T,最后输入watchdog -close的操作的
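
就"自动"操作的核心流程,可以用下面这个脱离 GUI 的最小示意来说明(仅为示意:假设目标板通过 TCP/telnet 文本方式交互,原代码里的 executor 封装、日志队列、状态指示等都未包含)。思路与原代码一致:循环读取串口/网络回显,匹配到引导字符串后发送 Ctrl+T 对应的控制字符 0x14,等 5 秒再发送 `watchdog -close`:

```python
# 示意性代码:检测引导字符串 -> 发送 Ctrl+T(0x14)-> 5 秒后发送 watchdog -close
import socket
import time

BOOT_PROMPT = b"Press 'Ctrl+T' to skip boot"

def auto_operation(host: str, port: int, timeout: float = 10.0) -> bool:
    """在 timeout 秒内检测引导字符串并执行自动操作,成功返回 True。"""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.settimeout(1.0)
        buffer = b""
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                chunk = sock.recv(1024)
            except socket.timeout:
                continue
            if not chunk:
                break
            buffer += chunk
            if BOOT_PROMPT in buffer:
                sock.sendall(b"\x14")               # Ctrl+T 对应的 ASCII 控制字符 0x14
                time.sleep(5)                        # 与原帮助文本一致:5 秒后
                sock.sendall(b"watchdog -close\n")   # 发送 watchdog -close 命令
                return True
    return False
```

所谓"模拟键盘敲击 Ctrl+T",在这种远程连接场景下并不是操作本地键盘,而是把 Ctrl+T 组合键对应的单字节控制字符(0x14)直接写入连接。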

你理解错了,我使用langgraph dev,就会自动打开https://smith.langchain.com/studio/thread?baseUrl=https%3A%2F%2Fsiteproxy.ruqli.workers.dev%3A443%2Fhttp%2F127.0.0.1%3A2024&mode=graph&assistantId=fe096781-5601-53d2-b2f6-0d3403f7e9ca,这个界面,输入的问题是从state.input,获取的,输出也要正常返回到网页端中,这是代码:"""LangGraph single-node graph template. Returns a predefined response. Replace logic and configuration as needed. """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, TypedDict import asyncio from langchain_core.runnables import RunnableConfig from langgraph.graph import StateGraph from openai import OpenAI class Configuration(TypedDict): """Configurable parameters for the agent. Set these when creating assistants OR when invoking the graph. See: https://langchain-ai.github.io/langgraph/cloud/how-tos/configuration_cloud/ """ my_configurable_param: str @dataclass class State: """Input state for the agent. Defines the initial structure of incoming data. See: https://langchain-ai.github.io/langgraph/concepts/low_level/#state """ input: str = "example" # 更直观地命名字段为 input # 自定义 OpenAI 客户端以支持异步调用 class CustomOpenAI(OpenAI): def sync_call_model(self, model_name: str, prompt: str, config: Dict[str, Any]) -> Dict[str, Any]: response = self.chat.completions.create( model=model_name, messages=[{"role": "user", "content": prompt}] ) print(response.choices[0].message.content) return {"output": response.choices[0].message.content} async def call_model(self, model_name: str, prompt: str, config: Dict[str, Any]) -> Dict[str, Any]: loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self.sync_call_model, model_name, prompt, config) # 初始化自定义 OpenAI 客户端 client = CustomOpenAI( api_key="sk-826b4412358f422c960eebdfb881e46c", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) model = "qwen-plus" async def call_model(state: State, config: RunnableConfig) -> Dict[str, Any]: """Process input and returns output. Can use runtime configuration to alter behavior. """ configuration = config["configurable"] result = await client.call_model( model, state.input, configuration, ) return result # 返回值已是一个 dict # Define the graph graph = ( StateGraph(State, config_schema=Configuration) .add_node(call_model) .add_edge("__start__", "call_model") .compile(name="New Graph") ),修改后给出完整代码
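
要让模型回答出现在 Studio 网页端,一个常见做法是让 State 同时声明输入和输出字段,节点返回的字典键与字段名对应时才会写回状态。下面是一个示意性片段(非完整修复方案):call_model 内部的真实模型调用沿用原代码中的 client.call_model,这里用回显占位,便于独立运行:

```python
# 示意性片段:State 同时包含 input 和 output,节点返回 {"output": ...} 即可在网页端看到结果
from dataclasses import dataclass
from typing import Any, Dict

from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph

@dataclass
class State:
    input: str = "example"   # 网页端 Input 面板填写的问题
    output: str = ""         # 节点写回的模型回答

async def call_model(state: State, config: RunnableConfig) -> Dict[str, Any]:
    # 实际项目中此处调用 client.call_model(model, state.input, ...);
    # 为保持示例可独立运行,这里用回显代替真实模型调用
    answer = f"echo: {state.input}"
    return {"output": answer}   # 键名必须与 State 字段一致才会更新状态

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge("__start__", "call_model")
    .compile(name="New Graph")
)
```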

把import asyncio import websockets import json import struct import datetime import logging import traceback import os import sys from contextlib import contextmanager from typing import Any, Dict, List, Optional import mysql.connector from mysql.connector import pooling from mysql.connector import errorcode # ====================== # 增强的配置参数 # ====================== CONFIG = { "TIME_FORMATS": [ "%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y%m%d%H%M%S", "%d-%b-%Y %H:%M:%S" ], "WEBSOCKET_HOST": "0.0.0.0", # 监听所有接口 "WEBSOCKET_PORT": 8765, "MAX_CONNECTIONS": 100, "DB_WRITE_BATCH_SIZE": 50, "DB_WRITE_TIMEOUT": 5.0, "LOG_FILE": "sensor_server.log", "LOG_LEVEL": "DEBUG", # 增加日志级别配置 # MySQL数据库配置 "MYSQL": { "HOST": "localhost", "PORT": 3306, "USER": "root", "PASSWORD": "Adu@123.", "DATABASE": "CF_HIDB", "POOL_SIZE": 5, "POOL_NAME": "sensor_pool", "POOL_RESET_SESSION": True, "CONNECT_TIMEOUT": 5 # 连接超时时间(秒) } } # ====================== # 增强的日志系统 # ====================== class SensitiveDataFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: if not hasattr(record, 'msg') or not record.msg: return True if not isinstance(record.msg, str): try: record.msg = str(record.msg) except Exception: return True # 过滤MySQL密码 password = CONFIG["MYSQL"]["PASSWORD"] if password: record.msg = record.msg.replace(password, "[REDACTED]") return True def setup_logging() -> logging.Logger: sensitive_filter = SensitiveDataFilter() # 配置日志级别 log_level = getattr(logging, CONFIG.get("LOG_LEVEL", "INFO").upper(), logging.INFO) logger = logging.getLogger("SensorServer") logger.setLevel(log_level) # 清除所有现有处理器 for handler in logger.handlers[:]: logger.removeHandler(handler) # 文件处理器 file_handler = logging.FileHandler(CONFIG["LOG_FILE"]) file_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) file_handler.addFilter(sensitive_filter) logger.addHandler(file_handler) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) console_handler.addFilter(sensitive_filter) logger.addHandler(console_handler) return logger # 全局日志器 logger = setup_logging() # ====================== # 增强的MySQL数据库管理 (带超时处理) # ====================== class MySQLDatabaseManager: def __init__(self): self.data_buffer = [] self.last_write_time = datetime.datetime.now() self.connection_pool = None self.initialize_pool() self.initialize_db() def initialize_pool(self): """安全地初始化连接池""" try: logger.debug("尝试创建MySQL连接池...") self.connection_pool = pooling.MySQLConnectionPool( pool_name=CONFIG["MYSQL"]["POOL_NAME"], pool_size=CONFIG["MYSQL"]["POOL_SIZE"], pool_reset_session=CONFIG["MYSQL"]["POOL_RESET_SESSION"], host=CONFIG["MYSQL"]["HOST"], port=CONFIG["MYSQL"]["PORT"], user=CONFIG["MYSQL"]["USER"], password=CONFIG["MYSQL"]["PASSWORD"], database=CONFIG["MYSQL"]["DATABASE"], connect_timeout=CONFIG["MYSQL"]["CONNECT_TIMEOUT"] ) logger.info("MySQL连接池创建成功") except mysql.connector.Error as err: logger.error(f"创建MySQL连接池失败: {err}") # 尝试重新连接 self.reconnect() except Exception as e: logger.critical(f"连接池初始化意外错误: {e}") sys.exit(1) def reconnect(self): """尝试重新连接数据库""" logger.warning("尝试重新连接数据库...") retries = 3 for attempt in range(1, retries + 1): try: self.initialize_pool() logger.info("数据库重新连接成功") return except Exception as e: logger.error(f"重新连接尝试 {attempt}/{retries} 失败: {e}") if attempt < retries: logger.info(f"{5 * attempt}秒后重试...") time.sleep(5 * attempt) logger.critical("无法连接到数据库,服务器退出") sys.exit(1) @contextmanager 
def _get_connection(self): """安全地获取数据库连接""" if not self.connection_pool: self.reconnect() try: conn = self.connection_pool.get_connection() if not conn.is_connected(): conn.reconnect() yield conn except mysql.connector.Error as err: logger.error(f"获取数据库连接失败: {err}") if err.errno == errorcode.CR_SERVER_GONE_ERROR: self.reconnect() raise finally: if 'conn' in locals() and conn.is_connected(): conn.close() def initialize_db(self): """初始化数据库表结构 (带重试机制)""" max_attempts = 3 for attempt in range(1, max_attempts + 1): try: with self._get_connection() as conn: cursor = conn.cursor() # 创建设备表 cursor.execute(''' CREATE TABLE IF NOT EXISTS devices ( id INT AUTO_INCREMENT PRIMARY KEY, device_id VARCHAR(50) NOT NULL UNIQUE, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ''') # 创建传感器数据表 cursor.execute(''' CREATE TABLE IF NOT EXISTS sensor_data ( id BIGINT AUTO_INCREMENT PRIMARY KEY, device_id INT NOT NULL, timestamp TIMESTAMP NOT NULL, sensor_values JSON NOT NULL, received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_device_id (device_id), INDEX idx_timestamp (timestamp), FOREIGN KEY (device_id) REFERENCES devices(id) ON DELETE CASCADE ) ''') conn.commit() logger.info("数据库表结构初始化成功") return except Exception as e: logger.error(f"数据库初始化失败 (尝试 {attempt}/{max_attempts}): {e}") if attempt < max_attempts: logger.info(f"{2 * attempt}秒后重试...") time.sleep(2 * attempt) logger.critical("数据库初始化失败,服务器退出") sys.exit(1) def insert_sensor_data(self, device_id: str, timestamp: str, sensor_values: List[float]) -> None: """将数据添加到缓冲区,批量写入数据库""" self.data_buffer.append({ 'device_id': device_id, 'timestamp': timestamp, 'sensor_values': sensor_values }) # 检查是否满足批量写入条件 now = datetime.datetime.now() buffer_full = len(self.data_buffer) >= CONFIG["DB_WRITE_BATCH_SIZE"] timeout_reached = (now - self.last_write_time).total_seconds() >= CONFIG["DB_WRITE_TIMEOUT"] if buffer_full or timeout_reached: self.flush_buffer() def flush_buffer(self) -> None: """安全地将缓冲区数据写入数据库""" if not self.data_buffer: return try: with self._get_connection() as conn: cursor = conn.cursor() # 批量写入设备数据 device_ids = [] for data in self.data_buffer: # 获取或创建设备ID device_db_id = self._get_or_create_device(conn, data['device_id']) device_ids.append(device_db_id) # 准备批量插入传感器数据 sensor_data = [] for i, data in enumerate(self.data_buffer): sensor_data.append(( device_ids[i], data['timestamp'], json.dumps(data['sensor_values']) )) # 批量插入传感器数据 cursor.executemany( "INSERT INTO sensor_data (device_id, timestamp, sensor_values) " "VALUES (%s, %s, %s)", sensor_data ) conn.commit() logger.info(f"成功写入 {len(self.data_buffer)} 条数据到MySQL数据库") # 清空缓冲区 self.data_buffer.clear() self.last_write_time = datetime.datetime.now() except Exception as e: logger.error(f"数据库写入失败: {e}") logger.error(f"异常详情:\n{traceback.format_exc()}") # 保留数据以便稍后重试 def _get_or_create_device(self, conn, device_id: str) -> int: """获取或创建设备记录 (带错误处理)""" cursor = conn.cursor() try: cursor.execute( "SELECT id FROM devices WHERE device_id = %s", (device_id,) ) device = cursor.fetchone() if device: device_id = device[0] cursor.execute( "UPDATE devices SET last_seen = CURRENT_TIMESTAMP WHERE id = %s", (device_id,) ) return device_id else: cursor.execute( "INSERT INTO devices (device_id) VALUES (%s)", (device_id,) ) return cursor.lastrowid except mysql.connector.Error as err: logger.error(f"设备操作失败: {err}") # 简单处理:返回一个虚拟ID,实际应用中应根据需要处理 return -1 # ====================== # 数据包解析 # ====================== def parse_timestamp(timestamp_bytes: bytes) 
-> str: """解析时间戳字节数据为字符串""" try: timestamp_str = timestamp_bytes.decode('ascii').strip() except UnicodeDecodeError: try: timestamp_str = timestamp_bytes.decode('latin-1').strip() except UnicodeDecodeError: timestamp_str = timestamp_bytes.hex() logger.warning(f"时间戳包含无效字符,使用十六进制: {timestamp_str}") return timestamp_str cleaned = ''.join(c for c in timestamp_str if c.isdigit() or c in '- :/') for fmt in CONFIG["TIME_FORMATS"]: try: dt = datetime.datetime.strptime(cleaned, fmt) return dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: continue logger.warning(f"无法解析时间戳: {timestamp_str} (清理后: {cleaned})") return cleaned def parse_sensor_values(data: bytes, start_index: int) -> List[float]: """解析传感器值列表""" values = [] index = start_index max_count = (len(data) - index) // 4 for _ in range(max_count): if index + 4 > len(data): break value_bytes = data[index:index + 4] index += 4 try: value = struct.unpack('<f', value_bytes)[0] values.append(value) except struct.error: try: value = struct.unpack('>f', value_bytes)[0] values.append(value) except struct.error: continue return values def parse_binary_packet(raw_data: bytes) -> Optional[Dict]: """解析二进制数据包""" try: HEADER = b'$\x00\x08\x00\x00\x00' if not raw_data.startswith(HEADER): return None DEVICE_ID_START = len(HEADER) DEVICE_ID_END = DEVICE_ID_START + 8 if len(raw_data) < DEVICE_ID_END: return None device_id_bytes = raw_data[DEVICE_ID_START:DEVICE_ID_END] TIMESTAMP_START = DEVICE_ID_END TIMESTAMP_END = TIMESTAMP_START + 19 if len(raw_data) < TIMESTAMP_END: return None timestamp_bytes = raw_data[TIMESTAMP_START:TIMESTAMP_END] SENSOR_DATA_START = TIMESTAMP_END sensor_values = parse_sensor_values(raw_data, SENSOR_DATA_START) return { 'device_id': decode_with_fallback(device_id_bytes), 'timestamp': parse_timestamp(timestamp_bytes), 'sensor_values': sensor_values } except Exception as e: logger.error(f"解析错误: {str(e)}") return None def decode_with_fallback(data: bytes) -> str: """带回退机制的字节解码""" try: return data.decode('ascii').strip() except UnicodeDecodeError: try: return data.decode('latin-1').strip() except UnicodeDecodeError: return data.hex() # ====================== # 增强的WebSocket服务器 (带连接监控) # ====================== class SensorWebSocketServer: def __init__(self, host: str, port: int, db_manager: MySQLDatabaseManager): self.host = host self.port = port self.db_manager = db_manager self.connections = set() self.server = None self.stats = { 'total_connections': 0, 'total_messages': 0, 'valid_messages': 0, 'invalid_messages': 0 } self.start_time = None self.connection_monitor_task = None async def handler(self, websocket, path: str) -> None: """处理WebSocket连接""" client_ip = websocket.remote_address[0] if websocket.remote_address else "unknown" self.connections.add(websocket) self.stats['total_connections'] += 1 logger.info(f"客户端连接: {client_ip},当前连接数: {len(self.connections)}") try: async for message in websocket: self.stats['total_messages'] += 1 if not isinstance(message, bytes): self.stats['invalid_messages'] += 1 continue parsed_data = parse_binary_packet(message) if parsed_data: self.stats['valid_messages'] += 1 self.db_manager.insert_sensor_data( parsed_data['device_id'], parsed_data['timestamp'], parsed_data['sensor_values'] ) else: self.stats['invalid_messages'] += 1 except websockets.exceptions.ConnectionClosed: logger.info(f"客户端断开: {client_ip}") except Exception as e: logger.error(f"处理客户端 {client_ip} 时出错: {str(e)}") finally: if websocket in self.connections: self.connections.remove(websocket) logger.info(f"客户端断开,剩余连接数: {len(self.connections)}") 
async def start(self) -> None: """启动WebSocket服务器""" self.server = await websockets.serve( self.handler, self.host, self.port, max_size=2 ** 20, # 1MB ping_interval=30, # 更频繁的ping ping_timeout=15, close_timeout=10, max_queue=CONFIG["MAX_CONNECTIONS"] ) self.start_time = datetime.datetime.now() logger.info(f"WebSocket服务器启动在 ws://{self.host}:{self.port}") # 启动连接监控任务 self.connection_monitor_task = asyncio.create_task(self.monitor_connections()) async def monitor_connections(self): """监控连接状态""" while True: await asyncio.sleep(60) logger.debug(f"连接状态: {len(self.connections)} 个活跃连接") async def stop(self) -> None: """停止WebSocket服务器""" if self.connection_monitor_task: self.connection_monitor_task.cancel() if self.server: self.server.close() await self.server.wait_closed() logger.info("WebSocket服务器已停止") def get_stats(self) -> Dict: """获取服务器统计信息""" return { **self.stats, 'current_connections': len(self.connections), 'uptime': str(datetime.datetime.now() - self.start_time) if self.start_time else "0:00:00" } # ====================== # 增强的主程序 (带健康检查) # ====================== async def main(): # 初始化MySQL数据库管理器 logger.debug("初始化数据库管理器...") db_manager = MySQLDatabaseManager() # 创建WebSocket服务器 logger.debug("创建WebSocket服务器...") server = SensorWebSocketServer( CONFIG["WEBSOCKET_HOST"], CONFIG["WEBSOCKET_PORT"], db_manager ) # 启动服务器 logger.debug("启动WebSocket服务器...") await server.start() try: # 初始健康检查 logger.debug("执行初始健康检查...") await health_check(server, db_manager) # 运行主循环 logger.info("服务器进入主循环") while True: await asyncio.sleep(5) db_manager.flush_buffer() # 每分钟健康检查 now = datetime.datetime.now() if now.second < 5: # 每分钟前5秒执行 await health_check(server, db_manager) except asyncio.CancelledError: logger.info("服务器停止请求") except KeyboardInterrupt: logger.info("收到停止信号,正在关闭服务器...") finally: # 确保所有缓冲数据都写入数据库 db_manager.flush_buffer() await server.stop() async def health_check(server, db_manager): """服务器健康检查""" stats = server.get_stats() # 数据库连接检查 db_status = "OK" try: with db_manager._get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") if cursor.fetchone()[0] != 1: db_status = "ERROR" except Exception: db_status = "ERROR" logger.info( f"健康检查 | " f"连接数: {stats['current_connections']} | " f"消息: {stats['total_messages']} | " f"数据库: {db_status} | " f"运行时间: {stats['uptime']}" ) # 启动程序 if __name__ == "__main__": # 记录启动配置 logger.info("=" * 50) logger.info("启动传感器数据采集服务器 (增强版)") logger.info(f"时间: {datetime.datetime.now()}") logger.info(f"监听地址: {CONFIG['WEBSOCKET_HOST']}:{CONFIG['WEBSOCKET_PORT']}") logger.info(f"MySQL主机: {CONFIG['MYSQL']['HOST']}:{CONFIG['MYSQL']['PORT']}") logger.info(f"数据库: {CONFIG['MYSQL']['DATABASE']}") logger.info("=" * 50) try: # 设置事件循环策略 (解决Windows上的问题) if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main()) except Exception as e: logger.critical(f"服务器意外停止: {e}") logger.critical(f"异常详情:\n{traceback.format_exc()}") finally: logger.info("=" * 50) logger.info("服务器已停止运行") logger.info("=" * 50) 部署到docker中
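
针对"部署到 docker 中"的需求,下面是一个示意性 Dockerfile(假设脚本保存为 sensor_server.py,requirements.txt 中至少包含 websockets 与 mysql-connector-python;文件名与镜像名均为假设):

```dockerfile
# 示意性 Dockerfile,仅覆盖最基本的运行环境
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY sensor_server.py .
# 与 CONFIG["WEBSOCKET_PORT"] 保持一致
EXPOSE 8765
CMD ["python", "sensor_server.py"]
```

构建与运行示例(镜像名为假设):`docker build -t sensor-server . && docker run -d -p 8765:8765 sensor-server`。另外两点需要注意:容器内的 localhost 不再指向宿主机,CONFIG 中的 MySQL HOST 应改为宿主机地址或通过环境变量注入;脚本中 reconnect()、initialize_db() 等处调用了 time.sleep,但文件顶部缺少 `import time`,部署前需补上。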

# mcp_server.py from datetime import datetime from mcp.server.fastmcp import FastMCP import logging import os import asyncio import hashlib import json import threading import time import numpy as np import faiss from langchain_community.docstore.in_memory import InMemoryDocstore from langchain_community.vectorstores import FAISS from langchain_community.llms import OpenAIChat from langchain.chains import RetrievalQA from ollama_embeding import CustomEmbeding from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, Docx2txtLoader, UnstructuredPowerPointLoader, UnstructuredExcelLoader, CSVLoader, UnstructuredHTMLLoader, UnstructuredMarkdownLoader, UnstructuredEmailLoader, UnstructuredFileLoader ) # 配置日志记录器 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # 创建 FastMCP 实例 mcp = FastMCP("VectorService") class VectorService: def __init__(self): self.embedding_function = CustomEmbeding('shaw/dmeta-embedding-zh') self.docstore = InMemoryDocstore() self.index = faiss.IndexFlatL2(768) self.vector_store = None self.existing_index_path = "E:/llm_rag/faiss_index/index.faiss" self.existing_index_pkl_path = "E:/llm_rag/faiss_index/index.pkl" self.is_processing = False self.last_processed_count = 0 self.is_initialized = False # 添加初始化完成标志 self.load_or_init_vector_store() # 初始化向量存储 self.is_initialized = True # 初始化完成 def load_or_init_vector_store(self): if self.vector_store is not None: return self.vector_store # 已初始化 if os.path.exists(self.existing_index_path) and os.path.exists(self.existing_index_pkl_path): vector_store = FAISS.load_local( "E:/llm_rag/faiss_index", embeddings=self.embedding_function, allow_dangerous_deserialization=True ) logger.info("Loaded existing vector store.") self.vector_store = vector_store return vector_store else: vector_store = FAISS( embedding_function=self.embedding_function, index=self.index, docstore=self.docstore, index_to_docstore_id={} ) logger.info("Initialized new vector store.") self.vector_store = vector_store return vector_store def get_id(self, file_path): """Generate file id""" return hashlib.md5(file_path.encode()).hexdigest() def load_document(self, file_path: str): file_ext = file_path.split('.')[-1].lower() logger.info(f"Loading document from {file_path}") loader_map = { 'txt': TextLoader, 'pdf': PyPDFLoader, 'docx': Docx2txtLoader, 'pptx': UnstructuredPowerPointLoader, 'xlsx': UnstructuredExcelLoader, 'csv': CSVLoader, 'html': UnstructuredHTMLLoader, 'htm': UnstructuredHTMLLoader, 'md': UnstructuredMarkdownLoader, 'eml': UnstructuredEmailLoader, 'msg': UnstructuredEmailLoader } if file_ext not in loader_map: logger.warning(f"Unsupported file type: {file_ext}") return None loader_class = loader_map.get(file_ext, UnstructuredFileLoader) loader = loader_class(file_path) try: documents = loader.load() logger.info(f"Loaded {len(documents)} documents from {file_path}") return documents except Exception as e: logger.error(f"Error loading {file_path}: {str(e)}") return None def _add_vector_metadata(self, file_name, file_name_path): """ 添加文件元数据 :return: """ docs = [] metadatas = [] try: file_stats = os.stat(file_name_path) file_size = file_stats.st_size res = self.load_document(file_name_path) if res: # 生成文件唯一标识(使用文件路径的哈希值) id = self.get_id(file_name_path) for doc in res: # 合并用户提供的元数据和文档自身的元数据 doc_metadata = doc.metadata.copy() doc_metadata.update({ "source": file_name, "file_path": 
file_name_path, "id": id, "upload_time": datetime.now().isoformat() }) # docs.append(doc.page_content.strip()) # 将文件名融入内容(提高文件名的权重) enhanced_content = f"文件名: {file_name}\n内容: {doc.page_content.strip()}" docs.append(enhanced_content) metadatas.append(doc_metadata) logger.info(f"Processed {file_name} ({file_size / (1024 * 1024.0):.2f} MB)") except Exception as e: logger.error(f"Error processing {file_name_path}: {str(e)}") return docs, metadatas def process_documents(self, data_path: str): """把所有文件进行批量向量化,添加文件唯一标识""" try: self.is_processing = True all_docs = [] all_metadatas = [] for root, dirs, files in os.walk(data_path): for file_name in files: file_name_path = os.path.join(root, file_name) logger.info(f"Processing file: {file_name_path}") # 调用 _add_vector_metadata 处理文件 docs, metadatas = self._add_vector_metadata( file_name=file_name, file_name_path=file_name_path ) # 累积结果 all_docs.extend(docs) all_metadatas.extend(metadatas) # 保存所有文件的向量数据 self._save_data_vector(docs=all_docs, metadatas=all_metadatas) self.last_processed_count = len(all_docs) self.is_processing = False return { "status": "success", "message": "Documents processed successfully", "document_count": len(all_docs) } except Exception as e: logger.error(f"Error processing documents: {str(e)}") self.is_processing = False return {"status": "error", "message": str(e)} def _save_data_vector(self, docs, metadatas): """Save the data vector to faiss""" self.vector_store = self.load_or_init_vector_store() docs = [doc for doc in docs if doc] try: logger.info("Starting embedding process...") self.vector_store.add_texts(texts=docs, metadatas=metadatas) logger.info("Embedding process completed.") except Exception as e: logger.error(f"An error occurred during embedding: {str(e)}") try: logger.info("Saving updated vector store...") self.vector_store.save_local("E:/llm_rag/faiss_index") logger.info("Updated vector store saved to E:/llm_rag/faiss_index.") except Exception as e: logger.error(f"An error occurred during saving: {str(e)}") return docs def check_process_status(self): """检查处理状态""" if self.is_processing: return { "status": "processing", "message": "Documents are being processed" } else: if os.path.exists(self.existing_index_path) and os.path.exists(self.existing_index_pkl_path): if self.last_processed_count > 0: return { "status": "success", "message": "Vector data has been updated", "last_processed_count": self.last_processed_count } else: return { "status": "ready", "message": "Vector store exists but no new data processed" } else: return { "status": "empty", "message": "No vector store exists" } def add_vector(self, new_file_name_path: str, new_file_name: str): """添加单个文件的向量""" try: self.is_processing = True docs, metadatas = self._add_vector_metadata( file_name=new_file_name, file_name_path=new_file_name_path ) self._save_data_vector(docs=docs, metadatas=metadatas) self.last_processed_count = len(docs) self.is_processing = False return { "status": "success", "message": "Vector added successfully" } except Exception as e: logger.error(f"Error adding vector: {str(e)}") self.is_processing = False return { "status": "error", "message": str(e) } vector_service = VectorService() @mcp.tool() def process_documents(data_path: str): """处理指定路径下的所有文档并生成向量存储""" logger.info(f"Starting to process documents in {data_path}") return vector_service.process_documents(data_path) @mcp.tool() def check_process_status(): """检查处理状态""" logger.info("Checking process status") return vector_service.check_process_status() @mcp.tool() def 
add_vector(new_file_name_path: str, new_file_name: str): """添加单个文件的向量""" logger.info(f"Adding vector for file: {new_file_name_path}") return vector_service.add_vector(new_file_name_path, new_file_name) @mcp.tool(name="searchfile", description=f"根据关键词搜索文件并返回匹配的内容") def search_answer(query: str): """ 获取检索相关的文件 :param query: 用户问题 :return: 返回检索到的文档 """ if not vector_service.is_initialized: logger.info("Server is not initialized yet. Please wait.") return {"status": "error", "message": "Server is not initialized yet. Please wait."} logger.info(f"Searching for relevant documents: {query}") try: retriever = FAISS.load_local( "E:/llm_rag/faiss_index", CustomEmbeding('shaw/dmeta-embedding-zh'), allow_dangerous_deserialization=True ).as_retriever(search_kwargs={"k": 10}) docs = retriever.get_relevant_documents(query) logger.info(f"找到 {len(docs)} 个相关文档块") logger.info(f"docs:{docs}") # return docs results = [] for doc in docs: metadata = doc.metadata file_path = metadata.get("file_path", "") # 安全检查:确保文件在允许的目录内 allowed_dir = "E:\\llm_rag\\data\\" if file_path and file_path.startswith(allowed_dir): # 生成相对路径并构建下载URL download_url = os.path.relpath(file_path, allowed_dir) results.append({ "content": doc.page_content, # 文档内容 "download_url": download_url # 下载链接 }) return results except Exception as e: logger.error(f"搜索出错: {str(e)}") return {"status": "error", "message": str(e)} if __name__ == "__main__": mcp.settings.port = 8880 logger.info("Starting mcp server through MCP") mcp.run(transport="sse") # 使用标准输入输出通信 报了这个 + Exception Group Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 403, in run_asgi | result = await app( # type: ignore[func-returns-value] | File "E:\llm_rag\.venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__ | return await self.app(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\applications.py", line 112, in __call__ | await self.middleware_stack(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\errors.py", line 187, in __call__ | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\errors.py", line 165, in __call__ | await self.app(scope, receive, _send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\exceptions.py", line 62, in __call__ | await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 53, in wrapped_app | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 714, in __call__ | await self.middleware_stack(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 734, in app | await route.handle(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 288, in handle | await self.app(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 76, in app | await wrap_app_handling_exceptions(app, request)(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 53, in wrapped_app | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File 
"E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 73, in app | response = await f(request) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 747, in sse_endpoint | return await handle_sse(request.scope, request.receive, request._send) # type: ignore[reportPrivateUsage] | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 680, in handle_sse | async with sse.connect_sse( | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 217, in __aexit__ | await self.gen.athrow(typ, value, traceback) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\sse.py", line 146, in connect_sse | async with anyio.create_task_group() as tg: | File "E:\llm_rag\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 772, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Exception Group Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\sse.py", line 165, in connect_sse | yield (read_stream, write_stream) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 685, in handle_sse | await self._mcp_server.run( | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\lowlevel\server.py", line 500, in run | async with AsyncExitStack() as stack: | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 714, in __aexit__ | raise exc_details[1] | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 217, in __aexit__ | await self.gen.athrow(typ, value, traceback) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\lowlevel\server.py", line 125, in lifespan | yield {} | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 697, in __aexit__ | cb_suppress = await cb(*exc_details) | File "E:\llm_rag\.venv\lib\site-packages\mcp\shared\session.py", line 223, in __aexit__ | return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) | File "E:\llm_rag\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 772, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\session.py", line 147, in _receive_loop | await super()._receive_loop() | File "E:\llm_rag\.venv\lib\site-packages\mcp\shared\session.py", line 374, in _receive_loop | await self._received_request(responder) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\session.py", line 175, in _received_request | raise RuntimeError( | RuntimeError: Received request before initialization was complete 如何解决

package com.example.demoapplication; import android.Manifest; // 录音权限相关 import android.content.pm.PackageManager; // 权限检查相关 import android.media.AudioFormat; // 音频格式定义 import android.media.AudioRecord; // 音频录制功能 import android.media.MediaRecorder; // 媒体录制配置 import android.os.Bundle; // Activity生命周期数据 import android.os.Handler; // 线程通信机制 import android.os.Looper; // 主线程消息循环 import android.os.Message; // 消息传递对象 import android.widget.Button; // UI按钮控件 import android.widget.Toast; // 短时提示信息 import androidx.annotation.NonNull; // 非空注解 import androidx.appcompat.app.AppCompatActivity; // 兼容Activity基类 import androidx.core.app.ActivityCompat; // 动态权限请求 import androidx.core.content.ContextCompat; // 权限状态查询 import java.io.BufferedReader; // 文本流读取 import java.io.BufferedWriter; // 文本流写入 import java.io.IOException; // IO异常处理 import java.io.InputStreamReader; // 字节转字符流 import java.io.OutputStreamWriter; // 字节转字符流 import java.net.ServerSocket; // 服务端监听套接字 import java.net.Socket; // 客户端连接套接字 import java.util.concurrent.ExecutorService; // 线程池管理 import java.util.concurrent.Executors; // 线程池工厂 import java.util.concurrent.ScheduledExecutorService; // 定时任务调度 import java.util.concurrent.TimeUnit; // 时间单位定义 import android.util.Base64; // Base64编码 import org.json.JSONException; import org.json.JSONObject; // JSON对象 public class MainActivity extends AppCompatActivity { private Button startRecordButton; // 开始录音按钮 private Button stopRecordButton; // 停止录音按钮 private Button uploadButton; // 上传文件按钮 private AudioRecord audioRecord; // 音频录制对象 private static final int SAMPLE_RATE = 44100; // 采样率:44.1kHz private static final int BUFFER_SIZE; // 缓冲区大小 static { // 使用更安全的缓冲区大小计算方式 int minBufferSize = AudioRecord.getMinBufferSize(SAMPLE_RATE, AudioFormat.CHANNEL_IN_MONO, AudioFormat.ENCODING_PCM_16BIT); // 确保缓冲区大小是帧大小的整数倍(2通道*2字节) BUFFER_SIZE = ((minBufferSize / (2 * 2)) + 1) * (2 * 2); } private ScheduledExecutorService scheduler; // 录音定时器 private boolean isRecording = false; // 录音状态标志 private static final int PERMISSION_REQUEST_CODE = 1; // 权限请求码 private final ExecutorService executorService = Executors.newCachedThreadPool(); // 通用线程池 private ServerSocket serverSocket; // TCP服务端Socket private volatile boolean isServerRunning = true; // 服务运行状态 private Socket clientSocket; // 当前客户端连接 // 主线程消息处理器 private Handler handler = new Handler(Looper.getMainLooper()) { @Override public void handleMessage(@NonNull Message msg) { switch (msg.what) { case 0x11: // 客户端连接成功 Toast.makeText(MainActivity.this, "客户端已连接", Toast.LENGTH_SHORT).show(); break; case 0x12: // 数据接收完成 Toast.makeText(MainActivity.this, "开始录音", Toast.LENGTH_SHORT).show(); break; case 0x13: // 上传错误 Toast.makeText(MainActivity.this, "录音数据已发送", Toast.LENGTH_SHORT).show(); break; case 0x14: // 自定义消息类型,用于停止录音提示 Toast.makeText(MainActivity.this, "停止录音", Toast.LENGTH_SHORT).show(); break; case 0x15: // 接收到控制指令 Toast.makeText(MainActivity.this, "收到控制指令:" + msg.obj.toString(), Toast.LENGTH_SHORT).show(); break; } } }; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initViews(); // 初始化视图组件 checkPermissions(); // 检查权限状态 setupClickListeners(); // 设置点击事件监听 startServer(30000); // 启动TCP服务器,监听30000端口 } /** * 视图初始化方法 * 绑定布局中的UI组件并设置初始状态 */ private void initViews() { startRecordButton = findViewById(R.id.startRecordButton); stopRecordButton = findViewById(R.id.stopRecordButton); uploadButton = findViewById(R.id.uploadButton); stopRecordButton.setEnabled(false); // 初始禁用停止按钮 
uploadButton.setEnabled(false); // 初始禁用上传按钮 } /** * 权限检查方法 * 如果未授予录音权限则发起请求 */ private void checkPermissions() { if (ContextCompat.checkSelfPermission(this, Manifest.permission.RECORD_AUDIO) != PackageManager.PERMISSION_GRANTED) { ActivityCompat.requestPermissions(this, new String[]{Manifest.permission.RECORD_AUDIO}, PERMISSION_REQUEST_CODE); } } /** * 按钮点击事件绑定 * 设置各按钮的响应逻辑 */ private void setupClickListeners() { startRecordButton.setOnClickListener(v -> startRecording()); // 开始录音 stopRecordButton.setOnClickListener(v -> stopRecording()); // 停止录音 uploadButton.setOnClickListener(v -> uploadRecording()); // 上传录音 } /** * 开始录音方法 * 初始化AudioRecord并启动录制 */ private void startRecording() { // 添加状态检查和异常处理 if (isRecording || audioRecord != null) { return; // 防止重复启动 } try { audioRecord = new AudioRecord(MediaRecorder.AudioSource.MIC, SAMPLE_RATE, AudioFormat.CHANNEL_IN_MONO, AudioFormat.ENCODING_PCM_16BIT, BUFFER_SIZE); if (audioRecord.getState() != AudioRecord.STATE_INITIALIZED) { throw new IllegalStateException("AudioRecord初始化失败"); } audioRecord.startRecording(); isRecording = true; startRecordButton.setEnabled(false); stopRecordButton.setEnabled(true); uploadButton.setEnabled(false); // 创建定时任务发送音频数据 scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::uploadAudioData, 0, 160, TimeUnit.MILLISECONDS); handler.sendEmptyMessage(0x12); // 发送开始录音的消息 // 添加发送 { "type": "startRecorder", "data": null } 的逻辑 if (clientSocket != null && !clientSocket.isClosed()) { try { JSONObject startPacket = new JSONObject(); startPacket.put("type", "startRecorder"); startPacket.put("data", JSONObject.NULL); // 设置 data 为 null BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(clientSocket.getOutputStream(), "UTF-8")); writer.write(startPacket.toString()); writer.write("\n\n"); // 双换行作为结束标识 writer.flush(); } catch (IOException | JSONException e) { e.printStackTrace(); Message msg = handler.obtainMessage(0x13, e.getMessage()); handler.sendMessage(msg); } } else { Toast.makeText(this, "客户端未连接", Toast.LENGTH_SHORT).show(); } } catch (Exception e) { e.printStackTrace(); Toast.makeText(this, "录音启动失败: " + e.getMessage(), Toast.LENGTH_LONG).show(); releaseAudioResources(); } } /** * 停止录音方法 * 释放录音资源并清理状态 */ private void stopRecording() { isRecording = false; releaseAudioResources(); stopRecordButton.setEnabled(false); uploadButton.setEnabled(true); handler.sendEmptyMessage(0x14); // 发送停止录音的消息 // 添加发送 { "type": "stopRecor", "data": null } 的逻辑 if (clientSocket != null && !clientSocket.isClosed()) { try { JSONObject stopPacket = new JSONObject(); stopPacket.put("type", "stopRecor"); stopPacket.put("data", JSONObject.NULL); // 设置 data 为 null BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(clientSocket.getOutputStream(), "UTF-8")); writer.write(stopPacket.toString()); writer.write("\n\n"); // 双换行作为结束标识 writer.flush(); } catch (IOException | JSONException e) { e.printStackTrace(); Message msg = handler.obtainMessage(0x13, e.getMessage()); handler.sendMessage(msg); } } } /** * 释放音频资源 */ private void releaseAudioResources() { if (audioRecord != null) { try { audioRecord.stop(); } catch (IllegalStateException e) { // 忽略可能的非法状态异常 } try { audioRecord.release(); } finally { audioRecord = null; } } if (scheduler != null) { try { scheduler.shutdownNow(); } finally { scheduler = null; } } } /** * 实时上传音频数据 * 将当前缓冲区数据通过Socket发送给客户端,并添加Base64编码的JSON格式数据 */ private void uploadAudioData() { if (!isRecording || clientSocket == null || clientSocket.isClosed()) return; byte[] 
buffer = new byte[BUFFER_SIZE]; try { int bytesRead = audioRecord.read(buffer, 0, BUFFER_SIZE); if (bytesRead > 0) { // 使用固定大小缓冲区确保完整性 byte[] validData = new byte[bytesRead]; System.arraycopy(buffer, 0, validData, 0, bytesRead); // 将音频数据转换为Base64编码字符串 String base64Data = Base64.encodeToString(validData, Base64.DEFAULT); // 构建JSON格式字符串 JSONObject json = new JSONObject(); json.put("type", "recording"); json.put("data", base64Data); // 获取输出流并发送数据 BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(clientSocket.getOutputStream(), "UTF-8")); writer.write(json.toString()); writer.write("\n\n"); // 添加双换行作为结束标识 writer.flush(); handler.sendEmptyMessage(0x13); // 发送录音数据的消息 } } catch (IOException | JSONException e) { Message msg = handler.obtainMessage(0x13, e.getMessage()); handler.sendMessage(msg); } catch (Exception e) { e.printStackTrace(); Message msg = handler.obtainMessage(0x13, "音频读取异常: " + e.getMessage()); handler.sendMessage(msg); } } /** * 上传完整录音文件(当前未使用) * 提示该模式下为实时传输无需手动上传 */ private void uploadRecording() { // 可选:上传录音完整文件逻辑,如有需要可添加实现 Toast.makeText(this, "该模式下无需上传文件,已实时发送", Toast.LENGTH_SHORT).show(); } /** * 启动TCP服务器 * 在指定端口监听客户端连接 * * @param port 监听端口号 */ private void startServer(int port) { executorService.execute(() -> { try { serverSocket = new ServerSocket(port); while (isServerRunning) { Socket socket = serverSocket.accept(); clientSocket = socket; handler.sendEmptyMessage(0x11); // 发送客户端连接成功的消息 // 启动双向通信处理 executorService.execute(() -> startCommunication(socket)); } } catch (IOException e) { e.printStackTrace(); runOnUiThread(() -> Toast.makeText(MainActivity.this, "服务器启动失败: " + e.getMessage(), Toast.LENGTH_LONG).show()); } }); } /** * 启动双向通信 * 处理客户端的连接和数据交互 * * @param socket 客户端Socket连接 */ private void startCommunication(Socket socket) { try { BufferedReader reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), "UTF-8")); StringBuilder packetBuilder = new StringBuilder(); int c; while ((c = reader.read()) != -1) { char ch = (char) c; packetBuilder.append(ch); // 检测到连续两个换行符,表示一个完整的数据包结束 if (packetBuilder.length() >= 2 && packetBuilder.charAt(packetBuilder.length() - 2) == '\n' && packetBuilder.charAt(packetBuilder.length() - 1) == '\n') { String packet = packetBuilder.toString().trim(); // 去除首尾空白字符 packetBuilder.setLength(0); // 清空构建器 if (!packet.isEmpty()) { try { JSONObject jsonObject = new JSONObject(packet); handleReceivedPacket(jsonObject); // 处理接收到的数据包 } catch (JSONException e) { e.printStackTrace(); } } } } } catch (IOException e) { e.printStackTrace(); runOnUiThread(() -> Toast.makeText(MainActivity.this, "通信中断: " + e.getMessage(), Toast.LENGTH_SHORT).show()); } finally { try { socket.close(); } catch (IOException ignored) {} // 重置客户端socket if (socket == clientSocket) { clientSocket = null; } } } /** * 处理接收到的数据包 * 根据数据包类型执行相应操作 * * @param jsonObject 接收到的JSON数据包 */ private void handleReceivedPacket(JSONObject jsonObject) { try { String type = jsonObject.getString("type"); Object data = jsonObject.opt("data"); // 发送消息到主线程进行Toast显示 Message msg = handler.obtainMessage(0x15, type + ": " + data.toString()); handler.sendMessage(msg); // 根据不同类型执行不同操作 switch (type) { case "start_recording": if (!isRecording) { runOnUiThread(this::startRecording); } break; case "stop_recording": if (isRecording) { runOnUiThread(this::stopRecording); } break; case "ping": sendResponse("pong"); // 发送pong响应 break; // 可以添加更多类型的处理 } } catch (JSONException e) { e.printStackTrace(); } } /** * 发送响应数据包 * 使用统一的JSON格式并通过双换行结尾 * * @param responseType 响应类型 */ private void 
```java
    sendResponse(String responseType) {
        if (clientSocket == null || clientSocket.isClosed()) return;
        try {
            JSONObject response = new JSONObject();
            response.put("type", responseType);
            response.put("data", "");
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(clientSocket.getOutputStream(), "UTF-8"));
            writer.write(response.toString());
            writer.write("\n\n");
            writer.flush();
        } catch (JSONException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送特定类型的数据包
     * 用于发送控制指令或其他消息
     *
     * @param type 数据包类型
     * @param data 数据内容
     */
    private void sendDataPacket(String type, Object data) {
        if (clientSocket == null || clientSocket.isClosed()) return;
        try {
            JSONObject packet = new JSONObject();
            packet.put("type", type);
            packet.put("data", data);
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(clientSocket.getOutputStream(), "UTF-8"));
            writer.write(packet.toString());
            writer.write("\n\n");
            writer.flush();
        } catch (JSONException | IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        isServerRunning = false;
        try {
            if (serverSocket != null) serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        executorService.shutdownNow();
        if (audioRecord != null) {
            audioRecord.release();
            audioRecord = null;
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        try {
            if (clientSocket != null && !clientSocket.isClosed()) clientSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) {
        if (requestCode == PERMISSION_REQUEST_CODE) {
            if (grantResults.length > 0 && grantResults[0] == PackageManager.PERMISSION_GRANTED) {
                Toast.makeText(this, "权限已授予", Toast.LENGTH_SHORT).show();
            } else {
                Toast.makeText(this, "录音权限被拒绝", Toast.LENGTH_SHORT).show();
            }
        }
        super.onRequestPermissionsResult(requestCode, permissions, grantResults);
    }
}
```

```python
import socket
import subprocess
import threading
import time
import json
import base64
import numpy as np
import queue
import pyaudio

# 配置参数
CHUNK = 1024 * 2          # 缓冲区大小
FORMAT = pyaudio.paInt16  # 16位PCM格式
CHANNELS = 1              # 单声道
RATE = 44100              # 采样率
SERVER_PORT = 30000       # TCP监听端口


class ADBExecutor:
    """ADB命令执行器"""
    ADB_PATH = "adb"
    HOST_PORT = 35000
    ANDROID_PORT = 30000

    def __init__(self, adb_path="adb"):
        self.adb_path = adb_path

    def get_connected_device(self):
        """获取当前连接的第一个设备"""
        try:
            result = subprocess.check_output([self.adb_path, "devices"], text=True)
            lines = result.strip().split("\n")[1:]
            devices = [line.split("\t")[0] for line in lines if "device" in line]
            return devices[0] if devices else None
        except Exception as e:
            print(f"ADB命令执行错误: {e}")
            return None

    def setup_port_forwarding(self, device_id):
        """设置端口转发"""
        try:
            subprocess.run(
                [self.adb_path, "-s", device_id, "forward",
                 f"tcp:{self.HOST_PORT}", f"tcp:{self.ANDROID_PORT}"],
                check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
            print(f"端口转发已设置: {self.HOST_PORT} <-> {self.ANDROID_PORT}")
            return True
        except Exception as e:
            print(f"端口转发失败: {e}")
            return False


class AdbDeviceListener:
    """ADB设备监听器"""

    def __init__(self, adb_executor, interval=2, on_device_callback=None):
        self.adb_executor = adb_executor
        self.interval = interval
        self.on_device_callback = on_device_callback
        self.running = False
        self.current_device = None

    def start(self):
        """启动设备监听线程"""
        self.running = True
        threading.Thread(target=self._listen_loop, daemon=True).start()
        print("设备监听已启动")

    def stop(self):
        """停止设备监听"""
        self.running = False
        print("设备监听已停止")

    def _listen_loop(self):
        """设备监听主循环"""
        while self.running:
            try:
                device = self.adb_executor.get_connected_device()
                # 检测设备连接变化
                if device and device != self.current_device:
                    if self.current_device:
                        self._handle_event("disconnected", self.current_device)
                    self.current_device = device
                    self._handle_event("connected", device)
                elif not device and self.current_device:
                    self._handle_event("disconnected", self.current_device)
                    self.current_device = None
                time.sleep(self.interval)
            except Exception as e:
                print(f"设备监听错误: {e}")
                time.sleep(5)

    def _handle_event(self, event_type, device_id):
        """处理设备事件"""
        if self.on_device_callback:
            self.on_device_callback(event_type, device_id)
        print(f"[设备事件] {event_type}: {device_id}")


class AudioReceiver:
    """音频接收器,处理JSON格式的音频数据包"""

    def __init__(self, host_port):
        self.host_port = host_port
        self.socket = None
        self.running = False
        self.buffer = bytearray()  # 接收缓冲区
        self.audio_callback = None
        self.command_callback = None

    def connect(self):
        """连接到设备"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect(("localhost", self.host_port))
            print(f"已连接到端口 {self.host_port}")
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            self.socket = None
            return False

    def disconnect(self):
        """断开连接"""
        if self.socket:
            try:
                self.socket.close()
            except:
                pass
            self.socket = None

    def start_receiving(self, audio_callback=None, command_callback=None):
        """开始接收数据"""
        self.running = True
        self.audio_callback = audio_callback
        self.command_callback = command_callback
        threading.Thread(target=self._receive_loop, daemon=True).start()
        print("音频接收已启动")

    def stop_receiving(self):
        """停止接收数据"""
        self.running = False
        self.disconnect()
        print("音频接收已停止")

    def _receive_loop(self):
        """接收主循环"""
        while self.running:
            if not self.socket or not self._is_connected():
                if not self.connect():
                    time.sleep(2)
                    continue
            try:
                data = self.socket.recv(4096)
                if not data:
                    print("连接断开,尝试重连...")
                    self.disconnect()
                    continue
                self.buffer.extend(data)
                self._parse_packets()
            except Exception as e:
                print(f"接收错误: {e}")
                self.disconnect()
                time.sleep(2)

    def _parse_packets(self):
        """解析接收到的数据包"""
        end_marker = b'\n\n'
        while True:
            marker_pos = self.buffer.find(end_marker)
            if marker_pos == -1:
                break  # 没有完整数据包
            # 提取完整数据包
            packet_bytes = self.buffer[:marker_pos + len(end_marker)]
            self.buffer = self.buffer[marker_pos + len(end_marker):]
            try:
                # 解析JSON
                json_bytes = packet_bytes[:-len(end_marker)]
                json_str = json_bytes.decode('utf-8')
                packet = json.loads(json_str)
                # 处理不同类型的数据包
                self._handle_packet(packet)
            except json.JSONDecodeError as e:
                print(f"JSON解析错误: {e}")
            except Exception as e:
                print(f"数据包处理错误: {e}")

    def _handle_packet(self, packet):
        """处理解析后的数据包"""
        packet_type = packet.get("type")
        if packet_type == "recording":
            # 处理录音数据
            if self.audio_callback:
                audio_data = base64.b64decode(packet.get("data", ""))
                self.audio_callback(audio_data)
        elif packet_type in ["startRecorder", "stopRecord"]:
            # 处理命令
            if self.command_callback:
                self.command_callback(packet_type)
        else:
            print(f"未知数据包类型: {packet_type}")

    def _is_connected(self):
        """检查连接状态"""
        if not self.socket:
            return False
        try:
            self.socket.sendall(b'')
            return True
        except:
            return False

    # 发送命令到安卓设备
    def send_command(self, command_type, data=None):
        """发送命令到安卓设备"""
        if not self.socket or not self._is_connected():
            print(f"发送失败: 未连接到设备")
            return False
        try:
            # 构建JSON数据包
            packet = {
                "type": command_type,
                "data": base64.b64encode(data).decode('ascii') if data else None
            }
            # 转换为JSON字符串并添加结束标志
            json_str = json.dumps(packet)
            packet_bytes = (json_str + "\n\n").encode('utf-8')
            # 发送数据包
            self.socket.sendall(packet_bytes)
            print(f"已发送命令: {command_type}")
            return True
        except Exception as e:
            print(f"发送错误: {e}")
            return False


def pcm_to_base64_str(pcm_data: bytes) -> str:
    """将PCM转为Base64编码字符串"""
    return base64.b64encode(pcm_data).decode('ascii')


def pcm_to_utf8(pcm_data: bytearray) -> str:
    """将16位PCM音频数据转为UTF-8字符串"""

    def validate_pcm(data: bytearray) -> bool:
        """验证PCM数据有效性"""
        return len(data) % 2 == 0  # 16位PCM需为偶数长度

    if not validate_pcm(pcm_data):
        raise ValueError("无效的PCM数据长度,16位PCM需为偶数长度")
    try:
        # 转为16位有符号整数数组(小端序)
        samples = np.frombuffer(pcm_data, dtype='<i2')
        # 标准化到0-255范围
        normalized = ((samples - samples.min()) * (255 / (samples.max() - samples.min()))).astype(np.uint8)
        # 转换为UTF-8字符串
        return bytes(normalized).decode('utf-8', errors='replace')
    except Exception as e:
        raise RuntimeError(f"转换失败: {str(e)}")


def audio_data_handler(audio_data):
    """音频数据处理回调"""
    print(f"收到音频数据: {len(audio_data)} 字节")
    # 打印前32字节的十六进制表示
    if len(audio_data) > 0:
        hex_preview = ' '.join(f'{b:02x}' for b in audio_data[:32])
        print(f"前32字节: {hex_preview}...")
    # 调试用:将PCM转为UTF-8
    if len(audio_data) < 1024:
        try:
            utf8_data = pcm_to_utf8(audio_data)
            print(f"UTF-8预览: {utf8_data[:30]}...")
        except:
            pass


def command_handler(command_type):
    """命令处理回调"""
    if command_type == "startRecorder":
        print("[命令] 开始录音")
    elif command_type == "stopRecord":
        print("[命令] 停止录音")


def on_device_callback(event, device_name):
    """设备事件回调"""
    if event == "connected":
        print(f"[设备已连接] {device_name}")
        # 设置端口转发
        adb_executor = ADBExecutor()
        if adb_executor.setup_port_forwarding(device_name):
            # 连接并开始接收音频
            global audio_receiver
            audio_receiver = AudioReceiver(ADBExecutor.HOST_PORT)
            if audio_receiver.connect():
                audio_receiver.start_receiving(
                    audio_callback=audio_data_handler,
                    command_callback=command_handler
                )
                print("已开始接收音频数据")
            else:
                print("连接失败")
        else:
            print("端口转发设置失败")
    elif event == "disconnected":
        print(f"[设备已断开连接] {device_name}")
        if 'audio_receiver' in globals():
            audio_receiver.stop_receiving()


# 主函数
def run_listener():
    adb_executor = ADBExecutor()
    listener = AdbDeviceListener(adb_executor, interval=2, on_device_callback=on_device_callback)
    listener.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("程序关闭,停止监听")
        listener.stop()
        if 'audio_receiver' in globals():
            audio_receiver.stop_receiving()


if __name__ == '__main__':
    run_listener()
```

把脚本和安卓端优化下,让安卓当客户端、python 当服务端去接收,现在是相反。
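针对上面"让安卓当客户端、Python 当服务端"的需求,下面是一个最小示意(非完整实现):假设仍沿用原脚本的 JSON + `\n\n` 分包协议,Python 侧改为用 `socket` 监听端口(示例沿用 30000,可按需调整),安卓端在执行 `adb reverse tcp:30000 tcp:30000` 后以客户端身份连接 `127.0.0.1:30000`;其中 `PCAudioServer`、`handle_client` 等名称均为示例自拟。

```python
import json
import socket
import threading

LISTEN_PORT = 30000  # 示例端口,与安卓端连接的端口保持一致即可


class PCAudioServer:
    """PC 端 TCP 服务器示意:接收安卓客户端按 JSON+\\n\\n 分包发来的数据"""

    def __init__(self, port=LISTEN_PORT):
        self.port = port

    def start(self):
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", self.port))
        srv.listen(1)
        print(f"服务端已监听端口 {self.port}")
        while True:
            client, addr = srv.accept()
            print(f"安卓客户端已连接: {addr}")
            threading.Thread(target=self.handle_client, args=(client,), daemon=True).start()

    def handle_client(self, client):
        buffer = bytearray()
        while True:
            data = client.recv(4096)
            if not data:
                print("客户端断开")
                break
            buffer.extend(data)
            # 按 \n\n 拆包,逐个解析 JSON
            while b"\n\n" in buffer:
                raw, _, rest = bytes(buffer).partition(b"\n\n")
                buffer = bytearray(rest)
                try:
                    packet = json.loads(raw.decode("utf-8"))
                    print("收到数据包类型:", packet.get("type"))
                except json.JSONDecodeError as e:
                    print("JSON解析错误:", e)
        client.close()


if __name__ == "__main__":
    # 安卓端可在执行 adb reverse tcp:30000 tcp:30000 后,直接连接 127.0.0.1:30000
    PCAudioServer().start()
```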

我们将在飞牛NAS上部署一个Docker容器,该容器运行一个WebSocket服务器,要求:

1. 可以不用密钥验证功能
2. 接收数据并存储到SQLite数据库
3. 提供API接口用于查询实时数据、历史数据
4. 支持根据指定Tag生成历史曲线图

## 解决方案

### 1. 项目结构

创建项目目录 ws-server,结构如下:

```
ws-server/
├── Dockerfile
├── requirements.txt
├── app/
│   ├── main.py      # 主程序,包含WebSocket和API服务
│   ├── auth.py      # 密钥验证
│   ├── database.py  # 数据库操作
│   └── plotter.py   # 绘图功能
```

### 2. 代码实现

#### 2.1 密钥验证(auth.py)

```python
from fastapi import HTTPException, Header

SECRET_KEY = "your_secure_key_123"  # 实际部署时使用环境变量


async def verify_token(authorization: str = Header(None)):
    if authorization is None or authorization != f"Bearer {SECRET_KEY}":
        raise HTTPException(status_code=403, detail="Invalid authentication token")
```

#### 2.2 数据库操作(database.py)

```python
import sqlite3
import os

DB_PATH = os.getenv('DB_PATH', 'sensor_data.db')


def init_db():
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sensor_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        tag_id INTEGER,
        tag_name TEXT,
        value REAL,
        data_time TEXT
    )''')
    conn.commit()
    conn.close()


def insert_data(tag_id: int, tag_name: str, value: float, data_time: str):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("INSERT INTO sensor_data (tag_id, tag_name, value, data_time) VALUES (?, ?, ?, ?)",
              (tag_id, tag_name, value, data_time))
    conn.commit()
    conn.close()


def get_latest_data(tag_id: int, limit: int = 1):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("SELECT * FROM sensor_data WHERE tag_id=? ORDER BY timestamp DESC LIMIT ?",
              (tag_id, limit))
    result = c.fetchall()
    conn.close()
    return result


def get_historical_data(tag_id: int, hours: int = 24):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    # 获取最近hours小时的数据
    c.execute("SELECT * FROM sensor_data WHERE tag_id=? AND timestamp >= datetime('now', ?) ORDER BY timestamp",
              (tag_id, f'-{hours} hours'))
    result = c.fetchall()
    conn.close()
    return result
```

#### 2.3 绘图功能(plotter.py)

```python
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import os

from .database import get_historical_data

PLOT_DIR = 'plots'


def generate_plot(tag_id: int, hours: int = 24):
    # 确保绘图目录存在
    if not os.path.exists(PLOT_DIR):
        os.makedirs(PLOT_DIR)
    # 获取数据
    data = get_historical_data(tag_id, hours)
    if not data:
        return None
    timestamps = [row[1] for row in data]  # 时间戳
    values = [row[4] for row in data]      # 值
    # 将时间字符串转换为datetime对象
    timestamps = [datetime.strptime(ts, '%Y-%m-%d %H:%M:%S') for ts in timestamps]
    plt.figure(figsize=(10, 6))
    plt.plot(timestamps, values, 'b-')
    plt.title(f'Tag ID: {tag_id} - Last {hours} Hours')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    # 保存图像
    plot_path = os.path.join(PLOT_DIR, f'tag_{tag_id}_{hours}h.png')
    plt.savefig(plot_path)
    plt.close()
    return plot_path
```

#### 2.4 主程序(main.py)

```python
import asyncio
import websockets
import json
import os

from fastapi import FastAPI, APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse

from . import auth, database, plotter

app = FastAPI()
api_router = APIRouter()

# 初始化数据库
database.init_db()


# WebSocket服务器
async def sensor_server(websocket, path):
    try:
        # 第一步:接收密钥进行验证
        token = await websocket.recv()
        if token != auth.SECRET_KEY:
            await websocket.close(code=1008, reason="Authentication failed")
            return
        # 第二步:接收数据
        async for message in websocket:
            try:
                data = json.loads(message)
                # 数据格式:{"TagId":1,"TagName":"1#注浆A液压力","Value":0.76,"DataTime":"2025-6-18 2:28"}
                database.insert_data(tag_id=data['TagId'], tag_name=data['TagName'],
                                     value=data['Value'], data_time=data['DataTime'])
                print(f"Inserted data for tag {data['TagId']}")
            except Exception as e:
                print(f"Error processing message: {e}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Client disconnected: {e}")


# API路由
@api_router.get("/data/latest/{tag_id}")
async def get_latest_data(tag_id: int, _: str = Depends(auth.verify_token)):
    data = database.get_latest_data(tag_id)
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")
    return data[0]  # 返回最新一条


@api_router.get("/data/history/{tag_id}")
async def get_history_data(tag_id: int, hours: int = 24, _: str = Depends(auth.verify_token)):
    data = database.get_historical_data(tag_id, hours)
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")
    return data


@api_router.get("/plot/{tag_id}")
async def get_plot(tag_id: int, hours: int = 24, _: str = Depends(auth.verify_token)):
    plot_path = plotter.generate_plot(tag_id, hours)
    if plot_path is None:
        raise HTTPException(status_code=404, detail="No data available for plotting")
    return FileResponse(plot_path)


app.include_router(api_router, prefix="/api")


# 启动WebSocket服务器和FastAPI服务
async def start_servers():
    # 启动WebSocket服务器
    ws_server = await websockets.serve(sensor_server, "0.0.0.0", 8765)
    print("WebSocket server started on port 8765")
    # 注意:FastAPI服务需要单独启动,这里我们使用uvicorn,在Dockerfile中启动


if __name__ == "__main__":
    asyncio.run(start_servers())
```

### 3. Docker配置

#### 3.1 Dockerfile

```dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装依赖
RUN apt-get update && apt-get install -y libsqlite3-dev
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# 设置环境变量(密钥通过环境变量传入)
ENV SECRET_KEY=your_actual_key_here

# 启动命令:同时启动WebSocket服务器和FastAPI服务
CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port 8000 & python -m app.main"]
```

#### 3.2 requirements.txt

```
fastapi
uvicorn
websockets
sqlalchemy
matplotlib
python-multipart
```

### 4. 在飞牛NAS上部署

1. **构建镜像**:将整个 ws-server 目录上传到飞牛NAS,然后进入该目录执行:

   ```bash
   docker build -t ws-sensor-server .
   ```

2. **运行容器**:

   ```bash
   docker run -d --name sensor-server \
     -p 8765:8765 \
     -p 8000:8000 \
     -v /path/on/nas/data:/app/data \
     -e SECRET_KEY="your_actual_key_here" \
     ws-sensor-server
   # 8765: WebSocket端口;8000: API端口;-v: 挂载数据库和绘图目录
   ```

3. **飞牛NAS图形界面操作**:
   - 打开飞牛NAS的Docker管理界面
   - 选择“镜像”并导入构建好的 ws-sensor-server 镜像
   - 创建容器,配置端口映射:8765和8000
   - 设置环境变量:SECRET_KEY=your_actual_key_here
   - 挂载数据卷:将NAS上的目录挂载到容器内的 /app/data
   - 启动容器

### 5. 功能测试

#### 5.1 WebSocket数据发送

```python
import asyncio
import websockets
import json


async def send_data():
    async with websockets.connect('ws://your_nas_ip:8765') as ws:
        # 先发送密钥
        await ws.send("your_actual_key_here")
        # 发送数据
        data = {"TagId": 1, "TagName": "1#注浆A液压力", "Value": 0.76, "DataTime": "2025-06-18 02:28:00"}
        await ws.send(json.dumps(data))
        print("Data sent")


asyncio.run(send_data())
```

#### 5.2 API接口调用

- 获取最新数据:GET http://your_nas_ip:8000/api/data/latest/1,需在Header中添加:Authorization: Bearer your_actual_key_here
- 获取历史数据:GET http://your_nas_ip:8000/api/data/history/1?hours=24
- 获取曲线图:GET http://your_nas_ip:8000/api/plot/1?hours=72

### 6. 注意事项

1. 密钥管理:在实际部署中,应通过环境变量设置密钥,避免硬编码。
2. 数据持久化:通过挂载卷将数据库文件和绘图目录保存在NAS上,避免容器重启数据丢失。
3. 性能优化:对于高频数据,考虑使用批量插入和数据库索引优化。
4. 安全性:建议在生产环境中使用HTTPS加密API和WebSocket通信(可通过Nginx反向代理实现)。

§§相关问题§§

1. 如何为SQLite数据库添加索引以提高查询性能?
2. 如何在Docker容器中安全地管理密钥?
3. 如何通过Nginx为WebSocket和API服务添加HTTPS加密?
4. 如何扩展此系统以支持多客户端并发连接?
5. 如何实现数据存储的分区或分表以优化大量历史数据的查询?
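针对上文"注意事项"第3条提到的索引与批量插入,下面给出一个最小示意(索引名 idx_tag_time 与函数名均为示例自拟,表结构沿用上文的 sensor_data):

```python
import sqlite3

DB_PATH = "sensor_data.db"  # 与上文 database.py 中的默认路径保持一致


def add_index_and_bulk_insert(rows):
    """rows: [(tag_id, tag_name, value, data_time), ...]"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    # 针对"按 tag_id + 时间范围查询"的联合索引(索引名为示例自拟)
    c.execute("CREATE INDEX IF NOT EXISTS idx_tag_time ON sensor_data(tag_id, timestamp)")
    # 高频数据用 executemany 批量插入,减少逐条提交的事务开销
    c.executemany(
        "INSERT INTO sensor_data (tag_id, tag_name, value, data_time) VALUES (?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    conn.close()
```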

```
PS D:\conda_Test\baidu_spider\baidu_spider> scrapy crawl baidu -o realtime.csv
2025-06-26 20:37:39 [scrapy.utils.log] INFO: Scrapy 2.11.1 started (bot: baidu_spider)
2025-06-26 20:37:39 [scrapy.utils.log] INFO: Versions: lxml 5.2.1.0, libxml2 2.13.1, cssselect 1.2.0, parsel 1.8.1, w3lib 2.1.2, Twisted 23.10.0, Python 3.12.7 | packaged by Anaconda, Inc. | (main, Oct 4 2024, 13:17:27) [MSC v.1929 64 bit (AMD64)], pyOpenSSL 24.2.1 (OpenSSL 3.0.16 11 Feb 2025), cryptography 43.0.0, Platform Windows-11-10.0.22631-SP0
2025-06-26 20:37:39 [scrapy.addons] INFO: Enabled addons: []
2025-06-26 20:37:39 [asyncio] DEBUG: Using selector: SelectSelector
2025-06-26 20:37:39 [scrapy.utils.log] DEBUG: Using reactor: twisted.internet.asyncioreactor.AsyncioSelectorReactor
2025-06-26 20:37:39 [scrapy.utils.log] DEBUG: Using asyncio event loop: asyncio.windows_events._WindowsSelectorEventLoop
2025-06-26 20:37:39 [scrapy.extensions.telnet] INFO: Telnet Password: 40e94de686f0a93d
2025-06-26 20:37:39 [scrapy.middleware] INFO: Enabled extensions: ['scrapy.extensions.corestats.CoreStats', 'scrapy.extensions.telnet.TelnetConsole', 'scrapy.extensions.feedexport.FeedExporter', 'scrapy.extensions.logstats.LogStats']
2025-06-26 20:37:39 [scrapy.crawler] INFO: Overridden settings: {'BOT_NAME': 'baidu_spider', 'FEED_EXPORT_ENCODING': 'utf-8', 'NEWSPIDER_MODULE': 'baidu_spider.spiders', 'REQUEST_FINGERPRINTER_IMPLEMENTATION': '2.7', 'ROBOTSTXT_OBEY': True, 'SPIDER_MODULES': ['baidu_spider.spiders'], 'TWISTED_REACTOR': 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'}
2025-06-26 20:37:40 [scrapy.middleware] INFO: Enabled downloader middlewares: ['scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware', 'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware', 'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware', 'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware', 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware', 'scrapy.downloadermiddlewares.retry.RetryMiddleware', 'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware', 'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware', 'scrapy.downloadermiddlewares.redirect.RedirectMiddleware', 'scrapy.downloadermiddlewares.cookies.CookiesMiddleware', 'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware', 'scrapy.downloadermiddlewares.stats.DownloaderStats']
2025-06-26 20:37:40 [scrapy.middleware] INFO: Enabled spider middlewares: ['scrapy.spidermiddlewares.httperror.HttpErrorMiddleware', 'scrapy.spidermiddlewares.offsite.OffsiteMiddleware', 'scrapy.spidermiddlewares.referer.RefererMiddleware', 'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware', 'scrapy.spidermiddlewares.depth.DepthMiddleware']
2025-06-26 20:37:40 [scrapy.middleware] INFO: Enabled item pipelines: ['baidu_spider.pipelines.BaiduSpiderPrintPipeline', 'baidu_spider.pipelines.BaiduSpiderPipeline']
2025-06-26 20:37:40 [scrapy.core.engine] INFO: Spider opened
2025-06-26 20:37:40 [scrapy.core.engine] INFO: Closing spider (shutdown)
2025-06-26 20:37:40 [baidu] INFO: 执行了close_spider方法,项目已经关闭
2025-06-26 20:37:40 [scrapy.utils.signal] ERROR: Error caught on signal handler: <bound method CoreStats.spider_closed of <scrapy.extensions.corestats.CoreStats object at 0x000001BB483C0470>>
Traceback (most recent call last):
  File "D:\anaconda3\Lib\site-packages\scrapy\crawler.py", line 160, in crawl
    yield self.engine.open_spider(self.spider, start_requests)
NameError: name 'baidu_spider' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "D:\anaconda3\Lib\site-packages\scrapy\utils\defer.py", line 348, in maybeDeferred_coro
    result = f(*args, **kw)
  File "D:\anaconda3\Lib\site-packages\pydispatch\robustapply.py", line 55, in robustApply
    return receiver(*arguments, **named)
  File "D:\anaconda3\Lib\site-packages\scrapy\extensions\corestats.py", line 30, in spider_closed
    elapsed_time = finish_time - self.start_time
TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'NoneType'
2025-06-26 20:37:40 [scrapy.statscollectors] INFO: Dumping Scrapy stats: {'log_count/DEBUG': 3, 'log_count/ERROR': 1, 'log_count/INFO': 9}
2025-06-26 20:37:40 [scrapy.core.engine] INFO: Spider closed (shutdown)
Unhandled error in Deferred:
2025-06-26 20:37:40 [twisted] CRITICAL: Unhandled error in Deferred:
Traceback (most recent call last):
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 874, in callback
    self._startRunCallbacks(result)
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 981, in _startRunCallbacks
    self._runCallbacks()
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 1075, in _runCallbacks
    current.result = callback(  # type: ignore[misc]
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 1946, in _gotResultInlineCallbacks
    _inlineCallbacks(r, gen, status, context)
--- <exception caught here> ---
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 2000, in _inlineCallbacks
    result = context.run(gen.send, result)
  File "D:\anaconda3\Lib\site-packages\scrapy\crawler.py", line 160, in crawl
    yield self.engine.open_spider(self.spider, start_requests)
builtins.NameError: name 'baidu_spider' is not defined

2025-06-26 20:37:40 [twisted] CRITICAL: Traceback (most recent call last):
  File "D:\anaconda3\Lib\site-packages\twisted\internet\defer.py", line 2000, in _inlineCallbacks
    result = context.run(gen.send, result)
  File "D:\anaconda3\Lib\site-packages\scrapy\crawler.py", line 160, in crawl
    yield self.engine.open_spider(self.spider, start_requests)
NameError: name 'baidu_spider' is not defined
PS D:\conda_Test\baidu_spider\baidu_spider>
```

如何解决?
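从日志看,NameError: name 'baidu_spider' is not defined 是在引擎打开爬虫(也就是导入并初始化项目代码)时抛出的;后面的 TypeError 只是连带错误——爬虫没能正常打开,CoreStats 的 start_time 仍为 None。日志本身看不出出错的具体文件,常见原因是在 spiders/baidu.py、pipelines.py 或 settings.py 里把包名 baidu_spider 当成变量直接使用而没有 import。下面是一个假设性的最小示意(BaiduSpiderItem 为 scrapy 模板默认生成的类名,若项目中名称不同请相应调整):

```python
# spiders/baidu.py —— 示意:凡是用到 baidu_spider 这个名字的地方都要先正确导入
import scrapy
from baidu_spider.items import BaiduSpiderItem  # 而不是直接写 baidu_spider.items.BaiduSpiderItem


class BaiduSpider(scrapy.Spider):
    name = "baidu"
    allowed_domains = ["baidu.com"]
    start_urls = ["https://www.baidu.com"]

    def parse(self, response):
        item = BaiduSpiderItem()
        # 按实际需要填充字段后再 yield
        yield item
```

排查时可在项目里全局搜索 baidu_spider,逐一确认每处引用要么是 import 语句、要么已经被 import;修正后 NameError 消失,随之而来的 CoreStats TypeError 通常也会一并消失。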

```python
import json
from typing import Dict, List, Any
from datetime import datetime


# 假设 process_item 函数的实现
async def process_item(item: Dict[str, Any], context: Any) -> Dict[str, Any]:
    # 这里只是简单返回原数据,实际中需要实现具体的处理逻辑
    return item


async def main(args: Args) -> Output:
    """元数据增强节点(输入变量名已更新)"""
    ret: Dict[str, Any] = {
        "enhanced_batch": [],
        "metadata": {
            "batch_id": "unknown",
            "success_count": 0,
            "error_count": 0,
            "cost_seconds": 0.0
        },
        "error": None
    }
    # 定义 context 变量
    context = {
        "config": {
            "enhancement_level": 2
        }
    }
    try:
        # 输入参数解析(变量名改为converted_data)
        print("Getting converted_data with args.get('params', {}).get('converted_data', [])")
        params = args.get('params', {})
        input_batch = params.get('converted_data', [])
        print("Converted_data retrieved:", input_batch)
        input_batch = [json.loads(item) if isinstance(item, str) else item for item in input_batch]
        # 防御性校验(同步更新错误提示)
        if not isinstance(input_batch, list):
            raise TypeError(f"converted_data参数格式错误,期望列表类型,实际为 {type(input_batch).__name__}")
        if not all(isinstance(item, dict) for item in input_batch):
            raise ValueError("输入数据项必须为字典类型")
        # 性能监控埋点
        start_time = datetime.now()
        # 批量增强处理
        processed_items = []
        error_details = []
        for idx, item in enumerate(input_batch):  # 变量名同步修改
            try:
                enhanced_item = await process_item(item, context)
                processed_items.append(enhanced_item)
            except Exception as e:
                error_details.append({
                    "index": idx,
                    "id": item.get('id', 'unknown'),
                    "error": str(e)[:200]
                })
        # 构造标准化输出
        ret["enhanced_batch"] = processed_items
        print("Getting batch_number with args.get('batch_number'
```
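上面的 process_item 只是占位实现(直接返回原数据)。下面给出一个可替换的最小示意,演示按 context 配置给每条数据补充元数据;其中 enhanced_at、enhancement_level 等字段名均为示例自拟:

```python
from datetime import datetime
from typing import Any, Dict


async def process_item(item: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """示例:按 context 中的配置给单条数据补充元数据字段(字段名为自拟示例)"""
    level = (context or {}).get("config", {}).get("enhancement_level", 1)
    enhanced = dict(item)  # 复制一份,避免修改原始数据
    enhanced["enhanced_at"] = datetime.now().isoformat()
    enhanced["enhancement_level"] = level
    # 缺少 id 时补一个占位值,方便后续错误统计按 id 归因
    enhanced.setdefault("id", "unknown")
    return enhanced
```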
