Py_Trans: Exploring Syntactic Sugar for Custom Python Syntax

ZIP file | 2 KB | Updated 2024-12-25
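Key points:

1. The concept of custom Python syntax

Custom Python syntax means modifying or extending Python's basic syntax rules to suit a particular programming need or personal preference. Such syntax normally has to be handled by a dedicated tool or framework so that a compiler or interpreter can recognize and process it. Py_Trans is one such tool: it lets users write Python code in a new, more concise syntax.

2. Custom syntax compared with standard Python

The custom syntax that Py_Trans provides differs visibly from standard Python. Function definitions use arrow notation, for example:

```
inc = ( x ) => x + 1
add ( x , y ) => x + y
```

The standard Python equivalents use `lambda`:

```
inc = lambda x : x + 1
add = lambda x, y : x + y
```

The custom syntax drops the `lambda` keyword and changes how the parameters and the return expression are written, which makes the code more compact.

The custom syntax also includes some extended expressions, for example:

```
print ( ! [ inc => 1..6 ] )
```

This expression uses the `!` operator and the range generator `1..6` to create a sequence and apply the function `inc` to it. In standard Python this requires a list comprehension or another looping construct.

3. Simplified function calls and exception handling

Function calls and exception handling also have their own notation in the custom syntax. For example, exception handling:

```
try inc ( "1" ) Exception print ( "Error:" , err )
```

This example shows how to handle an exception that may be raised when calling `inc`: if one occurs, the code that follows the `Exception` keyword runs. This differs markedly from the `try` and `except` blocks of standard Python.

4. An alternative conditional expression

The custom syntax also offers an alternative way to write a conditional expression:

```
print ( ( x || False ) ? "Done" : "Failed" )
```

This is a C-style ternary operator that selects the output according to the truth value of `x`. In standard Python the same result is obtained with a conditional expression (Python's form of the ternary operator): `"Done" if x else "Failed"`.

5. How custom syntax is implemented

Implementing a custom syntax generally involves the following steps:

- Lexical analysis: split the source text into tokens.
- Syntax analysis: combine the tokens into syntactic structures such as expressions and statements, according to the grammar of the language.
- Semantic analysis: check that the syntactic structures are meaningful, for example that variables are defined and types match.
- Code generation: turn the syntactic structures into executable code.

Py_Trans probably provides a set of predefined transformation rules that convert the custom syntax into standard syntax the Python interpreter understands. Implementing this may involve compiler techniques such as parser generators and abstract syntax tree (AST) manipulation.

6. Meaning of the tags

[Tags]: "python dialect syntatic-sugar langtrans"

- python: the custom syntax is based on the Python language.
- dialect: this is a dialect of Python, that is, a programming style that deviates from standard Python.
- syntatic-sugar (a misspelling of "syntactic sugar"): syntax that makes code easier to read and write without adding any new capability to the language.
- langtrans: language translation, meaning conversion between programming languages or language styles.

7. File list of the archive

[File list of the archive]: Py_Trans-main

The archive lists a single entry, "Py_Trans-main". This name follows the pattern that GitHub uses for branch archives (repository name plus branch name), so it is most likely the top-level directory of the `main` branch rather than an entry-point file. The listing does not show which files that directory contains.

In summary, Py_Trans lets users write Python code under a new set of syntax rules that aims to improve readability and writing efficiency by simplifying syntactic structure. Such a custom syntax may rely on compiler techniques such as lexical analysis, syntax analysis, and code generation. With Py_Trans, developers keep the full capabilities of Python while working in a more personalized and compact notation.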
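For reference, the custom snippets in sections 2 to 4 correspond roughly to the standard Python below. This is a sketch under assumptions, not output produced by Py_Trans: the source does not say whether `1..6` includes its upper bound or what exactly `!` does, so the half-open `range(1, 6)` and the list comprehension are guesses, and the helper names `describe_call` and `status` are hypothetical.

```python
# Standard-Python counterparts of the custom-syntax snippets (a sketch).

# Section 2: arrow definitions become lambda expressions.
inc = lambda x: x + 1
add = lambda x, y: x + y

# Section 2: `! [ inc => 1..6 ]` read as "apply inc to each value of a range".
# Assumption: `1..6` is treated here as the half-open range(1, 6).
mapped = [inc(n) for n in range(1, 6)]


def describe_call(value):
    """Section 3: call inc and report any exception, as the custom `try` form does."""
    try:
        return inc(value)
    except Exception as err:
        return f"Error: {err}"


def status(x):
    """Section 4: `( x || False ) ? "Done" : "Failed"` as a conditional expression."""
    return "Done" if (x or False) else "Failed"


print(mapped)
print(describe_call("1"))  # inc("1") raises TypeError, so the error branch runs
print(status(None))
```

Wrapping the `try` and conditional forms in small functions is only a convenience for trying them with different inputs; the custom syntax in the source uses them inline.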
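The translation idea in section 5 can be illustrated with a deliberately small source-to-source rewrite. The sketch below is not how Py_Trans is implemented (the source does not show its code). It assumes single-line definitions of the form `name = ( params ) => expr`, so the second definition is written with an explicit `=`, which the source snippet does not show. The names `ARROW_DEF`, `translate_arrow`, and `translate_source` are hypothetical. A real tool would more likely tokenize and parse the input than rely on one regular expression.

```python
import re

# Hypothetical pattern: `name = ( params ) => body` on a single line.
ARROW_DEF = re.compile(
    r"^(?P<indent>\s*)(?P<name>[A-Za-z_]\w*)\s*=\s*"
    r"\(\s*(?P<params>[^()]*?)\s*\)\s*=>\s*(?P<body>.+)$"
)


def translate_arrow(line: str) -> str:
    """Rewrite one arrow definition into a lambda assignment.

    Lines that do not match the pattern are returned unchanged.
    """
    m = ARROW_DEF.match(line)
    if m is None:
        return line
    # Normalize "x , y" to "x, y"; an empty parameter list is allowed.
    params = ", ".join(p.strip() for p in m.group("params").split(",") if p.strip())
    head = f"lambda {params}:" if params else "lambda:"
    return f"{m.group('indent')}{m.group('name')} = {head} {m.group('body').strip()}"


def translate_source(source: str) -> str:
    """Translate a whole source string line by line."""
    return "\n".join(translate_arrow(line) for line in source.splitlines())


if __name__ == "__main__":
    custom = "inc = ( x ) => x + 1\nadd = ( x , y ) => x + y"
    python_src = translate_source(custom)
    print(python_src)

    # Run the generated Python in a scratch namespace to check that it works.
    namespace = {}
    exec(python_src, namespace)
    print(namespace["inc"](1), namespace["add"](2, 3))
```

A line-based regular expression keeps the example short, but it cannot handle nested parentheses in the parameter list or arrow functions embedded inside larger expressions; those cases are where the lexer and parser stages described in section 5 become necessary.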

相关推荐

filetype
filetype

第一部分: # -*- coding: gbk -*- import os import glob import cv2, math import numpy as np from PIL import Image import random import numpy as np import h5py import os from PIL import Image import scipy.io IMG_EXTENSIONS = [ '.jpg', '.JPG', '.jpeg', '.JPEG', '.png', '.PNG', '.ppm', '.PPM', '.bmp', '.BMP', ] #图片转矩阵 def pil_to_np(img_PIL): '''Converts image in PIL format to np.array. From W x H x C [0...255] to C x W x H [0..1] ''' ar = np.array(img_PIL) if len(ar.shape) == 3: ar = ar.transpose(2, 0, 1) else: ar = ar[None, ...] return ar.astype(np.float32) / 255. #矩阵转图片 def np_to_pil(img_np): '''Converts image in np.array format to PIL image. From C x W x H [0..1] to W x H x C [0...255] ''' ar = np.clip(img_np * 255, 0, 255).astype(np.uint8) if img_np.shape[0] == 1: ar = ar[0] else: ar = ar.transpose(1, 2, 0) return Image.fromarray(ar) # 判断文件夹中是否有以上类型图片,没有则返回0 def is_image_file(filename): #如果不都为空、0、false,则any()返回true return any(filename.endswith(extension) for extension in IMG_EXTENSIONS) #返回文件夹内文件绝对路径组成的列表 def make_dataset(dir): images = [] assert os.path.isdir(dir), '%s is not a valid directory' % dir # os.walk(top[, topdown=True[, onerror=None[, followlinks=False]]]) 通过在目录树中游走输出在目录中的文件名,top返回三项(root,dirs,files),分别代表: # 当前正在遍历的这个文件夹的本身的地址; list类型,内容是该文件夹中所有的目录的名字(不包括子目录); list类型,内容是该文件夹中所有的文件(不包括子目录) for root, _, fnames in sorted(os.walk(dir)): for fname in fnames: #print(fname) #拼接出图片的地址,并加入到images列表 path = os.path.join(root, fname) images.append(path) return images def hazy_simu(img_name,depth_or_trans_name,save_dir,pert_perlin = 0,airlight=0.76,is_imdepth=1,visual_range = [0.05, 0.1, 0.2, 0.5, 1]): """ This is the function for haze simulation with the parameters given by the paper: HAZERD: an outdoor scene dataset and benchmark for single image dehazing IEEE Internation Conference on Image Processing, Sep 2017 The paper and additional information on the project are available at: 
https://labsites.rochester.edu/gsharma/research/computer-vision/hazerd/ If you use this code, please cite our paper. IMPORTANT NOTE: The code uses the convention that pixel locations with a depth value of 0 correspond to objects that are very far and for the simulation of haze these are placed a distance of 2 times the visual range. Authors: Yanfu Zhang: [email protected] Li Ding: [email protected] Gaurav Sharma: [email protected] Last update: May 2017 python version update : Aug 2021 Authors : Haoying Sun : [email protected] parse inputs and set default values Set default parameter values. Some of these are used only if they are not passed in :param img_name: the directory and name of a haze-free RGB image, the name should be in the format of ..._RGB.jpg :param depth_name: the corresponding directory and name of the depth map, in .mat file, the name should be in the format of ..._depth.mat :param save_dir: the directory to save the simulated images :param pert_perlin: 1 for adding perlin noise, default 0 :param airlight: 3*1 matrix in the range [0,1] :param visual_range: a vector of any size :return: image name of hazy image """

filetype

我正在编写一个闲鱼客服,以上是XianyuAgent.py代码,以下是两个子代码文件:context_manager.py:import sqlite3 import os import json from datetime import datetime from loguru import logger class ChatContextManager: """ 聊天上下文管理器 负责存储和检索用户与商品之间的对话历史,使用SQLite数据库进行持久化存储。 支持按会话ID检索对话历史,以及议价次数统计。 """ def __init__(self, max_history=100, db_path="data/chat_history.db"): """ 初始化聊天上下文管理器 Args: max_history: 每个对话保留的最大消息数 db_path: SQLite数据库文件路径 """ self.max_history = max_history self.db_path = db_path self._init_db() def _init_db(self): """初始化数据库表结构""" # 确保数据库目录存在 db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建消息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, item_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, chat_id TEXT ) ''') # 检查是否需要添加chat_id字段(兼容旧数据库) cursor.execute("PRAGMA table_info(messages)") columns = [column[1] for column in cursor.fetchall()] if 'chat_id' not in columns: cursor.execute('ALTER TABLE messages ADD COLUMN chat_id TEXT') logger.info("已为messages表添加chat_id字段") # 创建索引以加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_user_item ON messages (user_id, item_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_chat_id ON messages (chat_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_timestamp ON messages (timestamp) ''') # 创建基于会话ID的议价次数表 cursor.execute(''' CREATE TABLE IF NOT EXISTS chat_bargain_counts ( chat_id TEXT PRIMARY KEY, count INTEGER DEFAULT 0, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建商品信息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS items ( item_id TEXT PRIMARY KEY, data TEXT NOT NULL, price REAL, description TEXT, last_updated DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 创建人工标准回答表 cursor.execute(''' CREATE TABLE IF NOT EXISTS manual_answers ( id INTEGER PRIMARY KEY AUTOINCREMENT, question TEXT NOT 
NULL, answer TEXT NOT NULL, item_id TEXT NOT NULL, chat_id TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(question, item_id) ) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_question ON manual_answers (question) ''') # 创建索引加速查询 cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_manual_answers_item_id ON manual_answers (item_id) ''') conn.commit() conn.close() logger.info(f"聊天历史数据库初始化完成: {self.db_path}") def save_item_info(self, item_id, item_data): """ 保存商品信息到数据库 Args: item_id: 商品ID item_data: 商品信息字典 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 从商品数据中提取有用信息 price = float(item_data.get('soldPrice', 0)) description = item_data.get('desc', '') # 将整个商品数据转换为JSON字符串 data_json = json.dumps(item_data, ensure_ascii=False) cursor.execute( """ INSERT INTO items (item_id, data, price, description, last_updated) VALUES (?, ?, ?, ?, ?) ON CONFLICT(item_id) DO UPDATE SET data = ?, price = ?, description = ?, last_updated = ? """, ( item_id, data_json, price, description, datetime.now().isoformat(), data_json, price, description, datetime.now().isoformat() ) ) conn.commit() logger.debug(f"商品信息已保存: {item_id}") except Exception as e: logger.error(f"保存商品信息时出错: {e}") conn.rollback() finally: conn.close() def get_item_info(self, item_id): """ 从数据库获取商品信息 Args: item_id: 商品ID Returns: dict: 商品信息字典,如果不存在返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT data FROM items WHERE item_id = ?", (item_id,) ) result = cursor.fetchone() if result: return json.loads(result[0]) return None except Exception as e: logger.error(f"获取商品信息时出错: {e}") return None finally: conn.close() def add_message_by_chat(self, chat_id, user_id, item_id, role, content): """ 基于会话ID添加新消息到对话历史 Args: chat_id: 会话ID user_id: 用户ID (用户消息存真实user_id,助手消息存卖家ID) item_id: 商品ID role: 消息角色 (user/assistant) content: 消息内容 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 插入新消息,使用chat_id作为额外标识 
cursor.execute( "INSERT INTO messages (user_id, item_id, role, content, timestamp, chat_id) VALUES (?, ?, ?, ?, ?, ?)", (user_id, item_id, role, content, datetime.now().isoformat(), chat_id) ) # 检查是否需要清理旧消息(基于chat_id) cursor.execute( """ SELECT id FROM messages WHERE chat_id = ? ORDER BY timestamp DESC LIMIT ?, 1 """, (chat_id, self.max_history) ) oldest_to_keep = cursor.fetchone() if oldest_to_keep: cursor.execute( "DELETE FROM messages WHERE chat_id = ? AND id < ?", (chat_id, oldest_to_keep[0]) ) conn.commit() except Exception as e: logger.error(f"添加消息到数据库时出错: {e}") conn.rollback() finally: conn.close() def get_context_by_chat(self, chat_id): """ 基于会话ID获取对话历史 Args: chat_id: 会话ID Returns: list: 包含对话历史的列表 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( """ SELECT role, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC LIMIT ? """, (chat_id, self.max_history) ) messages = [{"role": role, "content": content} for role, content in cursor.fetchall()] # 获取议价次数并添加到上下文中 bargain_count = self.get_bargain_count_by_chat(chat_id) if bargain_count > 0: messages.append({ "role": "system", "content": f"议价次数: {bargain_count}" }) except Exception as e: logger.error(f"获取对话历史时出错: {e}") messages = [] finally: conn.close() return messages def increment_bargain_count_by_chat(self, chat_id): """ 基于会话ID增加议价次数 Args: chat_id: 会话ID """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用UPSERT语法直接基于chat_id增加议价次数 cursor.execute( """ INSERT INTO chat_bargain_counts (chat_id, count, last_updated) VALUES (?, 1, ?) ON CONFLICT(chat_id) DO UPDATE SET count = count + 1, last_updated = ? 
""", (chat_id, datetime.now().isoformat(), datetime.now().isoformat()) ) conn.commit() logger.debug(f"会话 {chat_id} 议价次数已增加") except Exception as e: logger.error(f"增加议价次数时出错: {e}") conn.rollback() finally: conn.close() def get_bargain_count_by_chat(self, chat_id): """ 基于会话ID获取议价次数 Args: chat_id: 会话ID Returns: int: 议价次数 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "SELECT count FROM chat_bargain_counts WHERE chat_id = ?", (chat_id,) ) result = cursor.fetchone() return result[0] if result else 0 except Exception as e: logger.error(f"获取议价次数时出错: {e}") return 0 finally: conn.close() def update_manual_answer(self, question, new_answer, item_id, chat_id=None): """ 更新已保存的人工标准回答 Args: question: 原始问题 new_answer: 新的回答内容 item_id: 商品ID chat_id: 会话ID(可选) Returns: bool: 更新是否成功 """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "UPDATE manual_answers SET answer = ?, created_at = ? WHERE question = ? AND item_id = ?", (new_answer, datetime.now().isoformat(), question, item_id) ) conn.commit() if cursor.rowcount > 0: logger.info(f"人工标准回答已更新: {question[:30]}...") return True else: logger.warning(f"未找到匹配的人工标准回答: {question[:30]}...") return False except Exception as e: logger.error(f"更新人工标准回答时出错: {e}") conn.rollback() return False finally: conn.close() def save_manual_answer(self, question, answer, item_id, chat_id=None): """ 保存人工标准回答到数据库 Args: question: 用户问题 answer: 人工回答 item_id: 商品ID chat_id: 会话ID(可选) """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute( "INSERT OR IGNORE INTO manual_answers (question, answer, item_id, chat_id, created_at) VALUES (?, ?, ?, ?, ?)", (question, answer, item_id, chat_id, datetime.now().isoformat()) ) conn.commit() logger.info(f"人工标准回答已保存: {question[:30]}...") except Exception as e: logger.error(f"保存人工标准回答时出错: {e}") conn.rollback() finally: conn.close() def get_manual_answer(self, question, item_id): """ 从数据库获取人工标准回答 Args: question: 用户问题 item_id: 商品ID 
Returns: str: 匹配的人工回答,如果没有找到则返回None """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # 使用LIKE进行模糊匹配 cursor.execute( "SELECT answer FROM manual_answers WHERE item_id = ? AND question LIKE ? ORDER BY created_at DESC LIMIT 1", (item_id, f"%{question}%") ) result = cursor.fetchone() return result[0] if result else None except Exception as e: logger.error(f"获取人工标准回答时出错: {e}") return None finally: conn.close() main.py: import base64 import json import asyncio import time import os import websockets from loguru import logger from dotenv import load_dotenv from XianyuApis import XianyuApis import sys from utils.xianyu_utils import generate_mid, generate_uuid, trans_cookies, generate_device_id, decrypt from XianyuAgent import XianyuReplyBot from context_manager import ChatContextManager class XianyuLive: def __init__(self, cookies_str): self.xianyu = XianyuApis() self.base_url = 'wss://wss-goofish.dingtalk.com/' self.cookies_str = cookies_str self.cookies = trans_cookies(cookies_str) self.xianyu.session.cookies.update(self.cookies) # 直接使用 session.cookies.update self.myid = self.cookies['unb'] self.device_id = generate_device_id(self.myid) self.context_manager = ChatContextManager() # 心跳相关配置 self.heartbeat_interval = int(os.getenv("HEARTBEAT_INTERVAL", "15")) # 心跳间隔,默认15秒 self.heartbeat_timeout = int(os.getenv("HEARTBEAT_TIMEOUT", "5")) # 心跳超时,默认5秒 self.last_heartbeat_time = 0 self.last_heartbeat_response = 0 self.heartbeat_task = None self.ws = None # Token刷新相关配置 self.token_refresh_interval = int(os.getenv("TOKEN_REFRESH_INTERVAL", "3600")) # Token刷新间隔,默认1小时 self.token_retry_interval = int(os.getenv("TOKEN_RETRY_INTERVAL", "300")) # Token重试间隔,默认5分钟 self.last_token_refresh_time = 0 self.current_token = None self.token_refresh_task = None self.connection_restart_flag = False # 连接重启标志 # 人工接管相关配置 self.manual_mode_conversations = set() # 存储处于人工接管模式的会话ID self.manual_mode_timeout = int(os.getenv("MANUAL_MODE_TIMEOUT", "3600")) # 人工接管超时时间,默认1小时 
self.manual_mode_timestamps = {} # 记录进入人工模式的时间 # 消息过期时间配置 self.message_expire_time = int(os.getenv("MESSAGE_EXPIRE_TIME", "300000")) # 消息过期时间,默认5分钟 # 人工接管关键词,从环境变量读取 self.toggle_keywords = os.getenv("TOGGLE_KEYWORDS", "。") async def refresh_token(self): """刷新token""" try: logger.info("开始刷新token...") # 获取新token(如果Cookie失效,get_token会直接退出程序) token_result = self.xianyu.get_token(self.device_id) if 'data' in token_result and 'accessToken' in token_result['data']: new_token = token_result['data']['accessToken'] self.current_token = new_token self.last_token_refresh_time = time.time() logger.info("Token刷新成功") return new_token else: logger.error(f"Token刷新失败: {token_result}") return None except Exception as e: logger.error(f"Token刷新异常: {str(e)}") return None async def token_refresh_loop(self): """Token刷新循环""" while True: try: current_time = time.time() # 检查是否需要刷新token if current_time - self.last_token_refresh_time >= self.token_refresh_interval: logger.info("Token即将过期,准备刷新...") new_token = await self.refresh_token() if new_token: logger.info("Token刷新成功,准备重新建立连接...") # 设置连接重启标志 self.connection_restart_flag = True # 关闭当前WebSocket连接,触发重连 if self.ws: await self.ws.close() break else: logger.error("Token刷新失败,将在{}分钟后重试".format(self.token_retry_interval // 60)) await asyncio.sleep(self.token_retry_interval) # 使用配置的重试间隔 continue # 每分钟检查一次 await asyncio.sleep(60) except Exception as e: logger.error(f"Token刷新循环出错: {e}") await asyncio.sleep(60) async def send_msg(self, ws, cid, toid, text): text = { "contentType": 4, "html": { "html": text } } text_base64 = str(base64.b64encode(json.dumps(text).encode('utf-8')), 'utf-8') msg = { "lwp": "/r/MessageSend/sendByReceiverScope", "headers": { "mid": generate_mid() }, "body": [ { "uuid": generate_uuid(), "cid": f"{cid}@goofish", "conversationType": 1, "content": { "contentType": 101, "custom": { "type": 1, "data": text_base64 } }, "redPointPolicy": 0, "extension": { "extJson": "{}" }, "ctx": { "appVersion": "1.0", "platform": "web" }, 
"mtags": {}, "msgReadStatusSetting": 1 }, { "actualReceivers": [ f"{toid}@goofish", f"{self.myid}@goofish" ] } ] } await ws.send(json.dumps(msg)) async def init(self, ws): # 如果没有token或者token过期,获取新token if not self.current_token or (time.time() - self.last_token_refresh_time) >= self.token_refresh_interval: logger.info("获取初始token...") await self.refresh_token() if not self.current_token: logger.error("无法获取有效token,初始化失败") raise Exception("Token获取失败") msg = { "lwp": "/reg", "headers": { "cache-header": "app-key token ua wv", "app-key": "444e9908a51d1cb236a27862abc769c9", "token": self.current_token, "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5", "dt": "j", "wv": "im:3,au:3,sy:6", "sync": "0,0;0;0;", "did": self.device_id, "mid": generate_mid() } } await ws.send(json.dumps(msg)) # 等待一段时间,确保连接注册完成 await asyncio.sleep(1) msg = {"lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": "5701741704675979 0"}, "body": [ {"pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0, "pts": int(time.time() * 1000) * 1000, "seq": 0, "timestamp": int(time.time() * 1000)}]} await ws.send(json.dumps(msg)) logger.info('连接注册完成') def is_chat_message(self, message): """判断是否为用户聊天消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], dict) # 确保是字典类型 and "10" in message["1"] and isinstance(message["1"]["10"], dict) # 确保是字典类型 and "reminderContent" in message["1"]["10"] ) except Exception: return False def is_sync_package(self, message_data): """判断是否为同步包消息""" try: return ( isinstance(message_data, dict) and "body" in message_data and "syncPushPackage" in message_data["body"] and "data" in message_data["body"]["syncPushPackage"] and len(message_data["body"]["syncPushPackage"]["data"]) > 0 ) except Exception: return False def is_typing_status(self, message): 
"""判断是否为用户正在输入状态消息""" try: return ( isinstance(message, dict) and "1" in message and isinstance(message["1"], list) and len(message["1"]) > 0 and isinstance(message["1"][0], dict) and "1" in message["1"][0] and isinstance(message["1"][0]["1"], str) and "@goofish" in message["1"][0]["1"] ) except Exception: return False def is_system_message(self, message): """判断是否为系统消息""" try: return ( isinstance(message, dict) and "3" in message and isinstance(message["3"], dict) and "needPush" in message["3"] and message["3"]["needPush"] == "false" ) except Exception: return False def check_toggle_keywords(self, message): """检查消息是否包含切换关键词""" message_stripped = message.strip() return message_stripped in self.toggle_keywords def is_manual_mode(self, chat_id): """检查特定会话是否处于人工接管模式""" if chat_id not in self.manual_mode_conversations: return False # 检查是否超时 current_time = time.time() if chat_id in self.manual_mode_timestamps: if current_time - self.manual_mode_timestamps[chat_id] > self.manual_mode_timeout: # 超时,自动退出人工模式 self.exit_manual_mode(chat_id) return False return True def enter_manual_mode(self, chat_id): """进入人工接管模式""" self.manual_mode_conversations.add(chat_id) self.manual_mode_timestamps[chat_id] = time.time() def exit_manual_mode(self, chat_id): """退出人工接管模式""" self.manual_mode_conversations.discard(chat_id) if chat_id in self.manual_mode_timestamps: del self.manual_mode_timestamps[chat_id] def toggle_manual_mode(self, chat_id): """切换人工接管模式""" if self.is_manual_mode(chat_id): self.exit_manual_mode(chat_id) return "auto" else: self.enter_manual_mode(chat_id) return "manual" async def handle_message(self, message_data, websocket): """处理所有类型的消息""" try: try: message = message_data ack = { "code": 200, "headers": { "mid": message["headers"]["mid"] if "mid" in message["headers"] else generate_mid(), "sid": message["headers"]["sid"] if "sid" in message["headers"] else '', } } if 'app-key' in message["headers"]: ack["headers"]["app-key"] = message["headers"]["app-key"] if 'ua' in 
message["headers"]: ack["headers"]["ua"] = message["headers"]["ua"] if 'dt' in message["headers"]: ack["headers"]["dt"] = message["headers"]["dt"] await websocket.send(json.dumps(ack)) except Exception as e: pass # 如果不是同步包消息,直接返回 if not self.is_sync_package(message_data): return # 获取并解密数据 sync_data = message_data["body"]["syncPushPackage"]["data"][0] # 检查是否有必要的字段 if "data" not in sync_data: logger.debug("同步包中无data字段") return # 解密数据 try: data = sync_data["data"] try: data = base64.b64decode(data).decode("utf-8") data = json.loads(data) # logger.info(f"无需解密 message: {data}") return except Exception as e: # logger.info(f'加密数据: {data}') decrypted_data = decrypt(data) message = json.loads(decrypted_data) except Exception as e: logger.error(f"消息解密失败: {e}") return try: # 判断是否为订单消息,需要自行编写付款后的逻辑 if message['3']['redReminder'] == '等待买家付款': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'等待买家 {user_url} 付款') return elif message['3']['redReminder'] == '交易关闭': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'买家 {user_url} 交易关闭') return elif message['3']['redReminder'] == '等待卖家发货': user_id = message['1'].split('@')[0] user_url = f'https://www.goofish.com/personal?userId={user_id}' logger.info(f'交易成功 {user_url} 等待卖家发货') return except: pass # 判断消息类型 if self.is_typing_status(message): logger.debug("用户正在输入") return elif not self.is_chat_message(message): logger.debug("其他非聊天消息") logger.debug(f"原始消息: {message}") return # 处理聊天消息 create_time = int(message["1"]["5"]) send_user_name = message["1"]["10"]["reminderTitle"] send_user_id = message["1"]["10"]["senderUserId"] send_message = message["1"]["10"]["reminderContent"] # 时效性验证(过滤5分钟前消息) if (time.time() * 1000 - create_time) > self.message_expire_time: logger.debug("过期消息丢弃") return # 获取商品ID和会话ID 
url_info = message["1"]["10"]["reminderUrl"] item_id = url_info.split("itemId=")[1].split("&")[0] if "itemId=" in url_info else None chat_id = message["1"]["2"].split('@')[0] if not item_id: logger.warning("无法获取商品ID") return # 检查是否为卖家(自己)发送的控制命令 if send_user_id == self.myid: logger.debug("检测到卖家消息,检查是否为控制命令") # 检查切换命令 if self.check_toggle_keywords(send_message): mode = self.toggle_manual_mode(chat_id) if mode == "manual": logger.info(f"🔴 已接管会话 {chat_id} (商品: {item_id})") else: logger.info(f"🟢 已恢复会话 {chat_id} 的自动回复 (商品: {item_id})") return # 记录卖家人工回复 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", send_message) logger.info(f"卖家人工回复 (会话: {chat_id}, 商品: {item_id}): {send_message}") # 获取用户问题并保存人工标准回答 context = self.context_manager.get_context_by_chat(chat_id) user_question = None for msg in reversed(context): if msg['role'] == 'user': user_question = msg['content'] break if user_question: self.context_manager.save_manual_answer(user_question, send_message, item_id) logger.info(f"已保存人工标准回答 (商品: {item_id}): 问题: {user_question}, 回答: {send_message}") return logger.info(f"用户: {send_user_name} (ID: {send_user_id}), 商品: {item_id}, 会话: {chat_id}, 消息: {send_message}") # 添加用户消息到上下文 self.context_manager.add_message_by_chat(chat_id, send_user_id, item_id, "user", send_message) # 如果当前会话处于人工接管模式,不进行自动回复 if self.is_manual_mode(chat_id): logger.info(f"🔴 会话 {chat_id} 处于人工接管模式,跳过自动回复") return if self.is_system_message(message): logger.debug("系统消息,跳过处理") return # 从数据库中获取商品信息,如果不存在则从API获取并保存 item_info = self.context_manager.get_item_info(item_id) if not item_info: logger.info(f"从API获取商品信息: {item_id}") api_result = self.xianyu.get_item_info(item_id) if 'data' in api_result and 'itemDO' in api_result['data']: item_info = api_result['data']['itemDO'] # 保存商品信息到数据库 self.context_manager.save_item_info(item_id, item_info) else: logger.warning(f"获取商品信息失败: {api_result}") return else: logger.info(f"从数据库获取商品信息: {item_id}") item_description = 
f"{item_info['desc']};当前商品售卖价格为:{str(item_info['soldPrice'])}" # 获取完整的对话上下文 context = self.context_manager.get_context_by_chat(chat_id) # 生成回复 bot_reply = bot.generate_reply( send_message, item_description, context=context ) # 检查是否为价格意图,如果是则增加议价次数 if bot.last_intent == "price": self.context_manager.increment_bargain_count_by_chat(chat_id) bargain_count = self.context_manager.get_bargain_count_by_chat(chat_id) logger.info(f"用户 {send_user_name} 对商品 {item_id} 的议价次数: {bargain_count}") # 添加机器人回复到上下文 self.context_manager.add_message_by_chat(chat_id, self.myid, item_id, "assistant", bot_reply) logger.info(f"机器人回复: {bot_reply}") await self.send_msg(websocket, chat_id, send_user_id, bot_reply) except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message_data}") async def send_heartbeat(self, ws): """发送心跳包并等待响应""" try: heartbeat_mid = generate_mid() heartbeat_msg = { "lwp": "/!", "headers": { "mid": heartbeat_mid } } await ws.send(json.dumps(heartbeat_msg)) self.last_heartbeat_time = time.time() logger.debug("心跳包已发送") return heartbeat_mid except Exception as e: logger.error(f"发送心跳包失败: {e}") raise async def heartbeat_loop(self, ws): """心跳维护循环""" while True: try: current_time = time.time() # 检查是否需要发送心跳 if current_time - self.last_heartbeat_time >= self.heartbeat_interval: await self.send_heartbeat(ws) # 检查上次心跳响应时间,如果超时则认为连接已断开 if (current_time - self.last_heartbeat_response) > (self.heartbeat_interval + self.heartbeat_timeout): logger.warning("心跳响应超时,可能连接已断开") break await asyncio.sleep(1) except Exception as e: logger.error(f"心跳循环出错: {e}") break async def handle_heartbeat_response(self, message_data): """处理心跳响应""" try: if ( isinstance(message_data, dict) and "headers" in message_data and "mid" in message_data["headers"] and "code" in message_data and message_data["code"] == 200 ): self.last_heartbeat_response = time.time() logger.debug("收到心跳响应") return True except Exception as e: logger.error(f"处理心跳响应出错: {e}") return False async def main(self): 
while True: try: # 重置连接重启标志 self.connection_restart_flag = False headers = { "Cookie": self.cookies_str, "Host": "wss-goofish.dingtalk.com", "Connection": "Upgrade", "Pragma": "no-cache", "Cache-Control": "no-cache", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", "Origin": "https://www.goofish.com", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", } async with websockets.connect(self.base_url, extra_headers=headers) as websocket: self.ws = websocket await self.init(websocket) # 初始化心跳时间 self.last_heartbeat_time = time.time() self.last_heartbeat_response = time.time() # 启动心跳任务 self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket)) # 启动token刷新任务 self.token_refresh_task = asyncio.create_task(self.token_refresh_loop()) async for message in websocket: try: # 检查是否需要重启连接 if self.connection_restart_flag: logger.info("检测到连接重启标志,准备重新建立连接...") break message_data = json.loads(message) # 处理心跳响应 if await self.handle_heartbeat_response(message_data): continue # 发送通用ACK响应 if "headers" in message_data and "mid" in message_data["headers"]: ack = { "code": 200, "headers": { "mid": message_data["headers"]["mid"], "sid": message_data["headers"].get("sid", "") } } # 复制其他可能的header字段 for key in ["app-key", "ua", "dt"]: if key in message_data["headers"]: ack["headers"][key] = message_data["headers"][key] await websocket.send(json.dumps(ack)) # 处理其他消息 await self.handle_message(message_data, websocket) except json.JSONDecodeError: logger.error("消息解析失败") except Exception as e: logger.error(f"处理消息时发生错误: {str(e)}") logger.debug(f"原始消息: {message}") except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket连接已关闭") except Exception as e: logger.error(f"连接发生错误: {e}") finally: # 清理任务 if self.heartbeat_task: self.heartbeat_task.cancel() try: await self.heartbeat_task except asyncio.CancelledError: pass if 
self.token_refresh_task: self.token_refresh_task.cancel() try: await self.token_refresh_task except asyncio.CancelledError: pass # 如果是主动重启,立即重连;否则等待5秒 if self.connection_restart_flag: logger.info("主动重启连接,立即重连...") else: logger.info("等待5秒后重连...") await asyncio.sleep(5) if __name__ == '__main__': # 加载环境变量 load_dotenv() # 配置日志级别 log_level = os.getenv("LOG_LEVEL", "DEBUG").upper() logger.remove() # 移除默认handler logger.add( sys.stderr, level=log_level, format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>" ) logger.info(f"日志级别设置为: {log_level}") cookies_str = os.getenv("COOKIES_STR") bot = XianyuReplyBot() xianyuLive = XianyuLive(cookies_str) # 常驻进程 asyncio.run(xianyuLive.main()) 现在在我加了techagent知识库检索(knowledge_base中product_summary.excel)的功能之后,客服机器人的消息发布出去了,只是可以把消息回复记录进日志

filetype

``` import os import pandas as pd import numpy as np # 设置主文件夹路径 main_folder = 'C:/Users\Lenovo\Desktop\crcw不同端12k在0负载下\风扇端' # 创建空列表,用于存储数据和标签 data_list = [] label_list = [] def processTarget(): # 遍历主文件夹中的每个子文件夹,并处理每个.csv文件 for folder_name in sorted(os.listdir(main_folder)): folder_path = os.path.join(main_folder, folder_name) if os.path.isdir(folder_path): csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] print(f"Processing folder: {folder_name}, found {len(csv_files)} CSV files.") # 打印 CSV 文件数量 # 遍历该类别文件夹中的.csv文件 for filename in sorted(csv_files): file_path = os.path.join(folder_path, filename) # 读取.csv文件 csv_data = pd.read_csv(file_path, header=None) # 检查数据形状,确保至少有4列 if csv_data.shape[1] >= 4: # 确保至少有4列 csv_data = csv_data.iloc[:, [0, 1, 2]].values # 只保留前3列,忽略第4列(RPM) else: print(f"Skipping file {filename}, unexpected shape: {csv_data.shape}") continue # 跳过不符合要求的文件 # 将当前文件的数据添加到 data_list 中 data_list.append(csv_data) # 添加为二维数组 # 添加相应的标签 if '内圈故障' in folder_name: class_label = 0 # 0: 内圈故障 elif '球故障' in folder_name: class_label = 1 # 1: 球故障 else: continue # 如果文件夹名称不符合预期,跳过 label_list.append(class_label) # 直接添加标签 # 确保 data_list 和 label_list 不为空 if data_list and label_list: # 将数据转换为三维 NumPy 数组 data = np.array(data_list) # shape: (文件数量, 1000, 3) label = np.array(label_list) # shape: (文件数量,) return data, label else: raise ValueError("没有有效的数据可供处理。") # 调用 processTarget 函数 try: data0, label0 = processTarget() print(data0.shape) # 输出数据形状 print(label0.shape) # 输出标签形状 except ValueError as e: print(e)```File "C:/Users/Lenovo/AppData/Roaming/JetBrains/PyCharmCE2020.2/scratches/scratch_21.py", line 23 self.trans_matrix_ = np.dot(inv_K, Pt.reshape(nt, 1)) ^ SyntaxError: invalid syntax





Resource reviews
以墨健康道
2025.07.14
This document explains how to extend Python syntax in an accessible way.
Mrs.Wong
2025.07.14
Py_Trans makes it possible to do functional programming in a more intuitive way.
Xhinking
2025.07.02
Explores the fun and the challenges of custom Python syntax.
UEgood雪姐姐
2025.05.09
Good reading for developers who want to deeply customize a programming language.
老光私享
2025.04.05
Py_Trans gives Python enthusiasts a new experience in syntax customization.
卡卡乐乐