file-type

GitHub Pages中Markdown语法与Jekyll主题的应用

ZIP文件

下载需积分: 50 | 6KB | 更新于2025-09-05 | 160 浏览量 | 0 下载量 举报 收藏
download 立即下载
标题“knowledgebase:通用知识数据库”指出,此处将讨论的是与知识库相关的概念和技术细节。一个通用知识数据库(knowledgebase)通常指的是一种系统化的、电子化的资料集合,旨在组织、存储、检索和共享知识信息。在IT领域,知识库系统通常用于客户支持、内部信息共享、文档管理等多种用途。 描述中提到的几个关键知识点包括GitHub Pages、Markdown语言、Syntax Highlighting(代码块的语法高亮)、Markdown的标题和列表格式、加粗和斜体文本、链接和图片的插入方式以及Jekyll主题。 首先,GitHub Pages是GitHub提供的一个静态网站托管服务,允许用户通过GitHub仓库直接发布网站。用户可以维护和预览Markdown格式的文件,而每当向仓库提交更新时,GitHub Pages会自动运行并根据Markdown文件中的内容重建网站页面。这意味着Markdown文件作为编写网站内容的基础,用户无需直接处理HTML代码。 Markdown是一种轻量级标记语言,通过简单的语法来格式化文字。该语言的设计目标是尽可能地易于阅读和编写,同时易于转换成有效的HTML文档。例如,Markdown使用井号(#)来表示标题,井号的数量表示标题的层级。对于加粗文本,需要使用两个星号(**bold**)或者下划线(__bold__),而斜体文本则使用一个星号(*italic*)或下划线(_italic_)。列表的创建有无序列表(使用星号、加号或减号)和有序列表(使用数字和点号)。 Markdown还支持Syntax Highlighting,这是一种代码高亮技术,主要用于提高代码块的可读性。通常,代码块可以用反引号`` ` ``包裹起来,并指定语言来实现语法高亮,例如在支持的Markdown编辑器中输入以下代码: ```python def function(): return "Hello World" ``` 此外,描述中还提到了Jekyll,这是一个由Ruby编写的静态网站生成器,它允许用户使用Markdown或Textile等标记语言来编写内容,并将这些内容转换成一个完整的网站。Jekyll主题提供了网站的布局和样式,主题的名称通常保存在名为 `_config.yml` 的配置文件中,而Markdown文件中则通过插入相应的模板标记来引用主题。 从文件名称“knowledgebase-master”可以推测出,该知识库的文件是存放在名为“knowledgebase”的仓库中,并且该仓库可能拥有一个“master”分支。在Git版本控制中,“master”分支通常是默认的主分支,用于存储项目的正式或稳定版本的代码。 结合以上信息,本文涵盖的关键知识点包括: - GitHub Pages:一种静态网站托管服务,支持Markdown格式的文件。 - Markdown:一种用于文本样式化的轻量级标记语言。 - Syntax Highlighting:代码块的语法高亮技术。 - Markdown的语法:包括标题、列表、加粗、斜体、链接和图片等元素的格式。 - Jekyll主题:为静态网站提供布局和样式的预设模板。 - Git分支:特别是“master”分支,用于维护项目的主版本。 这些知识点对于了解如何使用Markdown文件来维护和预览网站内容,以及如何通过GitHub Pages和Jekyll主题来构建和管理网站来说至关重要。

相关推荐

filetype

E:\AI_System\agent\model_manager.py import os import logging from typing import Dict, Any class ModelManager: “”“模型管理器 - 修复初始化参数问题”“” def __init__(self, model_registry: Dict[str, str] = None, cache_dir: str = "model_cache", use_gpu: bool = True): """ 初始化模型管理器 :param model_registry: 模型路径注册表 :param cache_dir: 缓存目录 :param use_gpu: 是否使用GPU """ self.logger = logging.getLogger("ModelManager") self.model_registry = model_registry or {} self.cache_dir = cache_dir self.use_gpu = use_gpu self.loaded_models = {} # 确保缓存目录存在 os.makedirs(cache_dir, exist_ok=True) self.logger.info(f"✅ 模型管理器初始化完成 (GPU: {'启用' if use_gpu else '禁用'})") self.logger.info(f"模型注册表: {list(model_registry.keys())}") def load_model(self, model_name: str): """加载指定模型""" if model_name not in self.model_registry: self.logger.error(f"未知模型: {model_name}") return False model_path = self.model_registry[model_name] if not os.path.exists(model_path): self.logger.error(f"模型路径不存在: {model_path}") return False # 实际加载逻辑 self.logger.info(f"加载模型: {model_name} ({model_path})") self.loaded_models[model_name] = { "path": model_path, "status": "loaded" } return True def unload_model(self, model_name: str = None): """卸载模型""" if model_name: if model_name in self.loaded_models: self.logger.info(f"卸载模型: {model_name}") del self.loaded_models[model_name] return True return False else: self.logger.info("卸载所有模型") self.loaded_models.clear() return True def get_model_info(self, model_name: str) -> Dict[str, Any]: """获取模型信息""" if model_name in self.loaded_models: return self.loaded_models[model_name] elif model_name in self.model_registry: return {"status": "registered", "path": self.model_registry[model_name]} return {"status": "unknown"} def list_models(self): """列出所有注册模型""" return list(self.model_registry.keys()) E:\AI_System\agent\cognitive_architecture.py import os import sys import logging import json import time import abc from pathlib import Path from agent.base_module import CognitiveModule class CognitiveSystem(CognitiveModule): “”“核心认知系统实现”“” VERSION = "1.2.0" # 默认配置参数 DEFAULT_CONFIG = { "reasoning_depth": 3, # 推理深度级别(1-5) "memory_limit": 1000, # 短期记忆容量 "auto_reflection": True, # 是否启用自动反思 "learning_threshold": 0.8, # 学习触发阈值 "error_recovery": True, # 是否启用错误恢复机制 "max_concurrent_tasks": 5 # 最大并发任务数 } def __init__(self, name: str, model_manager, config: dict = None): """ 初始化认知系统 :param name: 认知系统名称 :param model_manager: 模型管理器实例 :param config: 可选配置字典,覆盖默认配置 """ super().__init__(name) self.model_manager = model_manager # 合并默认配置和用户配置 self.config = self.DEFAULT_CONFIG.copy() if config is not None: self.config.update(config) # 验证配置有效性 self._validate_config() # 初始化系统组件 self._initialize_components() self.mode = "TASK_EXECUTION" # 默认任务执行模式 self.command_handlers = { "help": self.handle_help, "hi": self.handle_greeting, "hello": self.handle_greeting, "你好": self.handle_greeting, "在吗": self.handle_greeting, "status": self.handle_status, "mode": self.handle_mode, "models": self.handle_models, } # 初始化记忆系统 self.memory = { "short_term": [], "long_term": {}, "last_accessed": time.time() } # 初始化日志 self.logger = logging.getLogger(f"CognitiveSystem.{name}") self.logger.info(f"✅ 认知系统初始化完成 (版本 {self.VERSION})") self.logger.info(f"当前模式: {self.mode}") self.logger.debug(f"系统配置: {self.config}") # 新增方法:处理用户命令 def process_command(self, command: str) -> str: """处理用户命令的核心方法""" try: self.logger.info(f"🧠 处理命令: {command}") # 分割命令和参数 parts = command.split(maxsplit=1) cmd = parts[0].lower() arg = parts[1] if len(parts) > 1 else "" # 查找命令处理器 handler = self.command_handlers.get(cmd, self.handle_default) return handler(arg) except Exception as e: self.logger.error(f"命令处理失败: {str(e)}", exc_info=True) return f"❌ 处理命令时出错: {str(e)}" # 命令处理函数 def handle_greeting(self, arg: str) -> str: """处理问候命令""" return f"你好,我是{self.name}!有什么可以帮您?" def handle_help(self, arg: str) -> str: """处理帮助命令""" return """ === 高级命令系统 === 基础命令: help - 显示此帮助信息 exit/quit - 退出系统 status - 查看系统状态 mode [mode]- 切换工作模式 (reflect, task, learn) 系统控制: models - 显示已加载模型 config [key] [value] - 修改配置 多行输入: 输入多行命令时,在最后一行以 ;; 结束 """ def handle_status(self, arg: str) -> str: """处理状态查询命令""" return ( f"系统状态:\n" f"- 认知系统: {self.name} v{self.VERSION}\n" f"- 当前模式: {self.mode}\n" f"- 最后访问: {self.memory['last_accessed']}\n" f"- 短期记忆: {len(self.memory['short_term'])}/{self.config['memory_limit']} 条" ) def handle_mode(self, arg: str) -> str: """处理模式切换命令""" if not arg: return "请指定模式: reflect, task, learn" mode_map = { "reflect": "SELF_REFLECTION", "task": "TASK_EXECUTION", "learn": "LEARNING" } new_mode = mode_map.get(arg.lower(), "") if new_mode: self.set_mode(new_mode) return f"已切换到 {new_mode} 模式" return f"❌ 无效模式: {arg} (可用选项: reflect, task, learn)" def handle_models(self, arg: str) -> str: """处理模型查询命令""" try: models = "\n".join([ f"- {name}: {path}" for name, path in self.model_manager.config.items() ]) return f"已配置模型:\n{models}" except Exception as e: return f"❌ 获取模型信息失败: {str(e)}" def handle_default(self, command: str) -> str: """默认命令处理器""" return f"正在处理您的请求: {command}..." # 以下为原有方法... def _validate_config(self): """验证配置参数有效性""" if not 1 <= self.config.get("reasoning_depth", 3) <= 5: raise ValueError("推理深度必须在1-5范围内") if self.config.get("memory_limit", 1000) < 100: raise ValueError("内存限制不能小于100") def _initialize_components(self): """初始化认知系统的各个子组件""" self.logger.debug("初始化推理引擎...") self.logger.debug("初始化记忆系统...") self.logger.debug("初始化学习系统...") self.logger.debug("初始化任务调度器...") def process_stimulus(self, stimulus: dict): """处理输入刺激""" try: self.logger.debug(f"处理刺激: {stimulus}") self.memory["last_accessed"] = time.time() if self.mode == "SELF_REFLECTION": return self._process_self_reflection(stimulus) elif self.mode == "LEARNING": return self._process_learning(stimulus) else: # TASK_EXECUTION return self._process_task(stimulus) except Exception as e: self.logger.error(f"处理刺激失败: {str(e)}", exc_info=True) return {"error": f"处理失败: {str(e)}"} def generate_response(self): """生成响应(保留方法)""" return {"status": "ready", "mode": self.mode} def get_current_mode(self): """获取当前模式""" return self.mode def set_mode(self, new_mode: str): """切换模式""" valid_modes = ["SELF_REFLECTION", "TASK_EXECUTION", "LEARNING"] if new_mode in valid_modes: self.mode = new_mode self.logger.info(f"切换到 {new_mode} 模式") return {"status": "success", "new_mode": new_mode} else: self.logger.warning(f"无效模式: {new_mode}") return {"status": "error", "message": f"无效模式: {new_mode}"} def _process_task(self, stimulus: dict): """处理任务执行""" task_type = stimulus.get("type", "general") content = stimulus.get("content", {}) self.logger.info(f"处理任务: {task_type}") if task_type == "question": return {"response": f"收到问题: {content.get('text', '')}"} elif task_type == "command": return {"response": f"执行命令: {content.get('text', '')}"} else: return {"response": f"处理通用任务: {json.dumps(content)}"} def _process_self_reflection(self, stimulus: dict): """处理自我反思""" self.logger.info("执行深度反思...") return {"reflection": "反思完成", "insights": []} def _process_learning(self, stimulus: dict): """处理学习任务""" self.logger.info("执行学习任务...") return {"learning": "学习完成", "knowledge": "新知识"} def save_state(self, path: str): """保存系统状态""" state = { "version": self.VERSION, "mode": self.mode, "last_accessed": self.memory["last_accessed"] } try: with open(path, 'w') as f: json.dump(state, f) self.logger.info(f"✅ 状态已保存到 {path}") return True except Exception as e: self.logger.error(f"保存状态失败: {str(e)}", exc_info=True) return False def load_state(self, path: str): """加载系统状态""" try: with open(path, 'r') as f: state = json.load(f) self.mode = state.get("mode", "TASK_EXECUTION") self.logger.info(f"✅ 状态已从 {path} 加载") return True except Exception as e: self.logger.error(f"加载状态失败: {str(e)}", exc_info=True) return False #E:\AI_System\main.py import os import sys import traceback import threading import time import logging from pathlib import Path from core.config import config from core.command_listener import start_command_listener from agent.model_manager import ModelManager from agent.cognitive_architecture import CognitiveSystem from agent.environment_interface import EnvironmentInterface def setup_logging(): “”“配置系统日志记录”“” log_dir = Path(config.get(“LOG_DIR”, “logs”)) log_dir.mkdir(parents=True, exist_ok=True) log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" log_level = config.get("ENVIRONMENT.LOG_LEVEL", "INFO").upper() logging.basicConfig( level=getattr(logging, log_level), format=log_format, handlers=[ logging.FileHandler(log_dir / "ai_system.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger("Main") logger.info("日志系统初始化完成 (级别: %s)", log_level) return logger def main(): “”“主函数”“” logger = setup_logging() logger.info("=" * 50) logger.info("🚀 启动AI系统 - 核心认知模式") logger.info("=" * 50) logger.info("系统配置摘要:") logger.info(f"项目根目录: {config.get('DIRECTORIES.PROJECT_ROOT', '未设置')}") logger.info(f"默认模型路径: {config.get('MODEL_PATHS.TEXT_BASE', '未设置')}") logger.info(f"日志级别: {config.get('ENVIRONMENT.LOG_LEVEL', 'INFO')}") # 初始化模型管理器 try: model_manager = ModelManager( model_registry=config.get("MODEL_PATHS", {}), cache_dir=config.get("MODEL_CACHE_DIR", "model_cache"), use_gpu=config.get("ENVIRONMENT.USE_GPU", True) ) logger.info("✅ 模型管理器初始化完成") base_model = config.get("MODEL_PATHS.TEXT_BASE") if base_model and model_manager.load_model("TEXT_BASE"): logger.info(f"✅ 基础模型已加载: {base_model}") except Exception as e: logger.error("❌ 模型管理器初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 初始化认知系统接口 try: cognitive_system = CognitiveSystem( name=config.get("AGENT_NAME", "小蓝"), model_manager=model_manager, config=config.get("COGNITIVE_CONFIG", {}) ) logger.info("✅ 认知系统初始化完成 - 名称: %s", cognitive_system.name) except Exception as e: logger.error("❌ 认知系统初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 初始化环境接口 try: environment_interface = EnvironmentInterface( name="环境接口", cognitive_system=cognitive_system, config={ "max_workers": config.get("MAX_WORKERS", 4), "response_timeout": config.get("AGENT_RESPONSE_TIMEOUT", 30.0), "log_level": config.get("ENVIRONMENT.LOG_LEVEL", "INFO") } ) logger.info("✅ 环境接口初始化完成") except Exception as e: logger.error("❌ 环境接口初始化失败: %s", str(e)) logger.error(traceback.format_exc()) return # 创建关闭处理函数 def shutdown_handler(): """系统关闭处理函数""" logger.info("🛑 收到关闭命令,开始关闭系统...") # 先停止命令监听器 if 'command_listener' in locals() and command_listener.running: command_listener.stop() # 然后停止环境接口 environment_interface.stop() logger.info("✅ 系统已完全关闭") sys.exit(0) # 自定义命令处理器 - 统一处理所有命令 def command_handler(command: str) -> dict: """处理用户命令 - 返回字典格式响应""" try: logger.info(f"🔄 处理命令: {command}") # 命令路由 cmd_lower = command.lower().strip() # 1. 系统命令处理 if cmd_lower in ["exit", "quit"]: return { "status": "success", "message": "接收到退出命令,正在关闭系统...", "action": "shutdown" } # 2. 其他命令转发给认知系统 logger.info("转发命令给认知系统: %s", command) response = cognitive_system.process_command(command) # 格式化输出响应 return { "status": "success", "content": response } except Exception as e: logger.error(f"❌ 命令处理错误: {str(e)}", exc_info=True) return { "status": "error", "error": f"处理命令时出错: {str(e)}", "command": command } # 启动命令监听器 - 作为唯一的输入来源 try: command_listener = start_command_listener( command_handler=command_handler, shutdown_handler=shutdown_handler ) logger.info("✅ 命令监听器已启动") except Exception as e: logger.error("❌ 命令监听器启动失败: %s", str(e)) logger.error(traceback.format_exc()) return logger.info("🌟 系统准备就绪! 输入命令控制模式 (输入 'help' 查看可用命令)") # 主循环:处理命令监听器的输出 try: while True: # 短暂休眠以避免CPU占用过高 time.sleep(0.1) # 检查是否收到退出信号 if not command_listener.running: break except KeyboardInterrupt: logger.info("🛑 收到中断信号,关闭系统...") shutdown_handler() except Exception as e: logger.error("❌ 主循环发生未预期错误: %s", str(e)) logger.error(traceback.format_exc()) shutdown_handler() if name == “main”: main() { “LOG_DIR”: “E:/AI_System/logs”, “CONFIG_DIR”: “E:/AI_System/config”, “MODEL_CACHE_DIR”: “E:/AI_System/model_cache”, “AGENT_NAME”: “小蓝”, “DEFAULT_USER”: “管理员”, “MAX_WORKERS”: 4, “AGENT_RESPONSE_TIMEOUT”: 30.0, “MODEL_BASE_PATH”: “E:/AI_Models”, // 模型路径 - 使用相对路径引用 “MODEL_PATHS”: { “TEXT_BASE”: “ M O D E L B A S E P A T H / Q w e n 2 − 7 B " , " T E X T C H A T " : " MODEL B ​ ASE P ​ ATH/Qwen2−7B","TEXT C ​ HAT":"{MODEL_BASE_PATH}/deepseek-7b-chat”, “MULTIMODAL”: “ M O D E L B A S E P A T H / d e e p s e e k − v l 2 " , " I M A G E G E N " : " MODEL B ​ ASE P ​ ATH/deepseek−vl2","IMAGE G ​ EN":"{MODEL_BASE_PATH}/sdxl”, “YI_VL”: “ M O D E L B A S E P A T H / y i − v l " , " S T A B L E D I F F U S I O N " : " MODEL B ​ ASE P ​ ATH/yi−vl","STABLE D ​ IFFUSION":"{MODEL_BASE_PATH}/stable-diffusion-xl-base-1.0” }, // 网络配置 - 扁平化处理 “NETWORK_HOST”: “0.0.0.0”, “FLASK_PORT”: 8000, “GRADIO_PORT”: 7860, // 数据库配置 - 敏感信息使用环境变量引用 “DB_HOST”: “localhost”, “DB_PORT”: 5432, “DB_NAME”: “ai_system”, “DB_USER”: “ai_user”, “DB_PASSWORD”: “${ENV:DB_PASSWORD}”, // 从环境变量获取 // 安全配置 - 密钥使用环境变量 “SECRET_KEY”: “${ENV:SECRET_KEY}”, // 从环境变量获取 // 环境配置 - 扁平化处理 “ENV_MODE”: “dev”, “LOG_LEVEL”: “DEBUG”, “USE_GPU”: true, // 目录配置 - 使用基础路径引用 “PROJECT_ROOT”: “E:/AI_System”, “DEFAULT_MODEL”: “ M O D E L B A S E P A T H / Q w e n 2 − 7 B " , " W E B U I D I R " : " MODEL B ​ ASE P ​ ATH/Qwen2−7B","WEB U ​ I D ​ IR":"{PROJECT_ROOT}/web_ui”, “AGENT_DIR”: “${PROJECT_ROOT}/agent” } 四个原文都发你了 改好后发我完整版!一定要完整 我直接复制 不想再思考了 懂?

filetype

我正在编写一个闲鱼客服,以上是XianyuAgent.py代码,以下是两个子代码文件:context_manager.py:import sqlite3 import os import json from datetime import datetime from loguru import logger class ChatContextManager: """ 聊天上下文管理器 负责存储和检索用户与商品之间的对话历史,使用SQLite数据库进行持久化存储。 支持按会话ID检索对话历史,以及议价次数统计。 """ def __init__(self, max_history=100, db_path="data/chat_history.db"): """ 初始化聊天上下文管理器 Args: max_history: 每个对话保留的最大消息数 db_path: SQLite数据库文件路径 """ self.max_history = max_history self.db_path = db_path self._init_db() def _init_db(self): """初始化数据库表结构""" # 确保数据库目录存在 db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建消息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, item_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, chat_id TEXT ) ''') # 检查是否需要添加chat_id字段(兼容旧数据库) cursor.execute("PRAGMA table_info(messages)") columns = [column[1] for column in cursor.fetchall()] if 'chat_id' not in columns: cursor.execute('ALTER TABLE messages ADD COLUMN chat_id TEXT') logger.info("已为messages表添加chat_id字段") # 创建索引以加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_user_item ON messages (user_id, item_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_chat_id ON messages (chat_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_timestamp ON messages (timestamp) ''') # 创建基于会话ID的议价次数表 cursor.execute(''' CREATE TABLE IF NOT EXISTS chat_bargain_counts ( chat_id TEXT PRIMARY KEY, count INTEGER DEFAULT 0, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建商品信息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS items ( item_id TEXT PRIMARY KEY, data TEXT NOT NULL, price REAL, description TEXT, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建人工标准回答表 cursor.execute(''' CREATE TABLE IF NOT EXISTS manual_answers ( id INTEGER PRIMARY KEY AUTOINCREMENT, question TEXT NOT NULL, answer TEXT NOT NULL, item_id TEXT NOT NULL, chat_id TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(question, item_id) ) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_question ON manual_answers (question) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_item_id ON manual_answers (item_id) ''') conn.commit() conn.close() logger.info(f"聊天历史数据库初始化完成: {self.db_path}") def save_item_info(self, item_id, item_data): """ 保存商品信息到数据库 Args: item_id: 商品ID item_data: 商品信息字典 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 从商品数据中提取有用信息 price = float(item_data.get('soldPrice', 0)) description = item_data.get('desc', '') # 将整个商品数据转换为JSON字符串 data_json = json.dumps(item_data, ensure_ascii=False) cursor.execute( """ INSERT INTO items (item_id, data, price, description, last_updated) VALUES (?, ?, ?, ?, ?) ON CONFLICT(item_id) DO UPDATE SET data = ?, price = ?, description = ?, last_updated = ? """, ( item_id, data_json, price, description, datetime.now().isoformat(), data_json, price, description, datetime.now().isoformat() ) ) conn.commit() logger.debug(f"商品信息已保存: {item_id}") except Exception as e: logger.error(f"保存商品信息时出错: {e}") conn.rollback() finally: conn.close() def get_item_info(self, item_id): """ 从数据库获取商品信息 Args: item_id: 商品ID Returns: dict: 商品信息字典,如果不存在返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT data FROM items WHERE item_id = ?", (item_id,) ) result = cursor.fetchone() if result: return json.loads(result[0]) return None except Exception as e: logger.error(f"获取商品信息时出错: {e}") return None finally: conn.close() def add_message_by_chat(self, chat_id, user_id, item_id, role, content): """ 基于会话ID添加新消息到对话历史 Args: chat_id: 会话ID user_id: 用户ID (用户消息存真实user_id,助手消息存卖家ID) item_id: 商品ID role: 消息角色 (user/assistant) content: 消息内容 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 插入新消息,使用chat_id作为额外标识 cursor.execute( "INSERT INTO messages (user_id, item_id, role, content, timestamp, chat_id) VALUES (?, ?, ?, ?, ?, ?)", (user_id, item_id, role, content, datetime.now().isoformat(), chat_id) ) # 检查是否需要清理旧消息(基于chat_id) cursor.execute( """ SELECT id FROM messages WHERE chat_id = ? ORDER BY timestamp DESC LIMIT ?, 1 """, (chat_id, self.max_history) ) oldest_to_keep = cursor.fetchone() if oldest_to_keep: cursor.execute( "DELETE FROM messages WHERE chat_id = ? AND id < ?", (chat_id, oldest_to_keep[0]) ) conn.commit() except Exception as e: logger.error(f"添加消息到数据库时出错: {e}") conn.rollback() finally: conn.close() def get_context_by_chat(self, chat_id): """ 基于会话ID获取对话历史 Args: chat_id: 会话ID Returns: list: 包含对话历史的列表 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( """ SELECT role, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC LIMIT ? """, (chat_id, self.max_history) ) messages = [{"role": role, "content": content} for role, content in cursor.fetchall()] # 获取议价次数并添加到上下文中 bargain_count = self.get_bargain_count_by_chat(chat_id) if bargain_count > 0: messages.append({ "role": "system", "content": f"议价次数: {bargain_count}" }) except Exception as e: logger.error(f"获取对话历史时出错: {e}") messages = [] finally: conn.close() return messages def increment_bargain_count_by_chat(self, chat_id): """ 基于会话ID增加议价次数 Args: chat_id: 会话ID """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用UPSERT语法直接基于chat_id增加议价次数 cursor.execute( """ INSERT INTO chat_bargain_counts (chat_id, count, last_updated) VALUES (?, 1, ?) ON CONFLICT(chat_id) DO UPDATE SET count = count + 1, last_updated = ? """, (chat_id, datetime.now().isoformat(), datetime.now().isoformat()) ) conn.commit() logger.debug(f"会话 {chat_id} 议价次数已增加") except Exception as e: logger.error(f"增加议价次数时出错: {e}") conn.rollback() finally: conn.close() def get_bargain_count_by_chat(self, chat_id): """ 基于会话ID获取议价次数 Args: chat_id: 会话ID Returns: int: 议价次数 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT count FROM chat_bargain_counts WHERE chat_id = ?", (chat_id,) ) result = cursor.fetchone() return result[0] if result else 0 except Exception as e: logger.error(f"获取议价次数时出错: {e}") return 0 finally: conn.close() def update_manual_answer(self, question, new_answer, item_id, chat_id=None): """ 更新已保存的人工标准回答 Args: question: 原始问题 new_answer: 新的回答内容 item_id: 商品ID chat_id: 会话ID(可选) Returns: bool: 更新是否成功 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "UPDATE manual_answers SET answer = ?, created_at = ? WHERE question = ? AND item_id = ?", (new_answer, datetime.now().isoformat(), question, item_id) ) conn.commit() if cursor.rowcount > 0: logger.info(f"人工标准回答已更新: {question[:30]}...") return True else: logger.warning(f"未找到匹配的人工标准回答: {question[:30]}...") return False except Exception as e: logger.error(f"更新人工标准回答时出错: {e}") conn.rollback() return False finally: conn.close() def save_manual_answer(self, question, answer, item_id, chat_id=None): """ 保存人工标准回答到数据库 Args: question: 用户问题 answer: 人工回答 item_id: 商品ID chat_id: 会话ID(可选) """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "INSERT OR IGNORE INTO manual_answers (question, answer, item_id, chat_id, created_at) VALUES (?, ?, ?, ?, ?)", (question, answer, item_id, chat_id, datetime.now().isoformat()) ) conn.commit() logger.info(f"人工标准回答已保存: {question[:30]}...") except Exception as e: logger.error(f"保存人工标准回答时出错: {e}") conn.rollback() finally: conn.close() def get_manual_answer(self, question, item_id): """ 从数据库获取人工标准回答 Args: question: 用户问题 item_id: 商品ID Returns: str: 匹配的人工回答,如果没有找到则返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用LIKE进行模糊匹配 cursor.execute( "SELECT answer FROM manual_answers WHERE item_id = ? AND question LIKE ? ORDER BY created_at DESC LIMIT 1", (item_id, f"%{question}%") ) result = cursor.fetchone() return result[0] if result else None except Exception as e: logger.error(f"获取人工标准回答时出错: {e}") return None finally: conn.close() main.py: import base64 import json import asyncio import time import os import websockets from loguru import logger from dotenv import load_dotenv from XianyuApis import XianyuApis import sys from utils.xianyu_utils import generate_mid, generate_uuid, trans_cookies, generate_device_id, decrypt from XianyuAgent import XianyuReplyBot from context_manager import ChatContextManager class XianyuLive: def __init__(self, cookies_str): self.xianyu = XianyuApis() self.base_url = 'wss://wss-goofish.dingtalk.com/' self.cookies_str = cookies_str self.cookies = trans_cookies(cookies_str) self.xianyu.session.cookies.update(self.cookies) # 直接使用 session.cookies.update self.myid = self.cookies['unb'] self.device_id = generate_device_id(self.myid) self.context_manager = ChatContextManager() # 心跳相关配置 self.heartbeat_interval = int(os.getenv("HEARTBEAT_INTERVAL", "15")) # 心跳间隔,默认15秒 self.heartbeat_timeout = int(os.getenv("HEARTBEAT_TIMEOUT", "5")) # 心跳超时,默认5秒 self.last_heartbeat_time = 0 self.last_heartbeat_response = 0 self.heartbeat_task = None self.ws = None # Token刷新相关配置 self.token_refresh_interval = int(os.getenv("TOKEN_REFRESH_INTERVAL", "3600")) # Token刷新间隔,默认1小时 self.token_retry_interval = int(os.getenv("TOKEN_RETRY_INTERVAL", "300")) # Token重试间隔,默认5分钟 self.last_token_refresh_time = 0 self.current_token = None self.token_refresh_task = None self.connection_restart_flag = False # 连接重启标志 # 人工接管相关配置 self.manual_mode_conversations = set() # 存储处于人工接管模式的会话ID self.manual_mode_timeout = int(os.getenv("MANUAL_MODE_TIMEOUT", "3600")) # 人工接管超时时间,默认1小时 self.manual_mode_timestamps = {} # 记录进入人工模式的时间 # 消息过期时间配置 self.message_expire_time = int(os.getenv("MESSAGE_EXPIRE_TIME", "300000")) # 消息过期时间,默认5分钟 # 人工接管关键词,从环境变量读取 self.toggle_keywords = os.getenv("TOGGLE_KEYWORDS", "。") async def refresh_token(self): """刷新token""" try: logger.info("开始刷新token...") # 获取新token(如果Cookie失效,get_token会直接退出程序) token_result = self.xianyu.get_token(self.device_id) if 'data' in token_result and 'accessToken' in token_result['data']: new_token = token_result['data']['accessToken'] self.current_token = new_token self.last_token_refresh_time = time.time() logger.info("Token刷新成功") return new_token else: logger.error(f"Token刷新失败: {token_result}") return None except Exception as e: logger.error(f"Token刷新异常: {str(e)}") return None async def token_refresh_loop(self): """Token刷新循环""" while True: try: current_time = time.time() # 检查是否需要刷新token if current_time - self.last_token_refresh_time >= self.token_refresh_interval: logger.info("Token即将过期,准备刷新...") new_token = await self.refresh_token() if new_token: logger.info("Token刷新成功,准备重新建立连接...") # 设置连接重启标志 self.connection_restart_flag = True # 关闭当前WebSocket连接,触发重连 if self.ws: await self.ws.close() break else: logger.error("Token刷新失败,将在{}分钟后重试".format(self.token_retry_interval // 60)) await asyncio.sleep(self.token_retry_interval) # 使用配置的重试间隔 continue # 每分钟检查一次 await asyncio.sleep(60) except Exception as e: logger.error(f"Token刷新循环出错: {e}") await asyncio.sleep(60) async def send_msg(self, ws, cid, toid, text): text = { "contentType": 4, "html": { "html": text } } text_base64 = str(base64.b64encode(json.dumps(text).encode('utf-8')), 'utf-8') msg = { "lwp": "/r/MessageSend/sendByReceiverScope", "headers": { "mid": generate_mid() }, "body": [ { "uuid": generate_uuid(), "cid": f"{cid}@goofish", "conversationType": 1, "content": { "contentType": 101, "custom": { "type": 1, "data": text_base64 } }, "redPointPolicy": 0, "extension": { "extJson": "{}" }, "ctx": { "appVersion": "1.0", "platform": "web" }, "mtags": {}, "msgReadStatusSetting": 1 }, { "actualReceivers": [ f"{toid}@goofish", f"{self.myid}@goofish" ] } ] } await ws.send(json.dumps(msg)) async def init(self, ws): # 如果没有token或者token过期,获取新token if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: logger.info("获取初始token...") await self.refresh_token() if not self.current_token: logger.error("无法获取有效token,初始化失败") raise Exception("Token获取失败") msg = { "lwp": "/reg", "headers": { "cache-header": "app-key token ua wv", "app-key": "444e9908a51d1cb236a27862abc769c9", "token": self.current_token, "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5", "dt": "j", "wv": "im:3,au:3,sy:6", "sync": "0,0;0;0;", "did": self.device_id, "mid": generate_mid() } } await ws.send(json.dumps(msg)) # 等待一段时间,确保连接注册完成 await asyncio.sleep(1) msg = {"lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": "5701741704675979 0"}, "body": [ {"pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0, "pts": int(time.time() * 1000) * 1000, "seq": 0, "timestamp": int(time.time() * 1000)}]} await ws.send(json.dumps(msg)) logger.info('连接注册完成') def is_chat_message(self, message): """判断是否为用户聊天消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], dict) # 确保是字典类型 and "10" in message["1"] and isinstance(message["1"]["10"], dict) # 确保是字典类型 and "reminderContent" in message["1"]["10"] ) except Exception: return False def is_sync_package(self, message_data): """判断是否为同步包消息""" try: return ( isinstance(message_data, dict) and "body" in message_data and "syncPushPackage" in message_data["body"] and "data" in message_data["body"]["syncPushPackage"] and len(message_data["body"]["syncPushPackage"]["data"]) > 0 ) except Exception: return False def is_typing_status(self, message): """判断是否为用户正在输入状态消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], list) and len(message["1"]) > 0 and isinstance(message["1"][0], dict) and "1" in message["1"][0] and isinstance(message["1"][0]["1"], str) and "@goofish" in message["1"][0]["1"] ) except Exception: return False def is_system_message(self, message): """判断是否为系统消息""" try: return ( isinstance(message, dict) and "3" in message and isinstance(message["3"], dict) and "needPush" in message["3"] and message["3"]["needPush"] == "false" ) except Exception: return False def check_toggle_keywords(self, message): """检查消息是否包含切换关键词""" message_stripped = message.strip() return message_stripped in self.toggle_keywords def is_manual_mode(self, chat_id): """检查特定会话是否处于人工接管模式""" if chat_id not in self.manual_mode_conversations: return False # 检查是否超时 current_time = time.time() if chat_id in self.manual_mode_timestamps: if current_time - self.manual_mode_timestamps[chat_id] > self.manual_mode_timeout: # 超时,自动退出人工模式 self.exit_manual_mode(chat_id) return False return True def enter_manual_mode(self, chat_id): """进入人工接管模式""" self.manual_mode_conversations.add(chat_id) self.manual_mode_timestamps[chat_id] = time.time() def exit_manual_mode(self, chat_id): """退出人工接管模式""" self.manual_mode_conversations.discard(chat_id) if chat_id in self.manual_mode_timestamps: del self.manual_mode_timestamps[chat_id] def toggle_manual_mode(self, chat_id): """切换人工接管模式""" if self.is_manual_mode(chat_id): self.exit_manual_mode(chat_id) return "auto" else: self.enter_manual_mode(chat_id) return "manual" async def handle_message(self, message_data, websocket): """处理所有类型的消息""" try: try: message = message_data ack = { "code": 200, "headers": { "mid": message["headers"]["mid"] if "mid" in message["headers"] else generate_mid(), "sid": message["headers"]["sid"] if "sid" in message["headers"] else '', } } if 'app-key' in message["headers"]: ack["headers"]["app-key"] = message["headers"]["app-key"] if 'ua' in message["headers"]: ack["headers"]["ua"] = message["headers"]["ua"] if 'dt' in message["headers"]: ack["headers"]["dt"] = message["headers"]["dt"] await websocket.send(json.dumps(ack)) except Exception as e: pass # 如果不是同步包消息,直接返回 if not self.is_sync_package(message_data): return # 获取并解密数据 sync_data = message_data["body"]["syncPushPackage"]["data"][0] # 检查是否有必要的字段 if "data" not in sync_data: logger.debug("同步包中无data字段") return # 解密数据 try: data = sync_data["data"] try: data = base64.b64decode(data).decode("utf-8") data = json.loads(data) # logger.info(f"无需解密 message: {data}") return except Exception as e: # logger.info(f'加密数据: {data}') decrypted_data = decrypt(data) message = json.loads(decrypted_data) except Exception as e: logger.error(f"消息解密失败: {e}") return try: # 判断是否为订单消息,需要自行编写付款后的逻辑 if message['3']['redReminder'] == '等待买家付款': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'等待买家 {user_url} 付款') return elif message['3']['redReminder'] == '交易关闭': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'买家 {user_url} 交易关闭') return elif message['3']['redReminder'] == '等待卖家发货': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'交易成功 {user_url} 等待卖家发货') return except: pass # 判断消息类型 if self.is_typing_status(message): logger.debug("用户正在输入") return elif not self.is_chat_message(message): logger.debug("其他非聊天消息") logger.debug(f"原始消息: {message}") return # 处理聊天消息 create_time = int(message["1"]["5"]) send_user_name = message["1"]["10"]["reminderTitle"] send_user_id = message["1"]["10"]["senderUserId"] send_message = message["1"]["10"]["reminderContent"] # 时效性验证(过滤5分钟前消息) if (time.time() * 1000 - create_time) > self.message_expire_time: logger.debug("过期消息丢弃") return # 获取商品ID和会话ID url_info = message["1"]["10"]["reminderUrl"] item_id = url_info.split("itemId=")[1].split("&")[0] if "itemId=" in url_info else None chat_id = message["1"]["2"].split('@')[0] if not item_id: logger.warning("无法获取商品ID") return # 检查是否为卖家(自己)发送的控制命令 if send_user_id == self.myid: logger.debug("检测到卖家消息,检查是否为控制命令") # 检查切换命令 if self.check_toggle_keywords(send_message): mode = self.toggle_manual_mode(chat_id) if mode == "manual": logger.info(f"🔴 已接管会话 {chat_id} (商品: {item_id})") else: logger.info(f"🟢 已恢复会话 {chat_id} 的自动回复 (商品: {item_id})") return # 记录卖家人工回复 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", send_message) logger.info(f"卖家人工回复 (会话: {chat_id}, 商品: {item_id}): {send_message}") # 获取用户问题并保存人工标准回答 context = self.context_manager.get_context_by_chat(chat_id) user_question = None for msg in reversed(context): if msg['role'] == 'user': user_question = msg['content'] break if user_question: self.context_manager.save_manual_answer(user_question, send_message, item_id) logger.info(f"已保存人工标准回答 (商品: {item_id}): 问题: {user_question}, 回答: {send_message}") return logger.info(f"用户: {send_user_name} (ID: {send_user_id}), 商品: {item_id}, 会话: {chat_id}, 消息: {send_message}") # 添加用户消息到上下文 self.context_manager.add_message_by_chat(chat_id, send_user_id, item_id, "user", send_message) # 如果当前会话处于人工接管模式,不进行自动回复 if self.is_manual_mode(chat_id): logger.info(f"🔴 会话 {chat_id} 处于人工接管模式,跳过自动回复") return if self.is_system_message(message): logger.debug("系统消息,跳过处理") return # 从数据库中获取商品信息,如果不存在则从API获取并保存 item_info = self.context_manager.get_item_info(item_id) if not item_info: logger.info(f"从API获取商品信息: {item_id}") api_result = self.xianyu.get_item_info(item_id) if 'data' in api_result and 'itemDO' in api_result['data']: item_info = api_result['data']['itemDO'] # 保存商品信息到数据库 self.context_manager.save_item_info(item_id, item_info) else: logger.warning(f"获取商品信息失败: {api_result}") return else: logger.info(f"从数据库获取商品信息: {item_id}") item_description = f"{item_info['desc']};当前商品售卖价格为:{str(item_info['soldPrice'])}" # 获取完整的对话上下文 context = self.context_manager.get_context_by_chat(chat_id) # 生成回复 bot_reply = bot.generate_reply( send_message, item_description, context=context ) # 检查是否为价格意图,如果是则增加议价次数 if bot.last_intent == "price": self.context_manager.increment_bargain_count_by_chat(chat_id) bargain_count = self.context_manager.get_bargain_count_by_chat(chat_id) logger.info(f"用户 {send_user_name} 对商品 {item_id} 的议价次数: {bargain_count}") # 添加机器人回复到上下文 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", bot_reply) logger.info(f"机器人回复: {bot_reply}") await self.send_msg(websocket, chat_id, send_user_id, bot_reply) except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message_data}") async def send_heartbeat(self, ws): """发送心跳包并等待响应""" try: heartbeat_mid = generate_mid() heartbeat_msg = { "lwp": "/!", "headers": { "mid": heartbeat_mid } } await ws.send(json.dumps(heartbeat_msg)) self.last_heartbeat_time = time.time() logger.debug("心跳包已发送") return heartbeat_mid except Exception as e: logger.error(f"发送心跳包失败: {e}") raise async def heartbeat_loop(self, ws): """心跳维护循环""" while True: try: current_time = time.time() # 检查是否需要发送心跳 if current_time - self.last_heartbeat_time >= self.heartbeat_interval: await self.send_heartbeat(ws) # 检查上次心跳响应时间,如果超时则认为连接已断开 if (current_time - self.last_heartbeat_response) > (self.heartbeat_interval + self.heartbeat_timeout): logger.warning("心跳响应超时,可能连接已断开") break await asyncio.sleep(1) except Exception as e: logger.error(f"心跳循环出错: {e}") break async def handle_heartbeat_response(self, message_data): """处理心跳响应""" try: if ( isinstance(message_data, dict) and "headers" in message_data and "mid" in message_data["headers"] and "code" in message_data and message_data["code"] == 200 ): self.last_heartbeat_response = time.time() logger.debug("收到心跳响应") return True except Exception as e: logger.error(f"处理心跳响应出错: {e}") return False async def main(self): while True: try: # 重置连接重启标志 self.connection_restart_flag = False headers = { "Cookie": self.cookies_str, "Host": "wss-goofish.dingtalk.com", "Connection": "Upgrade", "Pragma": "no-cache", "Cache-Control": "no-cache", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", "Origin": "https://www.goofish.com", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", } async with websockets.connect(self.base_url, extra_headers=headers) as websocket: self.ws = websocket await self.init(websocket) # 初始化心跳时间 self.last_heartbeat_time = time.time() self.last_heartbeat_response = time.time() # 启动心跳任务 self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket)) # 启动token刷新任务 self.token_refresh_task = asyncio.create_task(self.token_refresh_loop()) async for message in websocket: try: # 检查是否需要重启连接 if self.connection_restart_flag: logger.info("检测到连接重启标志,准备重新建立连接...") break message_data = json.loads(message) # 处理心跳响应 if await self.handle_heartbeat_response(message_data): continue # 发送通用ACK响应 if "headers" in message_data and "mid" in message_data["headers"]: ack = { "code": 200, "headers": { "mid": message_data["headers"]["mid"], "sid": message_data["headers"].get("sid", "") } } # 复制其他可能的header字段 for key in ["app-key", "ua", "dt"]: if key in message_data["headers"]: ack["headers"][key] = message_data["headers"][key] await websocket.send(json.dumps(ack)) # 处理其他消息 await self.handle_message(message_data, websocket) except json.JSONDecodeError: logger.error("消息解析失败") except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message}") except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket连接已关闭") except Exception as e: logger.error(f"连接发生错误: {e}") finally: # 清理任务 if self.heartbeat_task: self.heartbeat_task.cancel() try: await self.heartbeat_task except asyncio.CancelledError: pass if self.token_refresh_task: self.token_refresh_task.cancel() try: await self.token_refresh_task except asyncio.CancelledError: pass # 如果是主动重启,立即重连;否则等待5秒 if self.connection_restart_flag: logger.info("主动重启连接,立即重连...") else: logger.info("等待5秒后重连...") await asyncio.sleep(5) if __name__ == '__main__': # 加载环境变量 load_dotenv() # 配置日志级别 log_level = os.getenv("LOG_LEVEL", "DEBUG").upper() logger.remove() # 移除默认handler logger.add( sys.stderr, level=log_level, format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>" ) logger.info(f"日志级别设置为: {log_level}") cookies_str = os.getenv("COOKIES_STR") bot = XianyuReplyBot() xianyuLive = XianyuLive(cookies_str) # 常驻进程 asyncio.run(xianyuLive.main()) 现在在我加了techagent知识库检索(knowledge_base中product_summary.excel)的功能之后,客服机器人的消息发布出去了,只是可以把消息回复记录进日志

A玩具爆款孙大帅
  • 粉丝: 32
上传资源 快速赚钱