merged_str = dt_obj.strftime('%Y-%m-%d %H:%M:%S')

This converts a Python datetime object (`dt_obj`) into a string (`merged_str`) by formatting it with the pattern `"%Y-%m-%d %H:%M:%S"`. Here `%Y` is the four-digit year, `%m` the two-digit month, `%d` the two-digit day, `%H` the hour in 24-hour format, `%M` the minutes, and `%S` the seconds. For example, if `dt_obj` is `datetime.datetime(2021, 12, 31, 23, 59, 59)`, then `merged_str` is assigned `"2021-12-31 23:59:59"`.
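For completeness, a minimal round-trip sketch using only the standard-library `datetime` module (the concrete value of `dt_obj` is just for illustration):

```python
from datetime import datetime

dt_obj = datetime(2021, 12, 31, 23, 59, 59)

# datetime -> string
merged_str = dt_obj.strftime('%Y-%m-%d %H:%M:%S')
print(merged_str)  # 2021-12-31 23:59:59

# string -> datetime (the inverse operation uses the same format string)
parsed = datetime.strptime(merged_str, '%Y-%m-%d %H:%M:%S')
assert parsed == dt_obj
```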
Related questions

```bat
@echo off
set ROBOT_REPORT_DIR=reports
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --test test_* --output output1.xml "Goodmanage\Goodlistsuite.robot"
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --test test_* --output output2.xml "Goodmanage\Singalgoodcombi.robot"
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --test test_* --output output3.xml "Goodmanage\Goodgroupsuite.robot"
rebot --outputdir %ROBOT_REPORT_DIR% --output final_report.html --merge %ROBOT_REPORT_DIR%\output1.xml %ROBOT_REPORT_DIR%\output2.xml %ROBOT_REPORT_DIR%\output3.xml
```

Cannot merge outputs containing different root suites. Original suite is 'Goodlistsuite' and merged is 'Singalgoodcombi'

Based on the batch file you provided and the error message, the problem occurs when merging the test reports: the outputs being merged contain different root test suites.

In your batch file you run three test suites separately and save their outputs as `output1.xml`, `output2.xml`, and `output3.xml`, then use the `rebot` command to merge those files into a final report. However, the error says that outputs containing different root suites cannot be merged: the original suite is 'Goodlistsuite' while the merged one is 'Singalgoodcombi'.

To fix this, the merged output files must share the same root test suite. You can do that as follows:

1. Check the root suite names used by the suite files and make sure they match.
2. Make sure `Goodlistsuite.robot`, `Singalgoodcombi.robot`, and `Goodgroupsuite.robot` use the same root suite name.
3. When running `robot`, pass the `--name` option to give every run the same root suite name.

For example:

```bat
@echo off
set ROBOT_REPORT_DIR=reports
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --name "MyRootSuite" --test test_* --output output1.xml "Goodmanage\Goodlistsuite.robot"
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --name "MyRootSuite" --test test_* --output output2.xml "Goodmanage\Singalgoodcombi.robot"
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --name "MyRootSuite" --test test_* --output output3.xml "Goodmanage\Goodgroupsuite.robot"
rebot --outputdir %ROBOT_REPORT_DIR% --output final_report.html --merge %ROBOT_REPORT_DIR%\output1.xml %ROBOT_REPORT_DIR%\output2.xml %ROBOT_REPORT_DIR%\output3.xml
```

Make sure the `--name` option is set to the same root suite name for every `robot` run, then merge the outputs with `rebot` to generate the final report. This should resolve the error about differing root suites.
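A possible alternative, not part of the original answer: if the goal is simply one combined report rather than merging re-runs of the same suite, `rebot` can combine different outputs without `--merge` (it wraps them in a new top-level suite), or all suites can be run in a single `robot` invocation so only one root suite exists in the first place. A sketch reusing the paths from the question ("AllSuites" is an arbitrary name chosen here):

```bat
@echo off
set ROBOT_REPORT_DIR=reports

rem Option A: run the whole Goodmanage folder in one robot invocation -> single root suite, no merging needed
robot --outputdir %ROBOT_REPORT_DIR% --variablefile "variables.py" --test test_* --output combined.xml Goodmanage

rem Option B: combine the separately produced outputs without --merge;
rem rebot then places them under a new top-level suite instead of merging them
rebot --outputdir %ROBOT_REPORT_DIR% --name "AllSuites" --output final.xml --report final_report.html %ROBOT_REPORT_DIR%\output1.xml %ROBOT_REPORT_DIR%\output2.xml %ROBOT_REPORT_DIR%\output3.xml
```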

import pandas as pd import numpy as np from datetime import datetime, timedelta, time import os from typing import List, Dict, Optional # ====================== 基础参数 ====================== CHANGE_TIME = 3.5 # 换模时间(小时) REST_PERIODS = [ # 每日休息时段(小时) (12, 12.5), # 12:00-12:30 (16.5, 17), # 16:30-17:00 (0, 1) # 0:00-1:00 ] # ====================== 类定义 ====================== class Order: def __init__(self, order_id, product_id, quantity, delivery_date, capacity, weight, attr): self.order_id = order_id # 订单号 self.product_id = product_id # 产品号 self.quantity = quantity # 数量(pcs) self.delivery_date = delivery_date # 发货日期(datetime) self.capacity = capacity # 产能(pcs/h) self.weight = weight # 产品重量(g) self.processing_hours = quantity / capacity # 生产工时(h) self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.delay_days = 0 # 延期天数 self.attr = attr # 产品属性(1:易搭配,3:难生产) self.merged_id = None # 合单序号 self.plan_number = None # 计划序号 self.forward_plan = None # 前置任务计划序号 class MergedOrder: def __init__(self, product_id, total_quantity, delivery_dates, capacity, weight, attr, original_orders, merged_id): self.product_id = product_id # 产品号 self.total_quantity = total_quantity # 合并后总数量 self.delivery_dates = delivery_dates # 原始订单发货日期列表 self.capacity = capacity # 产能 self.weight = weight # 产品重量 self.processing_hours = total_quantity / capacity # 总生产工时 self.attr = attr # 产品属性 self.original_orders = original_orders # 原始订单对象列表 self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.merged_id = merged_id # 合单序号 self.plan_number = None # 计划序号 class Machine: def __init__(self, machine_id, initial_time=None): self.machine_id = machine_id # 机台号 self.available_time = initial_time or datetime(2025, 3, 1) # 初始可用时间 self.last_product = None # 上一生产产品 self.adjacent = [] # 相邻机台列表 self.schedule = [] # 排程计划(任务列表) self.last_plan_number = None # 上一任务计划序号 # ====================== 数据加载 ====================== def load_official_data(data_dir: str = "."): """ 加载附件数据 需修改data_dir为附件所在目录(例如:"C:/Users/你的用户名/Desktop/附件") """ try: # 读取附件1 attachment1 = pd.ExcelFile(os.path.join(data_dir, "附件1(1).xlsx")) orders_df = attachment1.parse( sheet_name="订单表", parse_dates=["发货日期(DeliveryDate)"], usecols=["订单号", "产品号", "订单数量", "发货日期(DeliveryDate)", "Weight/pcs(g)"] ) machine_initial_df = attachment1.parse( sheet_name="机台初始工作状态表", parse_dates=["生产开始时间"] ) holidays_df = attachment1.parse(sheet_name="放假日期表") holidays = {pd.to_datetime(row["放假日期"]).date() for _, row in holidays_df.iterrows()} # 读取附件2 attachment2 = pd.ExcelFile(os.path.join(data_dir, "附件2(1).xlsx")) product_capacity_df = attachment2.parse(sheet_name="产品工时计算参数表") product_machine_df = attachment2.parse(sheet_name="产品机台生产关系表") machine_relation_df = attachment2.parse(sheet_name="机台关系表") product_attr_df = attachment2.parse(sheet_name="产品属性表") # 构建数据映射 product_capacity = {row["产品号"]: row["Capacity(pcs/h)"] for _, row in product_capacity_df.iterrows()} product_weight = {row["产品号"]: row["Weight/pcs(g)"] for _, row in product_capacity_df.iterrows()} product_machines = {} for _, row in product_machine_df.iterrows(): valid_machines = [col for col in product_machine_df.columns if col != "产品号" and row[col] == 1] product_machines[row["产品号"]] = valid_machines machine_adjacent = {} for _, row in machine_relation_df.iterrows(): adjacent_machines = [col for col in machine_relation_df.columns if col != "机台号" and row[col] == 1] machine_adjacent[row["机台号"]] = adjacent_machines product_attrs = {row["产品号"]: row["属性"] for _, row in 
product_attr_df.iterrows()} # 补充订单表产能和重量 orders_df["Capacity(pcs/h)"] = orders_df["产品号"].map(product_capacity) orders_df["Weight/pcs(g)"] = orders_df["产品号"].map(product_weight) # 初始化机台 machine_initial_times = {row["机台号"]: row["生产开始时间"] for _, row in machine_initial_df.iterrows()} machines = [] for mid in machine_initial_times.keys(): machine = Machine(mid, machine_initial_times[mid]) machine.adjacent = machine_adjacent.get(mid, []) machines.append(machine) return orders_df, product_machines, product_attrs, machines, holidays except Exception as e: print(f"数据加载失败,请检查附件路径和格式:{e}") raise # ====================== 合单逻辑 ====================== def merge_orders(orders: List[Order], merge_days: int) -> tuple[List[MergedOrder], int]: merged_tasks = [] merge_count = 0 merged_id_counter = 1 product_groups = {} for order in orders: if order.product_id not in product_groups: product_groups[order.product_id] = [] product_groups[order.product_id].append(order) for product_id, group_orders in product_groups.items(): group_orders.sort(key=lambda x: x.delivery_date) current_merge = [group_orders[0]] for i in range(1, len(group_orders)): date_diff = (group_orders[i].delivery_date - current_merge[0].delivery_date).days if date_diff <= merge_days: current_merge.append(group_orders[i]) else: merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, weight=current_merge[0].weight, attr=current_merge[0].attr, original_orders=current_merge, merged_id=merged_id_counter )) merge_count += 1 if len(current_merge) > 1 else 0 merged_id_counter += 1 current_merge = [group_orders[i]] merged_tasks.append(MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in current_merge), delivery_dates=[o.delivery_date for o in current_merge], capacity=current_merge[0].capacity, weight=current_merge[0].weight, attr=current_merge[0].attr, original_orders=current_merge, merged_id=merged_id_counter )) if len(current_merge) > 1: merge_count += 1 merged_id_counter += 1 return merged_tasks, merge_count # ====================== 排程逻辑 ====================== def calculate_end_time(start_time: datetime, processing_hours: float, holidays: set) -> datetime: current = start_time remaining = processing_hours while remaining > 0: if current.date() in holidays: current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) continue day_start = datetime.combine(current.date(), time(0, 0)) intervals = [] prev_end = day_start for rest_start, rest_end in REST_PERIODS: rest_start_time = day_start + timedelta(hours=rest_start) rest_end_time = day_start + timedelta(hours=rest_end) if prev_end < rest_start_time: intervals.append((prev_end, rest_start_time)) prev_end = rest_end_time if prev_end < day_start + timedelta(hours=24): intervals.append((prev_end, day_start + timedelta(hours=24))) for (s, e) in intervals: if current < s: current = s if current >= e: continue available = (e - current).total_seconds() / 3600 use = min(remaining, available) current += timedelta(hours=use) remaining -= use if remaining <= 0: return current current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) return current def problem3_scheduling(merged_tasks: List[MergedOrder], product_machines: Dict, machines: List[Machine], holidays: set) -> tuple[List[Order], List[Dict]]: merged_tasks.sort(key=lambda x: (x.attr, min(x.delivery_dates))) machine_map = {m.machine_id: m for m in machines} 
plan_number = 1 detailed_results = [] for task in merged_tasks: candidate_machines = [m for m in machines if m.machine_id in product_machines.get(task.product_id, [])] if task.attr == 1: m03 = next((m for m in candidate_machines if m.machine_id == "M03"), None) if m03: candidate_machines = [m03] + [m for m in candidate_machines if m.machine_id != "M03"] best_machine = None best_end = None best_start = None for machine in candidate_machines: change = CHANGE_TIME if machine.last_product != task.product_id else 0 initial_start = machine.available_time + timedelta(hours=change) end_time = calculate_end_time(initial_start, task.processing_hours, holidays) adjusted_start = initial_start if task.attr == 3: for adj_id in machine.adjacent: adj_machine = machine_map.get(adj_id) if adj_machine and adj_machine.schedule: last_task = adj_machine.schedule[-1] if (last_task["product_attr"] == 3 and not (end_time <= last_task["start"] or adjusted_start >= last_task["end"])): adjusted_start = max(adjusted_start, last_task["end"]) end_time = calculate_end_time(adjusted_start, task.processing_hours, holidays) if best_end is None or end_time < best_end: best_end = end_time best_start = adjusted_start best_machine = machine if best_machine: task.start_time = best_start task.end_time = best_end task.machine = best_machine.machine_id task.plan_number = plan_number for original_order in task.original_orders: original_order.start_time = best_start original_order.end_time = best_end original_order.machine = best_machine.machine_id original_order.merged_id = task.merged_id original_order.plan_number = plan_number original_order.forward_plan = best_machine.last_plan_number original_order.delay_days = max(0, (best_end.date() - original_order.delivery_date.date()).days) detailed_results.append({ "计划序号(Plan Number)": plan_number, "生产计划安排机台号": best_machine.machine_id, "订单号PO": original_order.order_id, "产品号": original_order.product_id, "需要工时 (Requested Time--Hours)": round(original_order.processing_hours, 2), "生产计划开始时间": original_order.start_time.strftime("%Y-%m-%d %H:%M:%S"), "生产计划预计完成时间": original_order.end_time.strftime("%Y-%m-%d %H:%M:%S"), "发货日期(DeliveryDate)": original_order.delivery_date.strftime("%Y-%m-%d"), "订单数量": original_order.quantity, "Weight/pcs(g)": original_order.weight, "Capacity(pcs/h)": original_order.capacity, "最迟开始时间 (Late Start Time)": (original_order.delivery_date - timedelta( hours=original_order.processing_hours )).strftime("%Y-%m-%d %H:%M:%S"), "合单序号(Joined Order Number)": task.merged_id, "前置任务生产计划序号": original_order.forward_plan or "", "是否延期": "是" if original_order.delay_days > 0 else "否", "延期天数": original_order.delay_days }) best_machine.available_time = best_end best_machine.last_product = task.product_id best_machine.last_plan_number = plan_number best_machine.schedule.append({ "product": task.product_id, "start": best_start, "end": best_end, "product_attr": task.attr }) plan_number += 1 return detailed_results # ====================== 导出结果 ====================== def export_results(detailed_results: List[Dict], output_path: str = "附件5_生产排程计划表.xlsx"): """ 导出完整排程结果到Excel 需修改output_path为期望的导出路径(例如:"C:/Users/你的用户名/Desktop/附件5_结果.xlsx") """ df = pd.DataFrame(detailed_results) column_order = [ "计划序号(Plan Number)", "生产计划安排机台号", "订单号PO", "产品号", "需要工时 (Requested Time--Hours)", "生产计划开始时间", "生产计划预计完成时间", "发货日期(DeliveryDate)", "订单数量", "Weight/pcs(g)", "Capacity(pcs/h)", "最迟开始时间 (Late Start Time)", "合单序号(Joined Order Number)", "前置任务生产计划序号", "是否延期", "延期天数" ] df = df[column_order] df.to_excel(output_path, 
index=False) print(f"结果已导出至:{os.path.abspath(output_path)}") # ====================== 主函数 ====================== def main(): # ---------------------- 需要您修改的部分 ---------------------- # 1. 附件所在目录:将"."改为附件实际存放路径(例如:"C:/Users/你的用户名/Desktop/附件") DATA_DIR = "." # 2. 导出文件路径:将"附件5_生产排程计划表.xlsx"改为期望的导出路径(例如:"C:/Users/你的用户名/Desktop/结果.xlsx") OUTPUT_PATH = "附件5_生产排程计划表.xlsx" # 3. 合单天数:可根据需要修改(当前为7天,可改为15或30天) MERGE_DAYS = 7 # ------------------------------------------------------------- try: orders_df, product_machines, product_attrs, machines, holidays = load_official_data(DATA_DIR) original_orders = [] for _, row in orders_df.iterrows(): try: order = Order( order_id=row["订单号"], product_id=row["产品号"], quantity=row["订单数量"], delivery_date=row["发货日期(DeliveryDate)"], capacity=row["Capacity(pcs/h)"], weight=row["Weight/pcs(g)"], attr=product_attrs.get(row["产品号"], 2) ) original_orders.append(order) except Exception as e: print(f"订单初始化失败: {e} - 行数据: {row}") if not original_orders: raise ValueError("没有有效订单,请检查数据格式") merged_tasks, merge_count = merge_orders(original_orders, MERGE_DAYS) print(f"合单完成,共合并 {merge_count} 次") detailed_results = problem3_scheduling(merged_tasks, product_machines, machines, holidays) print(f"排程完成,共生成 {len(detailed_results)} 条计划") export_results(detailed_results, OUTPUT_PATH) except Exception as e: print(f"程序执行失败: {e}") if __name__ == "__main__": main() 最终结果呈现应该要有计划序号生产计划安排机台号,订单号,产品号需要工时生产,计划开始时间,生产计划预计完成时间,发货日期,订单数量,重量,产能,最迟开始时间,合单序号,前置任务生产计划序号,是否延期,延期天数

<think>我们有一个已经编写好的程序,用于解决生产排程问题。现在,我们需要按照要求导出结果,并且确保结果中包含指定的列。 根据要求,最终结果需要包含以下列: 1. 计划序号(Plan Number) 2. 生产计划安排机台号 3. 订单号PO 4. 产品号 5. 需要工时 (Requested Time--Hours) 6. 生产计划开始时间 7. 生产计划预计完成时间 8. 发货日期(DeliveryDate) 9. 订单数量 10. Weight/pcs(g) 11. Capacity(pcs/h) 12. 最迟开始时间 (Late Start Time) 13. 合单序号(Joined Order Number) 14. 前置任务生产计划序号 15. 是否延期 16. 延期天数 在程序中的`problem3_scheduling`函数里,我们已经为每个原始订单生成了一个字典,包含了这些信息,并添加到`detailed_results`列表中。 然后通过`export_results`函数导出为Excel。 但是,我们注意到在`detailed_results`的字典中,我们使用了以下键名: - "计划序号(Plan Number)": plan_number - "生产计划安排机台号": best_machine.machine_id - "订单号PO": original_order.order_id - "产品号": original_order.product_id - "需要工时 (Requested Time--Hours)": round(original_order.processing_hours, 2) - "生产计划开始时间": original_order.start_time.strftime("%Y-%m-%d %H:%M:%S") - "生产计划预计完成时间": original_order.end_time.strftime("%Y-%m-%d %H:%M:%S") - "发货日期(DeliveryDate)": original_order.delivery_date.strftime("%Y-%m-%d") - "订单数量": original_order.quantity - "Weight/pcs(g)": original_order.weight - "Capacity(pcs/h)": original_order.capacity - "最迟开始时间 (Late Start Time)": (original_order.delivery_date - timedelta(hours=original_order.processing_hours)).strftime("%Y-%m-%d %H:%M:%S") - "合单序号(Joined Order Number)": task.merged_id - "前置任务生产计划序号": original_order.forward_plan or "" (如果为None则用空字符串) - "是否延期": "是" if original_order.delay_days > 0 else "否" - "延期天数": original_order.delay_days 因此,我们只需要确保在导出时,列的顺序按照上述要求排列即可。 在`export_results`函数中,我们已经指定了列的顺序,所以导出没有问题。 但是,我们注意到在`main`函数中,我们调用了`export_results(detailed_results, OUTPUT_PATH)`,其中`detailed_results`就是包含所有字典的列表。 因此,我们只需要运行程序,并确保附件路径正确,即可生成结果。 另外,我们注意到在`load_official_data`函数中,我们读取了附件1和附件2,并进行了数据整合。在运行前,请确保将`DATA_DIR`设置为附件所在目录。 现在,我们提供一个完整的代码,并提示用户修改数据目录和输出路径。 注意:由于问题3中要求合单天数为7天,我们在主函数中设置了`MERGE_DAYS = 7`,但也可以根据需要修改为15或30天。 运行程序后,将在指定路径生成Excel文件。 如果遇到任何错误,程序会捕获异常并打印错误信息。 由于代码较长,我们不再重复贴出整个代码,但请注意以下几点: 1. 在`load_official_data`中,我们使用`parse_dates`参数解析日期列。 2. 在合并订单时,我们按照产品号分组,并在每个产品内按发货日期排序,然后合并相邻7天内的订单。 3. 排程时,我们考虑了换模时间、休息时段、节假日、相邻机台不能同时生产属性3的产品等约束。 4. 
在计算结束时间时,我们使用了`calculate_end_time`函数,该函数会跳过休息时间和节假日。 现在,我们假设用户已经将代码保存为Python文件,并准备好附件数据,然后运行即可。 如果用户需要修改合单天数,可以在主函数中修改`MERGE_DAYS`变量。 最后,我们提供一次完整的代码,但为了简洁,我们只给出主函数和必要的部分,因为整个代码已经很长了。但请注意,下面的代码是完整的,只是省略了类定义和函数定义(但实际上在之前的代码中已经给出)。 但是,为了确保清晰,我们将整个代码结构再次完整写出,但省略了类定义和函数内部的注释(因为前面已经详细注释)。 由于代码过长,我们建议用户直接使用之前提供的完整代码。 注意:在运行之前,请确保安装了pandas和openpyxl(用于写入Excel)。 安装命令:pip install pandas openpyxl 如果遇到路径问题,请使用绝对路径。 下面,我们再次提供完整代码,但为了减少重复,我们只给出关键部分,并确保代码可以运行。 由于我们之前已经提供了完整代码,这里不再重复,但为了确保完整性,我们将关键部分重新整理如下:</think>我将为您提供一个完整的生产排程解决方案,包含详细的代码实现和解释。该方案基于您提供的需求,实现了订单合并、机台调度和生产计划生成功能。 ```python import pandas as pd import numpy as np from datetime import datetime, timedelta, time import os from typing import List, Dict, Optional # ====================== 基础参数配置 ====================== CHANGE_TIME = 3.5 # 换模时间(小时) REST_PERIODS = [ # 每日休息时段(小时) (12, 12.5), # 12:00-12:30 (16.5, 17), # 16:30-17:00 (0, 1) # 0:00-1:00 ] # ====================== 核心类定义 ====================== class Order: """订单类,表示单个生产订单""" def __init__(self, order_id, product_id, quantity, delivery_date, capacity, weight, attr): self.order_id = order_id # 订单号 self.product_id = product_id # 产品号 self.quantity = quantity # 数量(pcs) self.delivery_date = delivery_date # 发货日期(datetime) self.capacity = capacity # 产能(pcs/h) self.weight = weight # 产品重量(g) # 计算生产所需工时 self.processing_hours = quantity / capacity if capacity > 0 else 0 self.start_time = None # 生产开始时间 self.end_time = None # 生产结束时间 self.machine = None # 分配机台 self.delay_days = 0 # 延期天数 self.attr = attr # 产品属性(1:易搭配,3:难生产) self.merged_id = None # 合单序号 self.plan_number = None # 计划序号 self.forward_plan = None # 前置任务计划序号 class MergedOrder: """合并订单类,表示合并后的生产任务""" def __init__(self, product_id, total_quantity, delivery_dates, capacity, weight, attr, original_orders, merged_id): self.product_id = product_id # 产品号 self.total_quantity = total_quantity # 合并后总数量 self.delivery_dates = delivery_dates # 原始订单发货日期列表 self.capacity = capacity # 产能 self.weight = weight # 产品重量 # 计算总生产工时 self.processing_hours = total_quantity / capacity if capacity > 0 else 0 self.attr = attr # 产品属性 self.original_orders = original_orders # 原始订单对象列表 self.start_time = None # 开始时间 self.end_time = None # 结束时间 self.machine = None # 分配机台 self.merged_id = merged_id # 合单序号 self.plan_number = None # 计划序号 class Machine: """机台类,表示生产设备及其状态""" def __init__(self, machine_id, initial_time=None): self.machine_id = machine_id # 机台号 # 初始可用时间,默认为2025年3月1日 self.available_time = initial_time or datetime(2025, 3, 1) self.last_product = None # 上一生产产品 self.adjacent = [] # 相邻机台列表 self.schedule = [] # 排程计划(任务列表) self.last_plan_number = None # 上一任务计划序号 # ====================== 数据加载函数 ====================== def load_official_data(data_dir: str = "."): """ 加载附件数据 :param data_dir: 附件所在目录 :return: 订单数据、产品-机台关系、产品属性、机台列表、节假日集合 """ try: # 1. 读取附件1 - 订单和机台信息 attachment1 = pd.ExcelFile(os.path.join(data_dir, "附件1(1).xlsx")) # 加载订单表 orders_df = attachment1.parse( sheet_name="订单表", parse_dates=["发货日期(DeliveryDate)"], usecols=["订单号", "产品号", "订单数量", "发货日期(DeliveryDate)", "Weight/pcs(g)"] ) # 加载机台初始状态表 machine_initial_df = attachment1.parse( sheet_name="机台初始工作状态表", parse_dates=["生产开始时间"] ) # 加载放假日期表 holidays_df = attachment1.parse(sheet_name="放假日期表") holidays = {pd.to_datetime(row["放假日期"]).date() for _, row in holidays_df.iterrows()} # 2. 
读取附件2 - 产品参数和关系 attachment2 = pd.ExcelFile(os.path.join(data_dir, "附件2(1).xlsx")) # 加载产品工时参数 product_capacity_df = attachment2.parse(sheet_name="产品工时计算参数表") # 加载产品-机台关系 product_machine_df = attachment2.parse(sheet_name="产品机台生产关系表") # 加载机台关系 machine_relation_df = attachment2.parse(sheet_name="机台关系表") # 加载产品属性 product_attr_df = attachment2.parse(sheet_name="产品属性表") # 3. 构建数据映射关系 # 产品产能映射 product_capacity = {row["产品号"]: row["Capacity(pcs/h)"] for _, row in product_capacity_df.iterrows()} # 产品重量映射 product_weight = {row["产品号"]: row["Weight/pcs(g)"] for _, row in product_capacity_df.iterrows()} # 产品可生产机台映射 product_machines = {} for _, row in product_machine_df.iterrows(): valid_machines = [col for col in product_machine_df.columns if col != "产品号" and row[col] == 1] product_machines[row["产品号"]] = valid_machines # 机台相邻关系映射 machine_adjacent = {} for _, row in machine_relation_df.iterrows(): adjacent_machines = [col for col in machine_relation_df.columns if col != "机台号" and row[col] == 1] machine_adjacent[row["机台号"]] = adjacent_machines # 产品属性映射 product_attrs = {row["产品号"]: row["属性"] for _, row in product_attr_df.iterrows()} # 4. 补充订单表数据 orders_df["Capacity(pcs/h)"] = orders_df["产品号"].map(product_capacity) orders_df["Weight/pcs(g)"] = orders_df["产品号"].map(product_weight) # 5. 初始化机台对象 machine_initial_times = {row["机台号"]: row["生产开始时间"] for _, row in machine_initial_df.iterrows()} machines = [] for mid in machine_initial_times.keys(): machine = Machine(mid, machine_initial_times[mid]) machine.adjacent = machine_adjacent.get(mid, []) machines.append(machine) return orders_df, product_machines, product_attrs, machines, holidays except Exception as e: print(f"数据加载失败,请检查附件路径和格式:{e}") raise # ====================== 订单合并逻辑 ====================== def merge_orders(orders: List[Order], merge_days: int) -> tuple[List[MergedOrder], int]: """ 合并相同产品的订单 :param orders: 订单列表 :param merge_days: 合并天数阈值 :return: 合并任务列表, 合并次数 """ merged_tasks = [] merge_count = 0 merged_id_counter = 1 # 按产品分组 product_groups = {} for order in orders: if order.product_id not in product_groups: product_groups[order.product_id] = [] product_groups[order.product_id].append(order) # 对每个产品组的订单按发货日期排序 for product_id, group_orders in product_groups.items(): group_orders.sort(key=lambda x: x.delivery_date) current_merge = [group_orders[0]] # 遍历该产品组的所有订单 for i in range(1, len(group_orders)): # 计算日期差 date_diff = (group_orders[i].delivery_date - current_merge[0].delivery_date).days if date_diff <= merge_days: current_merge.append(group_orders[i]) else: # 创建合并任务 merged_tasks.append(create_merged_task( product_id, current_merge, merged_id_counter )) # 更新合并计数 merge_count += 1 if len(current_merge) > 1 else 0 merged_id_counter += 1 current_merge = [group_orders[i]] # 处理最后一组订单 merged_tasks.append(create_merged_task( product_id, current_merge, merged_id_counter )) if len(current_merge) > 1: merge_count += 1 merged_id_counter += 1 return merged_tasks, merge_count def create_merged_task(product_id, orders, merged_id): """创建合并任务对象""" return MergedOrder( product_id=product_id, total_quantity=sum(o.quantity for o in orders), delivery_dates=[o.delivery_date for o in orders], capacity=orders[0].capacity, weight=orders[0].weight, attr=orders[0].attr, original_orders=orders, merged_id=merged_id ) # ====================== 生产调度核心逻辑 ====================== def calculate_end_time(start_time: datetime, processing_hours: float, holidays: set) -> datetime: """ 计算考虑休息时间和节假日的实际结束时间 :param start_time: 开始时间 :param processing_hours: 需要工时 :param holidays: 节假日集合 :return: 实际结束时间 
""" current = start_time remaining = processing_hours while remaining > 0: # 检查是否为节假日 if current.date() in holidays: current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) continue # 计算当天的有效工作时间段 day_start = datetime.combine(current.date(), time(0, 0)) intervals = [] prev_end = day_start # 根据休息时段划分工作时间段 for rest_start, rest_end in REST_PERIODS: rest_start_time = day_start + timedelta(hours=rest_start) rest_end_time = day_start + timedelta(hours=rest_end) if prev_end < rest_start_time: intervals.append((prev_end, rest_start_time)) prev_end = rest_end_time if prev_end < day_start + timedelta(hours=24): intervals.append((prev_end, day_start + timedelta(hours=24))) # 在每个时间段内工作 for (s, e) in intervals: if current < s: current = s if current >= e: continue # 计算该时间段内可用时间 available = (e - current).total_seconds() / 3600 use = min(remaining, available) current += timedelta(hours=use) remaining -= use if remaining <= 0: return current # 转到下一天 current = datetime.combine(current.date() + timedelta(days=1), time(0, 0)) return current def problem3_scheduling(merged_tasks: List[MergedOrder], product_machines: Dict, machines: List[Machine], holidays: set) -> tuple[List[Order], List[Dict]]: """ 生产调度主函数 :param merged_tasks: 合并任务列表 :param product_machines: 产品-机台关系 :param machines: 机台列表 :param holidays: 节假日集合 :return: 详细排程结果 """ # 任务排序:属性优先(易搭配在前),然后按最早发货日期 merged_tasks.sort(key=lambda x: (x.attr, min(x.delivery_dates))) machine_map = {m.machine_id: m for m in machines} plan_number = 1 detailed_results = [] for task in merged_tasks: # 获取可生产该产品的机台 candidate_machines = [m for m in machines if m.machine_id in product_machines.get(task.product_id, [])] # 特殊处理:易搭配产品优先分配到M03机台 if task.attr == 1: m03 = next((m for m in candidate_machines if m.machine_id == "M03"), None) if m03: candidate_machines = [m03] + [m for m in candidate_machines if m.machine_id != "M03"] best_machine = None best_end = None best_start = None # 评估每个候选机台 for machine in candidate_machines: # 计算换模时间 change = CHANGE_TIME if machine.last_product != task.product_id else 0 initial_start = machine.available_time + timedelta(hours=change) # 计算实际结束时间 end_time = calculate_end_time(initial_start, task.processing_hours, holidays) adjusted_start = initial_start # 特殊处理:难生产产品需考虑相邻机台 if task.attr == 3: for adj_id in machine.adjacent: adj_machine = machine_map.get(adj_id) if adj_machine and adj_machine.schedule: last_task = adj_machine.schedule[-1] # 检查时间冲突 if (last_task["product_attr"] == 3 and not (end_time <= last_task["start"] or adjusted_start >= last_task["end"])): # 调整开始时间以避免冲突 adjusted_start = max(adjusted_start, last_task["end"]) end_time = calculate_end_time(adjusted_start, task.processing_hours, holidays) # 选择最早结束的机台 if best_end is None or end_time < best_end: best_end = end_time best_start = adjusted_start best_machine = machine # 分配任务到最佳机台 if best_machine: # 更新任务信息 task.start_time = best_start task.end_time = best_end task.machine = best_machine.machine_id task.plan_number = plan_number # 更新原始订单信息 for original_order in task.original_orders: original_order.start_time = best_start original_order.end_time = best_end original_order.machine = best_machine.machine_id original_order.merged_id = task.merged_id original_order.plan_number = plan_number original_order.forward_plan = best_machine.last_plan_number # 计算延期天数 original_order.delay_days = max(0, (best_end.date() - original_order.delivery_date.date()).days) # 构建结果字典 detailed_results.append({ "计划序号(Plan Number)": plan_number, "生产计划安排机台号": best_machine.machine_id, "订单号PO": 
original_order.order_id, "产品号": original_order.product_id, "需要工时 (Requested Time--Hours)": round(original_order.processing_hours, 2), "生产计划开始时间": original_order.start_time.strftime("%Y-%m-%d %H:%M:%S"), "生产计划预计完成时间": original_order.end_time.strftime("%Y-%m-%d %H:%M:%S"), "发货日期(DeliveryDate)": original_order.delivery_date.strftime("%Y-%m-%d"), "订单数量": original_order.quantity, "Weight/pcs(g)": original_order.weight, "Capacity(pcs/h)": original_order.capacity, "最迟开始时间 (Late Start Time)": (original_order.delivery_date - timedelta( hours=original_order.processing_hours )).strftime("%Y-%m-%d %H:%M:%S"), "合单序号(Joined Order Number)": task.merged_id, "前置任务生产计划序号": original_order.forward_plan or "", "是否延期": "是" if original_order.delay_days > 0 else "否", "延期天数": original_order.delay_days }) # 更新机台状态 best_machine.available_time = best_end best_machine.last_product = task.product_id best_machine.last_plan_number = plan_number best_machine.schedule.append({ "product": task.product_id, "start": best_start, "end": best_end, "product_attr": task.attr }) plan_number += 1 return detailed_results # ====================== 结果导出函数 ====================== def export_results(detailed_results: List[Dict], output_path: str = "附件5_生产排程计划表.xlsx"): """ 导出完整排程结果到Excel :param detailed_results: 详细排程结果 :param output_path: 输出文件路径 """ # 创建DataFrame df = pd.DataFrame(detailed_results) # 定义列顺序 column_order = [ "计划序号(Plan Number)", "生产计划安排机台号", "订单号PO", "产品号", "需要工时 (Requested Time--Hours)", "生产计划开始时间", "生产计划预计完成时间", "发货日期(DeliveryDate)", "订单数量", "Weight/pcs(g)", "Capacity(pcs/h)", "最迟开始时间 (Late Start Time)", "合单序号(Joined Order Number)", "前置任务生产计划序号", "是否延期", "延期天数" ] # 按指定顺序排列列 df = df[column_order] # 导出到Excel df.to_excel(output_path, index=False) print(f"结果已导出至:{os.path.abspath(output_path)}") # ====================== 主执行函数 ====================== def main(): """主函数,执行整个排程流程""" # ---------------------- 用户配置区域 ---------------------- # 1. 附件所在目录:将"."改为附件实际存放路径 DATA_DIR = "." # 2. 导出文件路径:设置结果输出路径 OUTPUT_PATH = "附件5_生产排程计划表.xlsx" # 3. 合单天数:可根据需要修改(7/15/30天) MERGE_DAYS = 7 # -------------------------------------------------------- try: print("开始加载数据...") # 加载数据 orders_df, product_machines, product_attrs, machines, holidays = load_official_data(DATA_DIR) print("初始化订单对象...") # 初始化订单对象 original_orders = [] for _, row in orders_df.iterrows(): order = Order( order_id=row["订单号"], product_id=row["产品号"], quantity=row["订单数量"], delivery_date=row["发货日期(DeliveryDate)"], capacity=row["Capacity(pcs/h)"], weight=row["Weight/pcs(g)"], attr=product_attrs.get(row["产品号"], 2) # 默认为普通产品 ) original_orders.append(order) if not original_orders: raise ValueError("没有有效订单,请检查数据格式") print(f"开始订单合并(合单天数:{MERGE_DAYS}天)...") # 合并订单 merged_tasks, merge_count = merge_orders(original_orders, MERGE_DAYS) print(f"合单完成,共合并 {merge_count} 次,生成 {len(merged_tasks)} 个合并任务") print("开始生产排程...") # 执行排程 detailed_results = problem3_scheduling(merged_tasks, product_machines, machines, holidays) print(f"排程完成,共生成 {len(detailed_results)} 条生产计划") print("导出结果...") # 导出结果 export_results(detailed_results, OUTPUT_PATH) print("程序执行完成!") except Exception as e: print(f"程序执行失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main() ``` ### 代码详细解释 #### 1. 基础参数配置 - `CHANGE_TIME`: 换模时间(3.5小时) - `REST_PERIODS`: 每日休息时段(中午12:00-12:30,下午16:30-17:00,凌晨0:00-1:00) #### 2. 核心类定义 - **Order类**:表示单个生产订单,包含订单号、产品号、数量、发货日期等关键信息 - **MergedOrder类**:表示合并后的生产任务,包含原始订单列表和合并信息 - **Machine类**:表示生产机台,包含机台状态、排程计划和可用时间 #### 3. 
Data loading (`load_official_data`)
- Loads order, machine, product, and holiday data from the Excel attachments
- Builds the product-capacity, product-machine, and machine-adjacency mappings
- Returns structured data for the later steps

#### 4. Order merging (`merge_orders`)
- Groups orders by product; orders of the same product are sorted by delivery date
- Merges orders whose delivery dates fall within the given window (default 7 days)
- Creates merged-task objects that keep references to the original orders

#### 5. Scheduling core (`problem3_scheduling`)
- **Task ordering**: easy-to-pair products (attribute 1) first, then by earliest delivery date
- **Machine selection**:
  - Attribute-1 products are preferentially assigned to machine M03
  - Changeover time is considered (switching products costs 3.5 hours)
  - Hard-to-produce products (attribute 3) avoid running on adjacent machines at the same time
- **Time calculation**: uses `calculate_end_time` to account for rest periods and holidays
- **Result generation**: creates a detailed schedule record for every order

#### 6. End-time calculation (`calculate_end_time`)
- Accounts for the daily rest periods
- Skips holidays
- Computes the actual production end time precisely

#### 7. Result export (`export_results`)
- Builds a DataFrame with the 16 required fields
- Exports to Excel with the columns in the specified order

#### 8. Main flow (`main`)
1. Load the data
2. Initialize the order objects
3. Merge orders (default window: 7 days)
4. Run the scheduling
5. Export the results to Excel

### Usage
1. Put attachment 1 and attachment 2 in the chosen directory (the current directory by default)
2. Set the `DATA_DIR` variable to the actual attachment path
3. Set `OUTPUT_PATH` to the desired output file path
4. Optionally adjust `MERGE_DAYS` (7/15/30 days)
5. Run the program to generate the schedule

### Output
The resulting Excel file contains the following 16 fields:
1. 计划序号(Plan Number)
2. 生产计划安排机台号 (assigned machine)
3. 订单号PO (order number)
4. 产品号 (product ID)
5. 需要工时 (Requested Time--Hours)
6. 生产计划开始时间 (planned start time)
7. 生产计划预计完成时间 (planned completion time)
8. 发货日期(DeliveryDate)
9. 订单数量 (order quantity)
10. Weight/pcs(g)
11. Capacity(pcs/h)
12. 最迟开始时间 (Late Start Time)
13. 合单序号(Joined Order Number)
14. 前置任务生产计划序号 (predecessor plan number)
15. 是否延期 (delayed: yes/no)
16. 延期天数 (days delayed)
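To illustrate the rest-period handling described above, here is a small usage sketch; it assumes `calculate_end_time` and `REST_PERIODS` as defined in the code block, and the dates are invented for the example:

```python
from datetime import datetime

# Three hours of work starting at 10:00 straddle the 12:00-12:30 break,
# so with no holidays the job should finish at 13:30 rather than 13:00.
holidays = set()
start = datetime(2025, 3, 3, 10, 0)
end = calculate_end_time(start, processing_hours=3, holidays=holidays)
print(end)  # expected: 2025-03-03 13:30:00
```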

Related recommendations

帮我应用到这个代码上 import cv2 import numpy as np import time import math import RPi.GPIO as GPIO # 简化的激光点检测函数 - 亮点检测 def detect_bright_spot(gray_frame, min_brightness=220, min_area=5, min_circularity=0.7): """在灰度图像中检测亮点""" # 阈值处理提取亮区 _, bright_mask = cv2.threshold(gray_frame, min_brightness, 255, cv2.THRESH_BINARY) # 形态学操作去噪 kernel = np.ones((3, 3), np.uint8) bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_OPEN, kernel) bright_mask = cv2.morphologyEx(bright_mask, cv2.MORPH_CLOSE, kernel) # 寻找轮廓 contours, _ = cv2.findContours(bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None, bright_mask # 过滤轮廓 candidate_points = [] for contour in contours: area = cv2.contourArea(contour) if area < min_area: continue perimeter = cv2.arcLength(contour, True) if perimeter == 0: continue circularity = 4 * np.pi * area / (perimeter * perimeter) if circularity < min_circularity: continue M = cv2.moments(contour) if M["m00"] == 0: continue cx = int(M["m10"] / M["m00"]) cy = int(M["m01"] / M["m00"]) candidate_points.append((cx, cy, area, circularity)) if not candidate_points: return None, bright_mask # 选择最亮的点(面积最大) candidate_points.sort(key=lambda x: x[2], reverse=True) best_point = candidate_points[0] return (best_point[0], best_point[1]), bright_mask # 视觉检测函数(保持原有逻辑) def find_dominant_lines(edges, frame): """检测并连接线段,形成完整的条带边缘""" min_line_length = cv2.getTrackbarPos("Min Line", "Parameters") max_line_gap = cv2.getTrackbarPos("Max Gap", "Parameters") lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=50, minLineLength=min_line_length, maxLineGap=max_line_gap) upper_edge = None lower_edge = None upper_center_x = None lower_center_x = None debug_frame = frame.copy() if lines is None: return (upper_edge, upper_center_x), (lower_edge, lower_center_x), debug_frame horizontal_lines = [] for line in lines: x1, y1, x2, y2 = line[0] angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi if abs(angle) < 30 or abs(angle) > 150: center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2) merged = False for i, (cx, cy, count, min_y, max_y, total_x, total_y, total_length) in enumerate(horizontal_lines): if abs(center_y - cy) < 15: new_count = count + 1 new_min_y = min(min_y, min(y1, y2)) new_max_y = max(max_y, max(y1, y2)) new_total_x = total_x + center_x new_total_y = total_y + center_y new_total_length = total_length + length new_cx = new_total_x / new_count new_cy = new_total_y / new_count horizontal_lines[i] = (new_cx, new_cy, new_count, new_min_y, new_max_y, new_total_x, new_total_y, new_total_length) merged = True break if not merged: horizontal_lines.append((center_x, center_y, 1, min(y1, y2), max(y1, y2), center_x, center_y, length)) if len(horizontal_lines) < 2: return (upper_edge, upper_center_x), (lower_edge, lower_center_x), debug_frame for cx, cy, count, min_y, max_y, _, _, _ in horizontal_lines: avg_length = 300 x1 = int(cx - avg_length / 2) x2 = int(cx + avg_length / 2) cv2.line(debug_frame, (x1, int(cy)), (x2, int(cy)), (0, 255, 0), 2) horizontal_lines.sort(key=lambda x: x[1]) candidate_edges = [] for i in range(len(horizontal_lines)): cx, cy, count, min_y, max_y, _, _, length = horizontal_lines[i] weight = count * length candidate_edges.append((cy, min_y, max_y, weight, cx)) candidate_edges.sort(key=lambda x: x[3], reverse=True) if len(candidate_edges) >= 2: upper_edge = candidate_edges[0][0] upper_center_x = candidate_edges[0][4] lower_edge = candidate_edges[1][0] lower_center_x = candidate_edges[1][4] if upper_edge > lower_edge: 
upper_edge, lower_edge = lower_edge, upper_edge upper_center_x, lower_center_x = lower_center_x, upper_center_x cv2.line(debug_frame, (0, int(upper_edge)), (frame.shape[1], int(upper_edge)), (0, 0, 255), 2) cv2.circle(debug_frame, (int(upper_center_x), int(upper_edge)), 8, (255, 255, 0), -1) cv2.line(debug_frame, (0, int(lower_edge)), (frame.shape[1], int(lower_edge)), (0, 0, 255), 2) cv2.circle(debug_frame, (int(lower_center_x), int(lower_edge)), 8, (255, 255, 0), -1) return (upper_edge, upper_center_x), (lower_edge, lower_center_x), debug_frame # 计算偏转角度函数(保持原有逻辑) def calculate_angles(offset_x, offset_y, frame_width, frame_height, hfov=70.0, vfov=50.0): """计算目标相对于视野中心的偏转角度""" deg_per_pixel_x = hfov / frame_width deg_per_pixel_y = vfov / frame_height angle_x = offset_x * deg_per_pixel_x angle_y = offset_y * deg_per_pixel_y return angle_x, angle_y # PID控制器类(保持原有逻辑) class PIDController: def __init__(self, kp=0.5, ki=0.01, kd=0.1, max_output=45): self.kp = kp self.ki = ki self.kd = kd self.max_output = max_output self.previous_error = 0 self.integral = 0 self.last_time = time.time() def update(self, error): current_time = time.time() dt = current_time - self.last_time self.last_time = current_time if dt <= 0: return 0 proportional = self.kp * error self.integral += error * dt integral = self.ki * self.integral derivative = self.kd * (error - self.previous_error) / dt self.previous_error = error output = proportional + integral + derivative output = max(-self.max_output, min(self.max_output, output)) return output def reset(self): self.previous_error = 0 self.integral = 0 self.last_time = time.time() # 舵机控制类(保持原有逻辑) class ServoController: def __init__(self, min_angle_x=-30, max_angle_x=50, min_angle_y=-10, max_angle_y=10, min_pwm=500, max_pwm=2500, center_pwm_x=1500, center_pwm_y=1500): # X方向参数 self.min_angle_x = min_angle_x self.max_angle_x = max_angle_x self.center_pwm_x = center_pwm_x # Y方向参数 self.min_angle_y = min_angle_y self.max_angle_y = max_angle_y self.center_pwm_y = center_pwm_y # 通用PWM参数 self.min_pwm = min_pwm self.max_pwm = max_pwm self.current_angle_x = 0 self.current_angle_y = 0 self.angle_limit = 0.5 self.pid_x = PIDController(kp=0.6, ki=0.02, kd=0.15, max_output=40) self.pid_y = PIDController(kp=0.6, ki=0.02, kd=0.15, max_output=40) def update(self, angle_x, angle_y): if abs(angle_x) < self.angle_limit: angle_x = 0 if abs(angle_y) < self.angle_limit: angle_y = 0 adjustment_x = self.pid_x.update(angle_x) adjustment_y = self.pid_y.update(angle_y) self.current_angle_x = self._clamp_angle_x(self.current_angle_x + adjustment_x) self.current_angle_y = self._clamp_angle_y(self.current_angle_y + adjustment_y) pwm_x = self._angle_to_pwm_x(self.current_angle_x) pwm_y = self._angle_to_pwm_y(self.current_angle_y) return pwm_x, pwm_y def _clamp_angle_x(self, angle): return max(self.min_angle_x, min(self.max_angle_x, angle)) def _clamp_angle_y(self, angle): return max(self.min_angle_y, min(self.max_angle_y, angle)) def _angle_to_pwm_x(self, angle): if angle >= 0: return int(self.center_pwm_x + (angle / self.max_angle_x) * (self.max_pwm - self.center_pwm_x)) else: return int(self.center_pwm_x + (angle / self.min_angle_x) * (self.min_pwm - self.center_pwm_x)) def _angle_to_pwm_y(self, angle): if angle >= 0: return int(self.center_pwm_y + (angle / self.max_angle_y) * (self.max_pwm - self.center_pwm_y)) else: return int(self.center_pwm_y + (angle / self.min_angle_y) * (self.min_pwm - self.center_pwm_y)) def reset(self): self.current_angle_x = 0 self.current_angle_y = 0 self.pid_x.reset() 
self.pid_y.reset() return self.center_pwm_x, self.center_pwm_y def draw_servo_indicator(frame, angle_x, angle_y): """在图像上绘制舵机角度指示器""" height, width = frame.shape[:2] cv2.putText(frame, f"X: {70 + angle_x:.1f}°", (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) cv2.putText(frame, f"Y: {90 + angle_y:.1f}°", (10, height - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) bar_width = 100 bar_height = 10 bar_x = width // 2 - bar_width // 2 bar_y = height - 30 cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), 1) center_x = bar_x + bar_width // 2 position_x = center_x + int((angle_x / 50) * (bar_width // 2)) cv2.line(frame, (center_x, bar_y - 5), (center_x, bar_y + bar_height + 5), (0, 255, 0), 1) cv2.circle(frame, (position_x, bar_y + bar_height // 2), 5, (0, 255, 255), -1) bar_y = height - 60 cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), 1) position_y = center_x + int((angle_y / 10) * (bar_width // 2)) cv2.line(frame, (center_x, bar_y - 5), (center_x, bar_y + bar_height + 5), (0, 255, 0), 1) cv2.circle(frame, (position_y, bar_y + bar_height // 2), 5, (0, 255, 255), -1) def setup_gpio(servo_x_pin, servo_y_pin): GPIO.setmode(GPIO.BCM) GPIO.setup(servo_x_pin, GPIO.OUT) GPIO.setup(servo_y_pin, GPIO.OUT) pwm_x = GPIO.PWM(servo_x_pin, 50) pwm_y = GPIO.PWM(servo_y_pin, 50) pwm_x.start(0) pwm_y.start(0) return pwm_x, pwm_y def apply_servo_control(pwm_x_obj, pwm_y_obj, pwm_x, pwm_y): duty_cycle_x = (pwm_x / 20000.0) * 100 duty_cycle_y = (pwm_y / 20000.0) * 100 pwm_x_obj.ChangeDutyCycle(duty_cycle_x) pwm_y_obj.ChangeDutyCycle(duty_cycle_y) def cleanup_gpio(pwm_x, pwm_y): pwm_x.stop() pwm_y.stop() GPIO.cleanup() def main(): SERVO_X_PIN = 15 SERVO_Y_PIN = 18 pwm_x_obj, pwm_y_obj = setup_gpio(SERVO_X_PIN, SERVO_Y_PIN) cap = cv2.VideoCapture(0) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) cap.set(cv2.CAP_PROP_FPS, 60) # 创建主参数窗口 cv2.namedWindow("Parameters") cv2.createTrackbar("Brightness", "Parameters", 60, 100, lambda x: None) cv2.createTrackbar("Contrast", "Parameters", 60, 100, lambda x: None) cv2.createTrackbar("Canny1", "Parameters", 50, 200, lambda x: None) cv2.createTrackbar("Canny2", "Parameters", 150, 300, lambda x: None) cv2.createTrackbar("Min Line", "Parameters", 50, 300, lambda x: None) cv2.createTrackbar("Max Gap", "Parameters", 20, 100, lambda x: None) # 激光检测参数 cv2.createTrackbar("Laser Thresh", "Parameters", 220, 255, lambda x: None) cv2.createTrackbar("Min Area", "Parameters", 5, 100, lambda x: None) last_offset_x = 0 last_offset_y = 0 frame_count = 0 start_time = time.time() fps = 0 HORIZONTAL_FOV = 70.0 VERTICAL_FOV = 50.0 # 初始化舵机控制器 servo_controller = ServoController( min_angle_x=-30, max_angle_x=50, min_angle_y=-10, max_angle_y=10, center_pwm_x=1500, center_pwm_y=1500 ) servo_enabled = True tracking_history = [] max_history_length = 100 apply_servo_control(pwm_x_obj, pwm_y_obj, 1500, 1500) time.sleep(0.5) laser_in_zone = False laser_start_time = 0 laser_duration = 0 detection_zone = None last_detection_zone = None while True: ret, frame = cap.read() if not ret: print("摄像头读取失败,请检查连接") break frame = cv2.rotate(frame, cv2.ROTATE_180) brightness = cv2.getTrackbarPos("Brightness", "Parameters") contrast = cv2.getTrackbarPos("Contrast", "Parameters") canny1 = cv2.getTrackbarPos("Canny1", "Parameters") canny2 = cv2.getTrackbarPos("Canny2", "Parameters") alpha = contrast / 50.0 beta = brightness - 50 frame = cv2.convertScaleAbs(frame, alpha=alpha, 
beta=beta) gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) blurred = cv2.GaussianBlur(gray, (3, 3), 0) edges = cv2.Canny(blurred, canny1, canny2) (upper_edge, upper_center_x), (lower_edge, lower_center_x), debug_frame = find_dominant_lines(edges, frame.copy()) target_detected = False if (upper_edge is not None and lower_edge is not None and upper_center_x is not None and lower_center_x is not None and upper_edge < lower_edge): target_detected = True white_center_y = (upper_edge + lower_edge) // 2 white_center_x = (upper_center_x + lower_center_x) // 2 frame_center_x = frame.shape[1] // 2 frame_center_y = frame.shape[0] // 2 offset_x_px = white_center_x - frame_center_x offset_y_px = white_center_y - frame_center_y filtered_offset_x = 0.7 * last_offset_x + 0.3 * offset_x_px filtered_offset_y = 0.7 * last_offset_y + 0.3 * offset_y_px last_offset_x = filtered_offset_x last_offset_y = filtered_offset_y angle_x, angle_y = calculate_angles( filtered_offset_x, filtered_offset_y, frame.shape[1], frame.shape[0], HORIZONTAL_FOV, VERTICAL_FOV ) if servo_enabled: pwm_x, pwm_y = servo_controller.update(angle_x, angle_y) apply_servo_control(pwm_x_obj, pwm_y_obj, pwm_x, pwm_y) else: pwm_x, pwm_y = servo_controller.center_pwm_x, servo_controller.center_pwm_y tracking_error = math.sqrt(offset_x_px**2 + offset_y_px**2) tracking_history.append(tracking_error) if len(tracking_history) > max_history_length: tracking_history.pop(0) cv2.circle(debug_frame, (frame_center_x, frame_center_y), 10, (0, 255, 0), 2) cv2.circle(debug_frame, (int(white_center_x), int(white_center_y)), 10, (0, 0, 255), 2) cv2.line(debug_frame, (frame_center_x, frame_center_y), (int(white_center_x), int(white_center_y)), (255, 0, 0), 2) # 绘制白色条带区域 cv2.rectangle(debug_frame, (0, int(upper_edge)), (frame.shape[1], int(lower_edge)), (200, 200, 200), 1) detection_zone = (0, int(upper_edge), frame.shape[1], int(lower_edge)) last_detection_zone = detection_zone status_text = [ f"X偏移: {filtered_offset_x:.1f}px", f"Y偏移: {filtered_offset_y:.1f}px", f"X角度: {angle_x:.2f}°", f"Y角度: {angle_y:.2f}°", f"舵机角度 X: {70 + servo_controller.current_angle_x:.1f}°", f"舵机角度 Y: {90 + servo_controller.current_angle_y:.1f}°", f"舵机PWM X: {pwm_x}μs", f"舵机PWM Y: {pwm_y}μs", f"跟踪误差: {tracking_error:.1f}px", f"平均误差: {np.mean(tracking_history):.1f}px", f"FPS: {fps:.1f}", f"舵机控制: {'启用' if servo_enabled else '禁用'}" ] for i, text in enumerate(status_text): cv2.putText(debug_frame, text, (10, 20 + i*25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) else: target_detected = False if servo_enabled: pwm_x, pwm_y = servo_controller.reset() apply_servo_control(pwm_x_obj, pwm_y_obj, pwm_x, pwm_y) else: pwm_x, pwm_y = servo_controller.center_pwm_x, servo_controller.center_pwm_y tracking_history = [] cv2.putText(debug_frame, "未检测到目标", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) if last_detection_zone is not None: detection_zone = last_detection_zone # 绘制上一次的检测区域 cv2.rectangle(debug_frame, (0, detection_zone[1]), (frame.shape[1], detection_zone[3]), (100, 100, 100), 1) # 使用简化的亮点检测方法 min_brightness = cv2.getTrackbarPos("Laser Thresh", "Parameters") min_area = cv2.getTrackbarPos("Min Area", "Parameters") laser_point, laser_mask = detect_bright_spot(gray, min_brightness, min_area) if laser_point is not None: lx, ly = laser_point cv2.circle(debug_frame, (lx, ly), 10, (0, 0, 255), -1) cv2.putText(debug_frame, "LASER", (lx + 15, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) # 检查激光点是否在检测区域内 if detection_zone is not None: _, zone_y1, _, zone_y2 = detection_zone if zone_y1 <= 
ly <= zone_y2: if not laser_in_zone: laser_in_zone = True laser_start_time = time.time() print("激光点进入识别区域,开始计时") else: laser_duration = time.time() - laser_start_time else: if laser_in_zone: print(f"激光点离开识别区域,停留时间: {laser_duration:.2f}秒") laser_in_zone = False laser_duration = 0 else: cv2.putText(debug_frame, "无检测区域", (lx + 15, ly + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) else: if laser_in_zone: print(f"激光点消失,停留时间: {laser_duration:.2f}秒") laser_in_zone = False laser_duration = 0 if laser_in_zone: cv2.putText(debug_frame, f"计时: {laser_duration:.2f}秒", (frame.shape[1] - 200, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) frame_count += 1 if frame_count >= 10: end_time = time.time() fps = frame_count / (end_time - start_time) frame_count = 0 start_time = time.time() cv2.putText(debug_frame, f"FPS: {fps:.1f}", (debug_frame.shape[1] - 120, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) draw_servo_indicator(debug_frame, servo_controller.current_angle_x, servo_controller.current_angle_y) cv2.imshow("Visual Servo Tracking", debug_frame) cv2.imshow("Edges", edges) cv2.imshow("Laser Mask", laser_mask) key = cv2.waitKey(1) & 0xFF if key == ord('q'): cv2.destroyAllWindows() break elif key == ord(' '): servo_enabled = not servo_enabled print(f"舵机控制 {'已启用' if servo_enabled else '已禁用'}") elif key == ord('r'): servo_controller.reset() apply_servo_control(pwm_x_obj, pwm_y_obj, 1500, 1500) print("舵机已重置到中心位置") elif key == ord('c'): servo_controller = ServoController(min_angle_x=-30, max_angle_x=50, min_angle_y=-10, max_angle_y=10, center_pwm_x=1500, center_pwm_y=1500) apply_servo_control(pwm_x_obj, pwm_y_obj, 1500, 1500) print("舵机已校准到中心位置") elif key == ord('t'): laser_in_zone = False laser_duration = 0 print("激光计时已重置") apply_servo_control(pwm_x_obj, pwm_y_obj, 0, 0) cap.release() cv2.destroyAllWindows() cleanup_gpio(pwm_x_obj, pwm_y_obj) if __name__ == "__main__": main()

import os import torch import transformers from transformers import ( AutoModelForCausalLM, AutoTokenizer, TrainingArguments, DataCollatorForLanguageModeling, BitsAndBytesConfig, Trainer ) from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training from datasets import load_dataset import logging import psutil import gc from datetime import datetime # === 配置区域 === MODEL_NAME = "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/models/Yi-6B" DATASET_PATH = "./data/train_lora_formatted.jsonl" OUTPUT_DIR = "./yi6b-lora-optimized" DEVICE_MAP = "auto" # 使用自动设备映射 # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) # === 内存优化配置 === os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" # 减少内存碎片 torch.backends.cuda.cufft_plan_cache.clear() # 清理CUDA缓存 # === 增强的日志系统 === def setup_logging(output_dir): """配置日志系统,支持文件和TensorBoard""" logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 文件日志处理器 file_handler = logging.FileHandler(os.path.join(output_dir, "training.log")) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) # TensorBoard日志目录 tensorboard_log_dir = os.path.join(output_dir, "logs", datetime.now().strftime("%Y%m%d-%H%M%S")) os.makedirs(tensorboard_log_dir, exist_ok=True) # 安装TensorBoard回调 tb_writer = None try: from torch.utils.tensorboard import SummaryWriter tb_writer = SummaryWriter(log_dir=tensorboard_log_dir) logger.info(f"TensorBoard日志目录: {tensorboard_log_dir}") except ImportError: logger.warning("TensorBoard未安装,可视化功能不可用") return logger, tb_writer logger, tb_writer = setup_logging(OUTPUT_DIR) # === 量化配置 - 使用更高效的配置 === quant_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) # === 加载模型 === logger.info("加载预训练模型...") model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, attn_implementation="flash_attention_2" # 使用FlashAttention优化内存 ) # === 分词器处理 === logger.info("加载分词器...") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) tokenizer.padding_side = "right" if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # === 准备模型训练 === model = prepare_model_for_kbit_training( model, use_gradient_checkpointing=True # 启用梯度检查点以节省内存 ) # === LoRA 配置 - 优化内存使用 === logger.info("配置LoRA...") lora_config = LoraConfig( r=64, # 降低rank以减少内存使用 lora_alpha=32, # 降低alpha值 target_modules=["q_proj", "v_proj"], # 减少目标模块 lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) # 记录可训练参数 trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_params = sum(p.numel() for p in model.parameters()) logger.info(f"可训练参数: {trainable_params:,} / 总参数: {total_params:,} ({trainable_params/total_params:.2%})") # === 加载并预处理数据集 === logger.info("加载和预处理数据集...") dataset = load_dataset("json", data_files=DATASET_PATH, split="train") # 文本过滤函数 def is_valid_text(example): text = example.get("text", "") return text is not None and len(text.strip()) > 200 # 增加最小长度要求 dataset = dataset.filter(is_valid_text) logger.info(f"过滤后数据集大小: {len(dataset)} 条") # 动态填充的分词函数 - 节省内存 def 
tokenize_function(examples): tokenized = tokenizer( examples["text"], padding=True, # 使用动态填充 truncation=True, max_length=1024, # 降低上下文长度以减少内存使用 ) # 创建 labels - 因果语言建模需要 labels = input_ids tokenized["labels"] = tokenized["input_ids"].copy() return tokenized tokenized_dataset = dataset.map( tokenize_function, batched=True, remove_columns=["text"], batch_size=64, # 降低批处理大小以减少内存峰值 num_proc=4, # 减少进程数以降低内存开销 ) # === 数据整理器 === data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False # 因果语言建模 ) # === 训练参数 - 优化内存使用 === report_to_list = ["tensorboard"] if tb_writer else [] training_args = TrainingArguments( output_dir=OUTPUT_DIR, per_device_train_batch_size=4, # 大幅降低批次大小 gradient_accumulation_steps=4, # 增加梯度累积步数以保持有效批次大小 learning_rate=2e-5, num_train_epochs=3, logging_steps=50, save_strategy="steps", save_steps=500, bf16=True, optim="paged_adamw_32bit", report_to=report_to_list, warmup_ratio=0.05, gradient_checkpointing=True, # 启用梯度检查点 fp16=False, max_grad_norm=0.3, # 降低梯度裁剪阈值 remove_unused_columns=True, # 移除未使用的列以节省内存 dataloader_num_workers=4, # 减少数据加载工作线程 evaluation_strategy="steps", eval_steps=500, save_total_limit=2, # 减少保存的检查点数量 logging_dir=os.path.join(OUTPUT_DIR, "logs"), load_best_model_at_end=True, ddp_find_unused_parameters=False, logging_first_step=True, group_by_length=True, lr_scheduler_type="cosine", weight_decay=0.01, ) # === GPU监控工具 === def monitor_gpu(): """监控GPU使用情况""" if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024**3 mem_reserved = torch.cuda.memory_reserved(device) / 1024**3 mem_total = torch.cuda.get_device_properties(device).total_memory / 1024**3 return { "allocated": f"{mem_alloc:.2f} GB", "reserved": f"{mem_reserved:.2f} GB", "total": f"{mem_total:.2f} GB", "utilization": f"{mem_alloc/mem_total*100:.1f}%" } return {} # === 创建训练器 === eval_dataset = None if len(tokenized_dataset) > 100: eval_dataset = tokenized_dataset.select(range(100)) trainer = Trainer( model=model, tokenizer=tokenizer, args=training_args, train_dataset=tokenized_dataset, eval_dataset=eval_dataset, data_collator=data_collator, ) # === 训练前验证 === def validate_data_and_model(): """验证数据和模型是否准备好训练""" logger.info("\n=== 训练前验证 ===") # 检查样本格式 sample = tokenized_dataset[0] logger.info(f"样本键: {list(sample.keys())}") logger.info(f"input_ids 长度: {len(sample['input_ids'])}") # 创建单个样本测试批次 test_batch = data_collator([sample]) # 移动数据到设备 test_batch = {k: v.to(model.device) for k, v in test_batch.items()} # 前向传播测试 model.train() outputs = model(**test_batch) loss_value = outputs.loss.item() logger.info(f"测试批次损失: {loss_value:.4f}") # 记录到TensorBoard if tb_writer: tb_writer.add_scalar("debug/test_loss", loss_value, 0) # 反向传播测试 outputs.loss.backward() logger.info("反向传播成功!") # 重置梯度 model.zero_grad() logger.info("验证完成,准备开始训练\n") # 记录初始GPU使用情况 gpu_status = monitor_gpu() logger.info(f"初始GPU状态: {gpu_status}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("system/initial_gpu", str(gpu_status), 0) validate_data_and_model() # === 自定义回调 - 监控资源使用 === class ResourceMonitorCallback(transformers.TrainerCallback): def __init__(self, tb_writer=None): self.tb_writer = tb_writer self.start_time = datetime.now() self.last_log_time = datetime.now() def on_step_end(self, args, state, control, **kwargs): current_time = datetime.now() time_diff = (current_time - self.last_log_time).total_seconds() # 每分钟记录一次资源使用情况 if time_diff > 60: self.last_log_time = current_time # GPU监控 gpu_status = monitor_gpu() logger.info(f"Step {state.global_step} - GPU状态: 
{gpu_status}") # CPU和内存监控 cpu_percent = psutil.cpu_percent() mem = psutil.virtual_memory() logger.info(f"CPU使用率: {cpu_percent}%, 内存使用: {mem.used/1024**3:.2f}GB/{mem.total/1024**3:.2f}GB") # 记录到TensorBoard if self.tb_writer: # GPU显存使用 if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024**3 self.tb_writer.add_scalar("system/gpu_mem", mem_alloc, state.global_step) # CPU使用率 self.tb_writer.add_scalar("system/cpu_usage", cpu_percent, state.global_step) # 系统内存使用 self.tb_writer.add_scalar("system/ram_usage", mem.used/1024**3, state.global_step) def on_log(self, args, state, control, logs=None, **kwargs): """记录训练指标到TensorBoard""" if self.tb_writer and logs is not None: for metric_name, metric_value in logs.items(): if "loss" in metric_name or "lr" in metric_name or "grad_norm" in metric_name: self.tb_writer.add_scalar(f"train/{metric_name}", metric_value, state.global_step) def on_train_end(self, args, state, control, **kwargs): """训练结束时记录总时间""" training_time = datetime.now() - self.start_time logger.info(f"训练总时间: {training_time}") if self.tb_writer: self.tb_writer.add_text("system/total_time", str(training_time)) # 添加回调 trainer.add_callback(ResourceMonitorCallback(tb_writer=tb_writer)) # === 内存清理函数 === def clear_memory(): """清理内存和GPU缓存""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() logger.info("内存清理完成") # === 启动训练 === try: logger.info("开始训练...") # 分阶段训练以减少内存峰值 num_samples = len(tokenized_dataset) chunk_size = 1000 # 每次处理1000个样本 for i in range(0, num_samples, chunk_size): end_idx = min(i + chunk_size, num_samples) logger.info(f"训练样本 {i} 到 {end_idx-1} / {num_samples}") # 创建子数据集 chunk_dataset = tokenized_dataset.select(range(i, end_idx)) # 更新训练器 trainer.train_dataset = chunk_dataset # 训练当前块 trainer.train() # 清理内存 clear_memory() # 保存训练指标 metrics = trainer.evaluate() trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) # 保存最佳模型 trainer.save_model(OUTPUT_DIR) tokenizer.save_pretrained(OUTPUT_DIR) logger.info(f"训练完成! 模型保存在: {OUTPUT_DIR}") # 记录最终指标到TensorBoard if tb_writer: for metric_name, metric_value in metrics.items(): tb_writer.add_scalar(f"final/{metric_name}", metric_value) tb_writer.close() except Exception as e: logger.error(f"训练出错: {e}") import traceback logger.error(traceback.format_exc()) # 尝试更小批量训练 logger.info("\n尝试更小批量训练...") small_dataset = tokenized_dataset.select(range(50)) trainer.train_dataset = small_dataset trainer.train() # 保存模型 trainer.save_model(f"{OUTPUT_DIR}_small") tokenizer.save_pretrained(f"{OUTPUT_DIR}_small") logger.info(f"小批量训练完成! 
模型保存在: {OUTPUT_DIR}_small") # 记录错误到TensorBoard if tb_writer: tb_writer.add_text("error/exception", traceback.format_exc()) # 清理内存 clear_memory() # === 训练后验证 === def validate_final_model(): """验证训练后的模型""" logger.info("\n=== 训练后验证 ===") # 加载保存的模型 from peft import PeftModel # 仅加载基础模型配置 base_model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, load_in_4bit=True ) # 加载LoRA适配器 peft_model = PeftModel.from_pretrained(base_model, OUTPUT_DIR) # 合并LoRA权重 merged_model = peft_model.merge_and_unload() # 测试生成 prompt = "中国的首都是" inputs = tokenizer(prompt, return_tensors="pt").to(merged_model.device) outputs = merged_model.generate( **inputs, max_new_tokens=50, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"提示: {prompt}") logger.info(f"生成结果: {generated}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("validation/sample", f"提示: {prompt}\n生成: {generated}") # 更全面的测试 test_prompts = [ "人工智能的未来发展趋势是", "如何学习深度学习?", "写一个关于太空探索的短故事:" ] for i, test_prompt in enumerate(test_prompts): inputs = tokenizer(test_prompt, return_tensors="pt").to(merged_model.device) outputs = merged_model.generate( **inputs, max_new_tokens=100, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"\n提示: {test_prompt}\n生成: {generated_text}\n{'='*50}") # 记录到TensorBoard if tb_writer: tb_writer.add_text(f"validation/test_{i}", f"提示: {test_prompt}\n生成: {generated_text}") logger.info("验证完成") # 执行验证 validate_final_model() # 关闭TensorBoard写入器 if tb_writer: tb_writer.close() logger.info("TensorBoard日志已关闭") (.venv) (base) vipuser@ubuntu22:~/ai_writer_project_final_with_fixed_output_ui$ python train_lora.py 2025-07-13 22:10:19,098 - INFO - TensorBoard日志目录: ./yi6b-lora-optimized/logs/20250713-221019 2025-07-13 22:10:19,099 - INFO - 加载预训练模型... Traceback (most recent call last): File "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/train_lora.py", line 77, in <module> model = AutoModelForCausalLM.from_pretrained( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/.venv/lib/python3.11/site-packages/transformers/models/auto/auto_factory.py", line 566, in from_pretrained return model_class.from_pretrained( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/.venv/lib/python3.11/site-packages/transformers/modeling_utils.py", line 3590, in from_pretrained config = cls._autoset_attn_implementation( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/.venv/lib/python3.11/site-packages/transformers/modeling_utils.py", line 1389, in _autoset_attn_implementation cls._check_and_enable_flash_attn_2( File "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/.venv/lib/python3.11/site-packages/transformers/modeling_utils.py", line 1480, in _check_and_enable_flash_attn_2 raise ImportError(f"{preface} the package flash_attn seems to be not installed. {install_message}") ImportError: FlashAttention2 has been toggled on, but it cannot be used due to the following error: the package flash_attn seems to be not installed. 
Please refer to the documentation of https://huggingface.co/docs/transformers/perf_infer_gpu_one#flashattention-2 to install Flash Attention 2.
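上面的报错说明模型配置中开启了 FlashAttention-2,但环境里没有安装 flash_attn 包。下面是一个最小的处理示意(假设沿用脚本中的 MODEL_NAME、DEVICE_MAP、quant_config 等变量,且所用 transformers 版本支持 attn_implementation 参数):要么按官方文档安装 flash-attn,要么在加载模型时显式指定其他注意力实现。

```python
# 方案一:安装 flash-attn(需与当前 CUDA / PyTorch 版本匹配,编译耗时较长)
#   pip install flash-attn --no-build-isolation
# 方案二:绕过 FlashAttention-2,改用 PyTorch 自带的 SDPA 或普通实现
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map=DEVICE_MAP,
    quantization_config=quant_config,
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
    attn_implementation="sdpa",  # 也可改为 "eager";若 transformers 版本较旧则没有这个参数
)
```

如果报错来自模型 config 中预设的注意力实现,同样可以通过上面的 attn_implementation 参数在加载时覆盖掉,而不必修改模型文件。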

from sklearn.metrics.pairwise import cosine_similarity # 计算所有股票的持仓相似度 def calculate_daily_positions_consine_similarity(df): df = df.copy() all_dates = pd.date_range(start=df['date'].min(), end=df['date'].max(), name='date').strftime("%Y-%m-%d") all_clients = df['client_id'].unique() all_instruments = df['instrument_id'].unique() full_panel = pd.MultiIndex.from_product( [all_dates, all_clients, all_instruments], names=['date', 'client_id', 'instrument_id'] ).to_frame(index=False) merged = full_panel.merge( df, on=['date', 'client_id', 'instrument_id'], how='left' ) # 填充缺失日期 merged['cum_qty'] = merged.groupby(['client_id', 'instrument_id'])['cum_qty'].ffill() merged = merged.fillna(0) merged['position_type'] = np.where( merged['cum_qty'] > 0, 'long', np.where(merged['cum_qty'] < 0, 'short', 'neutral') ) # 计算每日持仓统计 merged['is_long'] = (merged['position_type'] == 'long') merged['is_short'] = (merged['position_type'] == 'short') daily_stats = merged.groupby(['client_id', 'date']).agg( long_qty=('cum_qty', lambda s: s[s > 0].sum()), short_qty=('cum_qty', lambda s: abs(s[s < 0].sum())) ).reset_index() daily_stats["total_qty"] = daily_stats["long_qty"] + daily_stats["short_qty"] merged = merged.merge(daily_stats, on=['client_id', 'date']) merged["posion_weight"] = merged["cum_qty"] / merged["total_qty"] merged["posion_weight"] = merged["posion_weight"].fillna(0) # 计算持仓余弦相似度 def calculate_cosine_similarity(client_group): client_group = client_group.sort_values('date') results = [] stock_matrix = client_group.pivot(index='date', columns='instrument_id', values='posion_weight').fillna(0) similarity_matrix = cosine_similarity(stock_matrix.values) # 计算每日相似度(与前一日比较) for i in range(1, len(stock_matrix)): current_date = stock_matrix.index[i] results.append({ 'date': current_date, 'pos_sim': similarity_matrix[i, i-1] }) # # 计算每日相似度(与前一日比较) # for i in range(1, len(stock_matrix)): # current_date = stock_matrix.index[i] # prev_date = stock_matrix.index[i-1] # # 计算总相似度 # pos_vec1 = stock_matrix.loc[prev_date].values.reshape(1, -1) # pos_vec2 = stock_matrix.loc[current_date].values.reshape(1, -1) # pos_sim = cosine_similarity(pos_vec1, pos_vec2)[0][0] if long_total[prev_date] > 0 and long_total[current_date] > 0 else 0 # results.append({ # 'date': current_date, # 'pos_sim': pos_sim # }) return pd.DataFrame(results) # 每个client,每日,根据所有股票仓位,可以计算相似性 similarity_results = merged.groupby(['client_id']).apply(calculate_cosine_similarity).reset_index() return similarity_results final_daily_stats = daily_stats.merge( similarity_results, on=['client_id', 'date'], how='left' ) return final_daily_stats # 计算每日持仓余弦相似性 daily_positions_cos = calculate_daily_positions_consine_similarity(multi_cpty_df[(multi_cpty_df["client_id"] == "ABRH400")]) daily_positions_cos 你误解了我的意思,不过没关系,现在再看看这个代码,你能理解了么
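这段代码的核心是:把每个 client 每天的持仓权重透视成 日期 × 合约 的矩阵,再比较相邻两天权重向量的余弦相似度。下面给出一个最小的示意(假设 stock_matrix 就是代码中 pivot 出来的、以 date 为索引、instrument_id 为列、值为 posion_weight 的矩阵),只计算相邻两行的相似度,避免像 cosine_similarity(stock_matrix.values) 那样先生成整张 N×N 矩阵却只用到次对角线:

```python
import numpy as np
import pandas as pd

def adjacent_day_similarity(stock_matrix: pd.DataFrame) -> pd.DataFrame:
    """逐日与前一日比较持仓权重向量的余弦相似度。
    stock_matrix: index 为 date、columns 为 instrument_id、值为持仓权重(缺失已填 0)。"""
    values = stock_matrix.to_numpy(dtype=float)
    prev, curr = values[:-1], values[1:]
    num = (prev * curr).sum(axis=1)
    denom = np.linalg.norm(prev, axis=1) * np.linalg.norm(curr, axis=1)
    # 某一天完全没有持仓(全零向量)时相似度记为 0,避免除零
    sim = np.divide(num, denom, out=np.zeros_like(num), where=denom > 0)
    return pd.DataFrame({"date": stock_matrix.index[1:], "pos_sim": sim})
```

另外可以留意:函数里第一个 `return similarity_results` 之后的 `final_daily_stats = daily_stats.merge(...)` 和第二个 `return` 是不可达代码,如果想返回合并了 long_qty / short_qty 的每日统计,需要去掉第一个 return。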

#综合成绩df_grades、任务点完成详情df_point、章节测验统计df_quizzes、考试统计 df_grades=pd.read_excel(r'E:\20241171-专业1_统计一键导出.xlsx',sheet_name='综合成绩',skiprows=2) 学生姓名 学号/工号 院系 专业 班级 章节任务点(5%) 章节测验(10%) 作业(40%) 考试(30%) 签到(5%) 课程积分(10%) 综合成绩 0 学生1 2024117101 学院1 专业1 20241171 3.07 4.96 28.8 19.5 4.33 5 65.66 1 学生2 2024117102 学院1 专业1 20241171 4.77 9.18 36.0 16.5 4.67 10 81.12 2 学生3 2024117103 学院1 专业1 20241171 5.00 8.35 36.0 20.7 4.67 10 84.72 3 学生4 2024117104 学院1 专业1 20241171 4.89 8.82 36.0 16.2 4.6 df_point['学号/工号'] = df_point['学号/工号'].astype(int) # 将"已完成"替换为1,"未完成"替换为0 df_point = df_point.replace({'已完成':1, '未完成': 0}) merged_df = pd.merge(df_grades, df_point, on='学号/工号', how='left')merged_df = merged_df.drop(columns=['学生姓名', '院系', '专业', '班级'])#计算任务完成率 task_columns = [col for col in merged_df.columns if col.endswith('.mp4') or col.endswith('.pptx')] merged_df['任务完成率'] = merged_df[task_columns].mean(axis=1) grade_features = ['章节任务点(5%)', '章节测验(10%)', '作业(40%)', '考试(30%)', '签到(5%)', '课程积分(10%)'] X = merged_df[grade_features] Y = merged_df[['任务完成率']]#特征标准化 grade_scaler = StandardScaler().fit(X) X_scaled = grade_scaler.fit_transform(X) task_scaler = StandardScaler().fit(Y) Y_scaled = task_scaler.transform(Y)# PCA降维 pca = PCA(n_components=1) # 降至1维 X_pca = pca.fit_transform(X_scaled) X_pca = -X_pca # 组合特征(X轴=成绩PCA,Y轴=任务完成度) combined_features = np.hstack((X_pca, Y_scaled)) # KMeans聚类 kmeans = KMeans(n_clusters=3, random_state=42,n_init=10) clusters = kmeans.fit_predict(combined_features) # 分离成绩和任务部分 grade_centers = kmeans.cluster_centers_[:, :1] # 成绩PCA部分 task_centers = kmeans.cluster_centers_[:, 1:] # 任务部分 # 逆变换成绩部分 grade_centers_restored = pca.inverse_transform(grade_centers) # 还原到6维标准化空间 grade_centers_original = grade_scaler.inverse_transform(grade_centers_restored) # 还原到原始成绩 # 逆变换任务部分 task_centers_original = task_scaler.inverse_transform(task_centers) # 合并结果 cluster_centers_original = np.hstack((grade_centers_original, task_centers_original)) # 创建DataFrame feature_names = X.columns.tolist() + ['任务完成率'] cluster_center_df = pd.DataFrame(cluster_centers_original, columns=feature_names) print("各聚类中心实际特征值:") display(cluster_center_df) 章节任务点(5%) 章节测验(10%) 作业(40%) 考试(30%) 签到(5%) 课程积分(10%) 任务完成率 0 3.763759 6.265802 31.862586 16.766125 4.359226 9.327531 0.983806 1 7.465528 12.938833 47.649160 27.198091 5.972354 11.442964 0.163158 2 4.783249 8.103595 36.210304 19.639151 4.803491 9.910134 0.788278 x_data = X_pca.flatten() # 将二维数组转为一维 y_data = merged_df['任务完成率'].values center_x = kmeans.cluster_centers_[:, 0] # 聚类中心在PCA空间的X值 center_y =kmeans.cluster_centers_[:, 1] #center_y = cluster_center_df['任务完成率'].values# 查看第一主成分的权重向量 weights = pca.components_[0] for i, feature in enumerate(grade_features): print(f"{feature}: {weights[i]:.4f}")章节任务点(5%): -0.4678 章节测验(10%): -0.4299 作业(40%): -0.4434 考试(30%): -0.3644 签到(5%): -0.4462 课程积分(10%): -0.2607# 创建画布 plt.figure(figsize=(10, 6)) # 绘制各聚类散点 plt.scatter(x_data[clusters == 0], y_data[clusters == 0], c='purple', alpha=0.7, s=80, label="稳定优秀型") plt.scatter(x_data[clusters == 2], y_data[clusters == 2], c='green', alpha=0.7, s=80, label="潜力提升型") plt.scatter(x_data[clusters ==1], y_data[clusters == 1], c='orange', alpha=0.7, s=80, label="待提升型") plt.xlabel("综合成绩(降维后)", fontsize=12) plt.ylabel("任务完成率", fontsize=12) plt.legend(title="学生类型", loc='upper left', frameon=True, edgecolor='black')老师让我你接下来试一下用sklearn里面的tsne来把聚类的数据投影到二维和三维,我要怎么做,因为我·没有接触过这个,你要说详细一点,解释一下。这里需要在聚类的基础上接着做tsne吗
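t-SNE 只是另一种降维/可视化手段,不需要重新聚类:仍然用上面 KMeans 得到的 clusters 作为颜色标签,把高维特征(建议用 7 维的标准化特征 np.hstack((X_scaled, Y_scaled)),而不是已经压到 2 维的 combined_features)交给 sklearn.manifold.TSNE 分别投影到二维和三维。下面是一个最小的示意(假设 X_scaled、Y_scaled、clusters 都沿用上面的变量):

```python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

features_for_tsne = np.hstack((X_scaled, Y_scaled))  # 6 维标准化成绩 + 1 维任务完成率

# 二维投影:t-SNE 只负责降维,clusters 仅用于着色
# perplexity 必须小于样本数,样本较少时可调小(例如 5~15)
emb_2d = TSNE(n_components=2, perplexity=30, random_state=42).fit_transform(features_for_tsne)
plt.figure(figsize=(8, 6))
plt.scatter(emb_2d[:, 0], emb_2d[:, 1], c=clusters, cmap='viridis', s=60)
plt.xlabel("t-SNE 维度 1")
plt.ylabel("t-SNE 维度 2")
plt.title("KMeans 聚类结果的 t-SNE 二维投影")
plt.show()

# 三维投影
emb_3d = TSNE(n_components=3, perplexity=30, random_state=42).fit_transform(features_for_tsne)
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(emb_3d[:, 0], emb_3d[:, 1], emb_3d[:, 2], c=clusters, cmap='viridis', s=60)
ax.set_title("KMeans 聚类结果的 t-SNE 三维投影")
plt.show()
```

需要注意 t-SNE 的坐标轴没有可解释的物理含义,它只保留样本之间的邻近关系,所以这两张图主要用来观察三类学生是否分得开,不适合像 PCA 那样去解读各轴的权重。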

import os import torch import transformers from transformers import ( AutoModelForCausalLM, AutoTokenizer, TrainingArguments, DataCollatorForLanguageModeling, BitsAndBytesConfig, Trainer ) from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training from datasets import load_dataset import logging import psutil import gc from datetime import datetime # === 配置区域 === MODEL_NAME = "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/models/Yi-6B" DATASET_PATH = "./data/train_lora_formatted.jsonl" OUTPUT_DIR = "./yi6b-lora-optimized" DEVICE_MAP = "auto" # 使用自动设备映射 # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) # === 增强的日志系统 === def setup_logging(): """配置日志系统,支持文件和TensorBoard""" # 主日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(OUTPUT_DIR, "training.log")), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # TensorBoard日志目录 tensorboard_log_dir = os.path.join(OUTPUT_DIR, "logs", datetime.now().strftime("%Y%m%d-%H%M%S")) os.makedirs(tensorboard_log_dir, exist_ok=True) # 安装TensorBoard回调 try: from torch.utils.tensorboard import SummaryWriter tb_writer = SummaryWriter(log_dir=tensorboard_log_dir) logger.info(f"TensorBoard日志目录: {tensorboard_log_dir}") except ImportError: tb_writer = None logger.warning("TensorBoard未安装,可视化功能不可用") return logger, tb_writer logger, tb_writer = setup_logging() # === 量化配置 - 使用更高效的配置 === quant_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) # === 加载模型 === logger.info("加载预训练模型...") model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True ) # === 分词器处理 === logger.info("加载分词器...") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) tokenizer.padding_side = "right" if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # === 准备模型训练 === model = prepare_model_for_kbit_training(model, use_gradient_checkpointing=False) # 关闭梯度检查点以加速训练 # === LoRA 配置 - 增加参数以提升模型能力 === logger.info("配置LoRA...") lora_config = LoraConfig( r=128, # 增加rank以提升模型能力 lora_alpha=64, # 增加alpha值 target_modules=["q_proj", "v_proj", "k_proj", "o_proj"], # 增加更多目标模块 lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) # 记录可训练参数 trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_params = sum(p.numel() for p in model.parameters()) logger.info(f"可训练参数: {trainable_params:,} / 总参数: {total_params:,} ({trainable_params / total_params:.2%})") # === 加载并预处理数据集 === logger.info("加载和预处理数据集...") dataset = load_dataset("json", data_files=DATASET_PATH, split="train") # 文本过滤函数 def is_valid_text(example): text = example.get("text", "") return text is not None and len(text.strip()) > 200 # 增加最小长度要求 dataset = dataset.filter(is_valid_text) logger.info(f"过滤后数据集大小: {len(dataset)} 条") # 分词函数 - 使用更大的上下文窗口 def tokenize_function(examples): tokenized = tokenizer( examples["text"], padding="max_length", truncation=True, max_length=2048, # 增加上下文长度 ) # 创建 labels - 因果语言建模需要 labels = input_ids tokenized["labels"] = tokenized["input_ids"].copy() return tokenized tokenized_dataset = dataset.map( tokenize_function, batched=True, remove_columns=["text"], batch_size=128, # 增加批处理大小以加速处理 num_proc=8, # 使用更多进程 ) # === 数据整理器 === 
data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False # 因果语言建模 ) # === 训练参数 - 优化以利用A100 80GB显存 === training_args = TrainingArguments( output_dir=OUTPUT_DIR, per_device_train_batch_size=16, # 大幅增加批次大小 gradient_accumulation_steps=1, # 减少梯度累积步数 learning_rate=2e-5, # 提高学习率 num_train_epochs=3, # 增加训练轮数 logging_steps=50, save_strategy="steps", save_steps=500, # 每500步保存一次 bf16=True, optim="paged_adamw_32bit", # 使用更稳定的优化器 report_to=["tensorboard"] if tb_writer else [], # 如果TensorBoard可用则报告 warmup_ratio=0.05, # 增加预热比例 gradient_checkpointing=False, # 关闭梯度检查点以加速训练 fp16=False, max_grad_norm=1.0, # 增加梯度裁剪阈值 remove_unused_columns=False, dataloader_num_workers=8, # 增加数据加载工作线程 evaluation_strategy="steps", # 添加验证步骤 eval_steps=500, # 每500步验证一次 save_total_limit=3, # 保存更多检查点 logging_dir=os.path.join(OUTPUT_DIR, "logs"), load_best_model_at_end=True, # 训练结束时加载最佳模型 ddp_find_unused_parameters=False, logging_first_step=True, group_by_length=True, # 按长度分组以加速训练 lr_scheduler_type="cosine", # 使用余弦学习率调度器 weight_decay=0.01, # 添加权重衰减 ) # === GPU监控工具 === def monitor_gpu(): """监控GPU使用情况""" if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 mem_reserved = torch.cuda.memory_reserved(device) / 1024 ** 3 mem_total = torch.cuda.get_device_properties(device).total_memory / 1024 ** 3 return { "allocated": f"{mem_alloc:.2f} GB", "reserved": f"{mem_reserved:.2f} GB", "total": f"{mem_total:.2f} GB", "utilization": f"{mem_alloc / mem_total * 100:.1f}%" } return {} # === 创建训练器 === trainer = Trainer( model=model, tokenizer=tokenizer, args=training_args, train_dataset=tokenized_dataset, eval_dataset=tokenized_dataset.select(range(100)) if len(tokenized_dataset) > 100 else None, data_collator=data_collator, ) # === 训练前验证 === def validate_data_and_model(): """验证数据和模型是否准备好训练""" logger.info("\n=== 训练前验证 ===") # 检查样本格式 sample = tokenized_dataset[0] logger.info(f"样本键: {list(sample.keys())}") logger.info(f"input_ids 长度: {len(sample['input_ids'])}") logger.info(f"labels 长度: {len(sample['labels'])}") # 创建测试批次 test_batch = data_collator([sample, tokenized_dataset[1]]) # 移动数据到设备 test_batch = {k: v.to(model.device) for k, v in test_batch.items()} # 前向传播测试 model.train() outputs = model(**test_batch) logger.info(f"测试批次损失: {outputs.loss.item():.4f}") # 记录到TensorBoard if tb_writer: tb_writer.add_scalar("debug/test_loss", outputs.loss.item(), 0) # 反向传播测试 outputs.loss.backward() logger.info("反向传播成功!") # 重置梯度 model.zero_grad() logger.info("验证完成,准备开始训练\n") # 记录初始GPU使用情况 gpu_status = monitor_gpu() logger.info(f"初始GPU状态: {gpu_status}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("system/initial_gpu", str(gpu_status), 0) validate_data_and_model() # === 自定义回调 - 监控资源使用 === class ResourceMonitorCallback(transformers.TrainerCallback): def __init__(self, tb_writer=None): self.tb_writer = tb_writer self.start_time = datetime.now() def on_step_end(self, args, state, control, **kwargs): if state.global_step % 50 == 0: # GPU监控 gpu_status = monitor_gpu() logger.info(f"Step {state.global_step} - GPU状态: {gpu_status}") # CPU和内存监控 cpu_percent = psutil.cpu_percent() mem = psutil.virtual_memory() logger.info( f"CPU使用率: {cpu_percent}%, 内存使用: {mem.used / 1024 ** 3:.2f}GB/{mem.total / 1024 ** 3:.2f}GB") # 记录到TensorBoard if self.tb_writer: # GPU显存使用 if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 self.tb_writer.add_scalar("system/gpu_mem", mem_alloc, state.global_step) # CPU使用率 
self.tb_writer.add_scalar("system/cpu_usage", cpu_percent, state.global_step) # 系统内存使用 self.tb_writer.add_scalar("system/ram_usage", mem.used / 1024 ** 3, state.global_step) def on_log(self, args, state, control, logs=None, **kwargs): """记录训练指标到TensorBoard""" if self.tb_writer: for key, value in logs.items(): if "loss" in key or "lr" in key or "grad_norm" in key: self.tb_writer.add_scalar(f"train/{key}", value, state.global_step) def on_train_end(self, args, state, control, **kwargs): """训练结束时记录总时间""" training_time = datetime.now() - self.start_time logger.info(f"训练总时间: {training_time}") if self.tb_writer: self.tb_writer.add_text("system/total_time", str(training_time)) # 添加回调 trainer.add_callback(ResourceMonitorCallback(tb_writer)) # === 启动训练 === try: logger.info("开始训练...") train_result = trainer.train() # 保存训练指标 metrics = train_result.metrics trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) # 保存最佳模型 trainer.save_model(OUTPUT_DIR) tokenizer.save_pretrained(OUTPUT_DIR) logger.info(f"训练完成! 模型保存在: {OUTPUT_DIR}") # 记录最终指标到TensorBoard if tb_writer: for key, value in metrics.items(): tb_writer.add_scalar(f"final/{key}", value) tb_writer.close() except Exception as e: logger.error(f"训练出错: {e}") import traceback logger.error(traceback.format_exc()) # 尝试小批量训练 logger.info("\n尝试小批量训练...") small_dataset = tokenized_dataset.select(range(100)) trainer.train_dataset = small_dataset trainer.train() # 保存模型 trainer.save_model(f"{OUTPUT_DIR}_small") tokenizer.save_pretrained(f"{OUTPUT_DIR}_small") logger.info(f"小批量训练完成! 模型保存在: {OUTPUT_DIR}_small") # 记录错误到TensorBoard if tb_writer: tb_writer.add_text("error/exception", traceback.format_exc()) # 清理内存 del model del trainer gc.collect() torch.cuda.empty_cache() # === 训练后验证 === def validate_final_model(): """验证训练后的模型""" logger.info("\n=== 训练后验证 ===") # 加载保存的模型 from peft import PeftModel, PeftConfig # 仅加载基础模型配置 base_model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, load_in_4bit=True ) # 加载LoRA适配器 peft_model = PeftModel.from_pretrained(base_model, OUTPUT_DIR) # 合并LoRA权重 merged_model = peft_model.merge_and_unload() # 测试生成 prompt = "中国的首都是" inputs = tokenizer(prompt, return_tensors="pt").to(merged_model.device) outputs = merged_model.generate( **inputs, max_new_tokens=100, # 生成长度增加 temperature=0.7, top_p=0.9, # 添加top-p采样 repetition_penalty=1.2, # 添加重复惩罚 do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"提示: {prompt}") logger.info(f"生成结果: {generated}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("validation/sample", f"提示: {prompt}\n生成: {generated}") # 更全面的测试 test_prompts = [ "人工智能的未来发展趋势是", "如何学习深度学习?", "写一个关于太空探索的短故事:" ] for i, prompt in enumerate(test_prompts): inputs = tokenizer(prompt, return_tensors="pt").to(merged_model.device) outputs = merged_model.generate( **inputs, max_new_tokens=150, temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"\n提示: {prompt}\n生成: {generated}\n{'=' * 50}") # 记录到TensorBoard if tb_writer: tb_writer.add_text(f"validation/test_{i}", f"提示: {prompt}\n生成: {generated}") logger.info("验证完成") # 执行验证 validate_final_model() # 关闭TensorBoard写入器 if tb_writer: tb_writer.close() logger.info("TensorBoard日志已关闭") 包含'except ImportError'的try 块中的'SummaryWriter'也应在except块中定义:49 在'Trainer |Trainer'中找不到引用'log_metrics':313 在'Trainer|Trainer'中找不到引用'save_metrics':314 
未使用的 import 语句 'PeftConfig':362
从外部作用域隐藏名称 'logger':41
从外部作用域隐藏名称 'tb_writer':50
从外部作用域隐藏名称 'tb_writer':53
从外部作用域隐藏名称 'tb_writer':258
从外部作用域隐藏名称 'key':291
从外部作用域隐藏名称 'value':291
拼写错误:在单词 'vipuser' 中:20
拼写错误:在单词 'asctime' 中:35
拼写错误:在单词 'levelname' 中:35
拼写错误:在单词 'adamw' 中:161
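这些是 IDE 检查给出的提示,大多不影响运行:拼写告警(vipuser、asctime、levelname、adamw)是词典问题,可忽略;log_metrics / save_metrics 在较新的 transformers 的 Trainer 上是动态绑定的方法,IDE 解析不到,一般属于误报。真正值得处理的是 try/except 中的 SummaryWriter 和多处外部作用域变量被遮蔽。下面是一个最小的改写示意(假设沿用脚本里已有的 os、logging、datetime 导入),先把写入器变量初始化为 None,再在 try 里导入并赋值:

```python
def setup_logging(output_dir: str):
    """返回 (logger, tb_writer);TensorBoard 不可用时 tb_writer 为 None"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(os.path.join(output_dir, "training.log")),
            logging.StreamHandler()
        ]
    )
    log = logging.getLogger(__name__)

    tb_log_dir = os.path.join(output_dir, "logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
    os.makedirs(tb_log_dir, exist_ok=True)

    writer = None  # 先定义,发生 ImportError 时保持为 None,消除"未定义"告警
    try:
        from torch.utils.tensorboard import SummaryWriter
        writer = SummaryWriter(log_dir=tb_log_dir)
        log.info(f"TensorBoard日志目录: {tb_log_dir}")
    except ImportError:
        log.warning("TensorBoard未安装,可视化功能不可用")
    return log, writer
```

至于"隐藏外部作用域名称",把回调和验证函数里的局部变量改名即可(例如 on_log 里的 key/value 改成 metric_name/metric_value);未使用的 PeftConfig 直接从 import 语句中删掉。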

import locale locale.setlocale(locale.LC_ALL, 'chinese') import tkinter as tk from tkinter import filedialog, messagebox, ttk, scrolledtext import csv from datetime import datetime import logging import os from collections import defaultdict class CSVProcessorApp: def __init__(self, root): self.root = root self.root.title("CSV_ProcessPro") self.root.geometry("800x600") # 调整窗口大小 self.root.resizable(False, False) # 禁止调整窗口大小 # 初始化变量 self.file_path = tk.StringVar() self.csv_data = [] self.headers = [] self.setup_variables() # 初始化日志 self.setup_logging() # 创建UI self.create_widgets() # 设置全局样式 self.setup_styles() def setup_styles(self): """设置全局样式""" self.style = ttk.Style() self.style.configure("TFrame", background="#f0f0f0") self.style.configure("TLabel", background="#f0f0f0", font=('Arial', 9)) self.style.configure("TButton", font=('Arial', 9, 'bold')) self.style.configure("Accent.TButton", foreground="black", font=('Arial', 9, 'bold'), borderwidth=2, relief="raised") self.style.map("Accent.TButton", background=[("active", "#4a90e2"), ("!active", "#d4e6ff")], bordercolor=[("active", "#4a90e2"), ("!active", "#ffcc00")]) self.style.configure("Remove.TButton", foreground="black", font=('Arial', 8), background="#ffcccc", borderwidth=1, relief="solid") self.style.map("Remove.TButton", background=[("active", "#ff9999"), ("!active", "#ffcccc")]) def setup_variables(self): """初始化所有动态变量""" # 排序相关 self.sort_header = tk.StringVar() self.sort_order = tk.StringVar(value="升序") # 去重相关 self.dedupe_header = tk.StringVar() # 删除行相关 self.delete_keyword = tk.StringVar() self.delete_column = tk.StringVar() self.delete_case_sensitive = tk.BooleanVar() # 合并文件相关 self.merge_file_paths = [] self.merge_column = tk.StringVar() # 状态变量 self.enable_sort = tk.BooleanVar() self.enable_dedupe = tk.BooleanVar() self.enable_custom_letter_sort = tk.BooleanVar() self.letter_range_start = tk.StringVar(value="A") self.letter_range_end = tk.StringVar(value="Z") def setup_logging(self): """配置日志记录""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('csv_processor.log', encoding='utf-8'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) self.logger.info("===== 程序启动 =====") def create_widgets(self): """创建所有界面组件""" # 主容器 main_container = ttk.Frame(self.root, padding=5) main_container.pack(fill=tk.BOTH, expand=True) # 使用notebook分页组织功能 self.notebook = ttk.Notebook(main_container) self.notebook.pack(fill=tk.BOTH, expand=True) # 创建各个标签页 self.create_file_tab() self.create_process_tab() self.create_delete_tab() self.create_merge_tab() self.create_log_tab() def create_file_tab(self): """创建文件操作标签页""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="文件操作") # 文件选择部分 frame = ttk.LabelFrame(tab, text="CSV文件选择", padding=10) frame.pack(fill=tk.X, padx=5, pady=5) ttk.Label(frame, text="文件路径:").grid(row=0, column=0, sticky=tk.W) ttk.Entry(frame, textvariable=self.file_path, width=40).grid(row=0, column=1, sticky=tk.EW) ttk.Button(frame, text="浏览", command=self.select_file).grid(row=0, column=2, padx=5) # 文件信息显示 self.file_info = scrolledtext.ScrolledText(tab, height=8, width=80) self.file_info.pack(fill=tk.X, padx=5, pady=5) def create_process_tab(self): """创建数据处理标签页""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="排序/去重") # 排序选项部分 frame = ttk.LabelFrame(tab, text="排序选项", padding=10) frame.pack(fill=tk.X, padx=5, pady=5) ttk.Checkbutton(frame, text="启用排序", variable=self.enable_sort, command=self.toggle_sort).grid(row=0, column=0, 
sticky=tk.W) ttk.Label(frame, text="排序表头:").grid(row=1, column=0, sticky=tk.W) self.sort_header_combobox = ttk.Combobox(frame, textvariable=self.sort_header, state="readonly") self.sort_header_combobox.grid(row=1, column=1, sticky=tk.EW) ttk.Label(frame, text="排序方式:").grid(row=2, column=0, sticky=tk.W) self.sort_order_combobox = ttk.Combobox(frame, textvariable=self.sort_order, values=["升序", "降序", "自定义字母排序"]) self.sort_order_combobox.grid(row=2, column=1, sticky=tk.W) # 自定义字母排序范围 ttk.Checkbutton(frame, text="启用字母范围过滤", variable=self.enable_custom_letter_sort, command=self.toggle_letter_sort).grid(row=3, column=0, sticky=tk.W) ttk.Label(frame, text="字母范围:").grid(row=4, column=0, sticky=tk.W) ttk.Entry(frame, textvariable=self.letter_range_start, width=5).grid(row=4, column=1, sticky=tk.W) ttk.Label(frame, text="到").grid(row=4, column=2) ttk.Entry(frame, textvariable=self.letter_range_end, width=5).grid(row=4, column=3, sticky=tk.W) # 去重选项部分 frame = ttk.LabelFrame(tab, text="去重选项", padding=10) frame.pack(fill=tk.X, padx=5, pady=5) ttk.Checkbutton(frame, text="启用去重", variable=self.enable_dedupe, command=self.toggle_dedupe).grid(row=0, column=0, sticky=tk.W) ttk.Label(frame, text="去重表头:").grid(row=1, column=0, sticky=tk.W) self.dedupe_header_combobox = ttk.Combobox(frame, textvariable=self.dedupe_header, state="readonly") self.dedupe_header_combobox.grid(row=1, column=1, sticky=tk.EW) # 处理按钮 btn_frame = ttk.Frame(tab) btn_frame.pack(pady=10) ttk.Button(btn_frame, text="处理并保存到桌面", command=self.process_csv, style="Accent.TButton").pack() def create_delete_tab(self): """创建删除行标签页""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="删除行") frame = ttk.LabelFrame(tab, text="删除包含指定字符的行", padding=10) frame.pack(fill=tk.X, padx=5, pady=5) # 删除条件设置 ttk.Label(frame, text="搜索列:").grid(row=0, column=0, sticky=tk.W) self.delete_column_combobox = ttk.Combobox(frame, textvariable=self.delete_column, state="readonly") self.delete_column_combobox.grid(row=0, column=1, sticky=tk.EW) ttk.Label(frame, text="关键字:").grid(row=1, column=0, sticky=tk.W) ttk.Entry(frame, textvariable=self.delete_keyword).grid(row=1, column=1, sticky=tk.EW) ttk.Checkbutton(frame, text="区分大小写", variable=self.delete_case_sensitive).grid(row=2, column=0, sticky=tk.W) # 执行按钮 btn_frame = ttk.Frame(tab) btn_frame.pack(pady=10) ttk.Button(btn_frame, text="执行删除并保存到桌面", command=self.delete_rows_with_keyword, style="Accent.TButton").pack() def create_merge_tab(self): """创建文件合并标签页""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="文件合并") # 合并文件部分 frame = ttk.LabelFrame(tab, text="合并CSV文件", padding=10) frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # 文件列表容器 list_frame = ttk.Frame(frame) list_frame.pack(fill=tk.BOTH, expand=True) ttk.Label(list_frame, text="已选择文件:").grid(row=0, column=0, sticky=tk.W) # 文件列表和滚动条 self.merge_file_canvas = tk.Canvas(list_frame, height=150) self.merge_file_canvas.grid(row=1, column=0, sticky=tk.EW) scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.merge_file_canvas.yview) scrollbar.grid(row=1, column=1, sticky=tk.NS) self.merge_file_canvas.configure(yscrollcommand=scrollbar.set) self.merge_file_frame = ttk.Frame(self.merge_file_canvas) self.merge_file_canvas.create_window((0, 0), window=self.merge_file_frame, anchor="nw") # 按钮区域 btn_frame = ttk.Frame(frame) btn_frame.pack(fill=tk.X, pady=5) ttk.Button(btn_frame, text="添加文件", command=self.add_merge_file).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="清空列表", command=self.clear_merge_list).pack(side=tk.LEFT, padx=5) # 
合并选项 opt_frame = ttk.Frame(frame) opt_frame.pack(fill=tk.X, pady=5) ttk.Label(opt_frame, text="合并依据列(可选):").grid(row=0, column=0, sticky=tk.W) self.merge_column_combo = ttk.Combobox(opt_frame, textvariable=self.merge_column, state="readonly") self.merge_column_combo.grid(row=0, column=1, sticky=tk.EW) # 合并按钮 btn_frame = ttk.Frame(tab) btn_frame.pack(pady=10) ttk.Button(btn_frame, text="执行合并并保存到桌面", command=self.merge_csv_files, style="Accent.TButton").pack() def create_log_tab(self): """创建日志标签页""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="运行日志") self.log_text = scrolledtext.ScrolledText(tab, height=15, width=80) self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) def log_message(self, message, level="info"): """记录日志并显示在GUI中""" log_methods = { "info": self.logger.info, "error": self.logger.error, "warning": self.logger.warning } # 记录到日志文件 log_methods.get(level, self.logger.info)(message) # 显示在GUI日志标签页 timestamp = datetime.now().strftime("%H:%M:%S") tagged_msg = f"[{timestamp}] {message}" self.log_text.insert(tk.END, tagged_msg + "\n") self.log_text.see(tk.END) # 同时在文件信息标签页显示重要信息 if level in ["error", "warning"]: self.file_info.config(state=tk.NORMAL) self.file_info.insert(tk.END, tagged_msg + "\n") self.file_info.config(state=tk.DISABLED) self.file_info.see(tk.END) def select_file(self): """选择CSV文件""" file_path = filedialog.askopenfilename( title="选择CSV文件", filetypes=[("CSV文件", "*.csv"), ("文本文件", "*.txt"), ("所有文件", "*.*")] ) if file_path: self.file_path.set(file_path) self.log_message(f"已选择文件: {file_path}") self.load_csv(file_path) def load_csv(self, file_path): """加载CSV文件内容""" try: with open(file_path, 'r', encoding='utf-8-sig') as file: reader = csv.reader(file) self.csv_data = list(reader) if len(self.csv_data) > 0: self.headers = self.csv_data[0] self.update_ui_with_headers() # 显示文件信息 self.show_file_info(file_path) self.log_message(f"文件加载成功,共 {len(self.csv_data)} 行") else: raise ValueError("文件为空或格式不正确") except Exception as e: error_msg = f"读取CSV文件失败: {str(e)}" self.log_message(error_msg, "error") messagebox.showerror("错误", error_msg) def show_file_info(self, file_path): """显示文件信息""" self.file_info.config(state=tk.NORMAL) self.file_info.delete(1.0, tk.END) info = [ f"文件路径: {file_path}", f"总行数: {len(self.csv_data)}", f"列数: {len(self.headers)}", f"表头: {', '.join(self.headers)}", "="*40, "前5行数据预览:" ] self.file_info.insert(tk.END, "\n".join(info) + "\n") # 显示前5行数据 for i, row in enumerate(self.csv_data[:5], 1): self.file_info.insert(tk.END, f"{i}. 
{', '.join(row)}\n") self.file_info.config(state=tk.DISABLED) def update_ui_with_headers(self): """根据加载的CSV更新UI元素""" # 更新所有下拉框 for combo in [ self.sort_header_combobox, self.dedupe_header_combobox, self.delete_column_combobox, self.merge_column_combo ]: combo['values'] = self.headers # 设置默认值 if self.headers: self.sort_header.set(self.headers[0]) self.dedupe_header.set(self.headers[0]) self.delete_column.set(self.headers[0]) self.merge_column.set("") def toggle_sort(self): """切换排序功能的启用状态""" state = "normal" if self.enable_sort.get() else "disabled" self.sort_header_combobox['state'] = state self.sort_order_combobox['state'] = state self.toggle_letter_sort() self.log_message(f"排序功能 {'启用' if self.enable_sort.get() else '禁用'}") def toggle_dedupe(self): """切换去重功能的启用状态""" state = "normal" if self.enable_dedupe.get() else "disabled" self.dedupe_header_combobox['state'] = state self.log_message(f"去重功能 {'启用' if self.enable_dedupe.get() else '禁用'}") def toggle_letter_sort(self): """控制字母范围输入框的启用状态""" if not self.enable_sort.get(): return state = "normal" if self.enable_custom_letter_sort.get() else "disabled" self.letter_range_start_entry['state'] = state self.letter_range_end_entry['state'] = state self.log_message(f"字母范围过滤 {'启用' if self.enable_custom_letter_sort.get() else '禁用'}") def add_merge_file(self): """添加要合并的文件""" file_paths = filedialog.askopenfilenames( title="选择要合并的CSV文件", filetypes=[("CSV文件", "*.csv"), ("文本文件", "*.txt"), ("所有文件", "*.*")] ) if file_paths: for path in file_paths: if path not in self.merge_file_paths: self.merge_file_paths.append(path) self.update_merge_file_list() def clear_merge_list(self): """清空合并文件列表""" if self.merge_file_paths: self.merge_file_paths = [] self.update_merge_file_list() self.log_message("已清空合并文件列表") def update_merge_file_list(self): """更新合并文件列表显示""" # 清除现有内容 for widget in self.merge_file_frame.winfo_children(): widget.destroy() if not self.merge_file_paths: ttk.Label(self.merge_file_frame, text="尚未选择任何文件").pack() self.merge_file_canvas.configure(scrollregion=self.merge_file_canvas.bbox("all")) return # 添加文件列表 for i, path in enumerate(self.merge_file_paths): row_frame = ttk.Frame(self.merge_file_frame) row_frame.pack(fill=tk.X, pady=2) ttk.Label(row_frame, text=f"{i+1}. 
{os.path.basename(path)}", width=40, anchor="w").pack(side=tk.LEFT) ttk.Button(row_frame, text="移除", command=lambda p=path: self.remove_merge_file(p), style="Remove.TButton").pack(side=tk.LEFT, padx=2) # 更新滚动区域 self.merge_file_frame.update_idletasks() self.merge_file_canvas.configure(scrollregion=self.merge_file_canvas.bbox("all")) def remove_merge_file(self, file_path): """移除指定的合并文件""" if file_path in self.merge_file_paths: self.merge_file_paths.remove(file_path) self.update_merge_file_list() self.log_message(f"已移除文件: {file_path}") def delete_rows_with_keyword(self): """删除包含关键字的行并保存到桌面""" if not self.file_path.get(): messagebox.showwarning("警告", "请先选择CSV文件") return column = self.delete_column.get() keyword = self.delete_keyword.get() if not column: messagebox.showwarning("警告", "请选择要搜索的列") return if not keyword: messagebox.showwarning("警告", "请输入要搜索的关键字") return try: col_index = self.headers.index(column) case_sensitive = self.delete_case_sensitive.get() if not case_sensitive: keyword = keyword.lower() new_data = [self.headers] deleted_count = 0 for row in self.csv_data[1:]: if len(row) > col_index: value = row[col_index] compare_value = value if case_sensitive else value.lower() if keyword not in compare_value: new_data.append(row) else: deleted_count += 1 if deleted_count == 0: messagebox.showinfo("信息", "没有找到包含关键字的行") return # 生成保存路径 operation = f"deleted_{keyword}" save_path = self.generate_filename(self.file_path.get(), operation) # 保存文件 self.save_csv_file(new_data, save_path) # 更新当前数据 self.csv_data = new_data self.log_message(f"已删除 {deleted_count} 行包含关键字 '{keyword}' 的数据") messagebox.showinfo("成功", f"已删除 {deleted_count} 行,结果已保存到桌面:\n{os.path.basename(save_path)}") except Exception as e: error_msg = f"删除行时出错: {str(e)}" self.log_message(error_msg, "error") messagebox.showerror("错误", error_msg) def get_desktop_path(self): """获取桌面路径""" try: desktop = os.path.join(os.path.join(os.environ['USERPROFILE']), 'Desktop') if os.path.exists(desktop): return desktop except KeyError: pass # 如果上面的方法失败,尝试其他方法 desktop = os.path.join(os.path.expanduser('~'), 'Desktop') if os.path.exists(desktop): return desktop # 如果还是失败,返回当前目录 return os.getcwd() def generate_filename(self, original_name, operation): """生成新的文件名""" if not original_name: original_name = "processed" base = os.path.basename(original_name) name, ext = os.path.splitext(base) # 清理操作名称中的特殊字符 clean_op = "".join(c if c.isalnum() else "_" for c in operation) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") new_name = f"{name}_{clean_op}_{timestamp}{ext}" return os.path.join(self.get_desktop_path(), new_name) def save_csv_file(self, data, save_path): """保存CSV文件到指定路径""" try: with open(save_path, 'w', encoding='utf-8-sig', newline='') as file: writer = csv.writer(file) writer.writerows(data) # 更新文件信息显示 self.show_file_info(save_path) self.log_message(f"文件已保存到: {save_path}") return True except Exception as e: error_msg = f"保存文件时出错: {str(e)}" self.log_message(error_msg, "error") messagebox.showerror("错误", error_msg) return False def process_csv(self): """处理CSV文件(排序、去重等)并保存到桌面""" if not self.file_path.get(): messagebox.showwarning("警告", "请先选择CSV文件") return if not self.csv_data: messagebox.showwarning("警告", "CSV文件没有数据") return self.log_message("开始处理CSV文件...") new_data = [self.headers.copy()] data_rows = self.csv_data[1:] # 去重处理 if self.enable_dedupe.get(): try: dedupe_index = self.headers.index(self.dedupe_header.get()) seen = set() unique_rows = [] for row in data_rows: if len(row) > dedupe_index: key = row[dedupe_index] if key not in seen: 
seen.add(key) unique_rows.append(row) data_rows = unique_rows self.log_message(f"去重完成,根据表头 '{self.dedupe_header.get()}' 删除重复项") except Exception as e: self.log_message(f"去重时出错: {str(e)}", "error") messagebox.showerror("错误", f"去重时出错: {str(e)}") return # 排序处理 if self.enable_sort.get(): try: sort_index = self.headers.index(self.sort_header.get()) order = self.sort_order.get() reverse = (order == "降序") # 字母范围过滤 if self.enable_custom_letter_sort.get(): try: letter_start = self.letter_range_start.get().upper() letter_end = self.letter_range_end.get().upper() if not (len(letter_start) == 1 and len(letter_end) == 1 and letter_start.isalpha() and letter_end.isalpha()): raise ValueError("字母范围必须是单个字母(如A-Z)") filtered_rows = [] for row in data_rows: if len(row) > sort_index: value = str(row[sort_index]).strip().upper() if value and letter_start <= value[0] <= letter_end: filtered_rows.append(row) data_rows = filtered_rows self.log_message(f"字母范围过滤完成:{letter_start} 到 {letter_end}") except Exception as e: self.log_message(f"字母范围过滤失败: {str(e)}", "error") messagebox.showerror("错误", f"字母范围过滤失败: {str(e)}") return # 排序逻辑 def sort_key(row): if len(row) > sort_index: value = row[sort_index] # 尝试解析为日期 for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%m/%d/%Y", "%Y.%m.%d"): try: return datetime.strptime(value, fmt) except ValueError: continue # 尝试解析为数字 try: return float(value) except ValueError: pass return value.lower() # 默认按字符串排序 return "" try: if order == "自定义字母排序": data_rows.sort(key=lambda x: str(sort_key(x)).lower() if len(x) > sort_index else "", reverse=False) else: data_rows.sort(key=sort_key, reverse=reverse) self.log_message(f"排序完成,表头 '{self.sort_header.get()}',顺序: {order}") except Exception as e: self.log_message(f"排序时出错: {str(e)}", "error") messagebox.showerror("错误", f"排序时出错: {str(e)}") return except Exception as e: self.log_message(f"排序处理时出错: {str(e)}", "error") messagebox.showerror("错误", f"排序处理时出错: {str(e)}") return # 组合最终数据 new_data.extend(data_rows) # 生成操作描述 operations = [] if self.enable_sort.get(): operations.append(f"sorted_{self.sort_header.get()}_{self.sort_order.get()}") if self.enable_dedupe.get(): operations.append(f"deduped_{self.dedupe_header.get()}") operation = "_".join(operations) if operations else "processed" # 生成保存路径 save_path = self.generate_filename(self.file_path.get(), operation) # 保存文件 if self.save_csv_file(new_data, save_path): # 更新当前数据 self.csv_data = new_data messagebox.showinfo("成功", f"文件处理完成,已保存到桌面:\n{os.path.basename(save_path)}") def merge_csv_files(self): """合并多个CSV文件并保存到桌面""" if not self.merge_file_paths: messagebox.showwarning("警告", "请先添加要合并的文件") return try: # 检查所有文件是否存在 missing_files = [f for f in self.merge_file_paths if not os.path.exists(f)] if missing_files: raise FileNotFoundError(f"以下文件不存在: {', '.join(missing_files)}") merge_column = self.merge_column.get() common_headers = None all_data = [] # 收集所有文件的表头和数据 header_sets = [] for file_path in self.merge_file_paths: with open(file_path, 'r', encoding='utf-8-sig') as file: reader = csv.reader(file) data = list(reader) if data: header_sets.append(set(data[0])) all_data.append(data) # 找出共同表头 if header_sets: common_headers = set(header_sets[0]) for headers in header_sets[1:]: common_headers.intersection_update(headers) common_headers = sorted(common_headers) if not common_headers: raise ValueError("选中的文件没有共同的列,无法合并") # 如果没有指定合并依据列,使用所有共同列 merge_indices = None if merge_column: if merge_column not in common_headers: raise ValueError(f"合并依据列 '{merge_column}' 不在共同列中") merge_indices = [i for i, h in 
enumerate(common_headers) if h == merge_column] # 合并数据 merged_data = [common_headers.copy()] key_counter = defaultdict(int) for data in all_data: if not data: continue headers = data[0] header_map = {h: i for i, h in enumerate(headers)} for row in data[1:]: # 如果指定了合并列,检查是否已存在相同键 if merge_indices: merge_values = [row[header_map[h]] for h in common_headers if h == merge_column] if merge_values: key = tuple(merge_values) key_counter[key] += 1 if key_counter[key] > 1: continue # 跳过重复键的行 # 构建新行,只保留共同列 new_row = [] for col in common_headers: if col in header_map and len(row) > header_map[col]: new_row.append(row[header_map[col]]) else: new_row.append("") merged_data.append(new_row) # 生成操作描述 operation = "merged" if merge_column: operation += f"_by_{merge_column}" # 生成保存路径 first_file = os.path.basename(self.merge_file_paths[0]) save_path = self.generate_filename(first_file, operation) # 保存文件 if self.save_csv_file(merged_data, save_path): messagebox.showinfo("成功", f"文件合并完成,已保存到桌面:\n{os.path.basename(save_path)}") except Exception as e: error_msg = f"合并文件时出错: {str(e)}" self.log_message(error_msg, "error") messagebox.showerror("错误", error_msg) if __name__ == "__main__": root = tk.Tk() app = CSVProcessorApp(root) root.mainloop() 优化这个代码逻辑,新增以下功能,并输出完整代码 1.排序和去重功能中增加自定义指定某行为表头的功能 2.优化功能处理顺序,新增一个可以依次执行删除行,再排序,在去重的功能,最后在桌面上输出一份处理后的新的csv数据。 3.保留目前代码中的所有功能和现在的也可以单一执行某中功能的功能
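针对第 2 点"按 删除行 → 排序 → 去重 的顺序一次性处理",下面给出一个最小的示意方法(假设它作为 CSVProcessorApp 的成员方法加入,并绑定到一个新按钮;排序这里只按字符串比较做了简化,实际可以复用 process_csv 里的 sort_key):

```python
    def run_pipeline(self):
        """依次执行:删除含关键字的行 -> 排序 -> 去重,最后只在桌面输出一份结果"""
        if not self.csv_data:
            messagebox.showwarning("警告", "请先选择CSV文件")
            return
        data_rows = self.csv_data[1:]

        # 1. 删除行(填写了关键字和搜索列才执行)
        keyword = self.delete_keyword.get()
        if keyword and self.delete_column.get():
            col = self.headers.index(self.delete_column.get())
            cs = self.delete_case_sensitive.get()
            kw = keyword if cs else keyword.lower()
            data_rows = [
                r for r in data_rows
                if len(r) <= col or kw not in (r[col] if cs else r[col].lower())
            ]

        # 2. 排序(勾选"启用排序"才执行,这里简化为字符串比较)
        if self.enable_sort.get():
            idx = self.headers.index(self.sort_header.get())
            data_rows.sort(
                key=lambda r: str(r[idx]).lower() if len(r) > idx else "",
                reverse=(self.sort_order.get() == "降序"),
            )

        # 3. 去重(勾选"启用去重"才执行)
        if self.enable_dedupe.get():
            didx = self.headers.index(self.dedupe_header.get())
            seen, unique_rows = set(), []
            for r in data_rows:
                key = r[didx] if len(r) > didx else None
                if key not in seen:
                    seen.add(key)
                    unique_rows.append(r)
            data_rows = unique_rows

        save_path = self.generate_filename(self.file_path.get(), "pipeline")
        if self.save_csv_file([self.headers] + data_rows, save_path):
            messagebox.showinfo("成功", f"流水线处理完成,已保存到桌面:\n{os.path.basename(save_path)}")
```

针对第 1 点"自定义指定某行为表头",一个可行的思路是在文件操作页增加一个行号输入(例如用 tk.IntVar 保存),加载文件后执行 self.headers = self.csv_data[行号],把该行之前的内容从数据中排除,再调用 update_ui_with_headers() 刷新各下拉框;原有的单步处理入口不需要改动,仍然可以单独执行某一种功能。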
