活动介绍

apscheduler.schedulers

时间: 2025-06-07 12:29:22 浏览: 19
### APScheduler 调度器使用指南 #### 安装 APScheduler 为了开始使用 APScheduler,需先安装此库。可以通过 pip 工具轻松完成这一操作[^1]。 ```bash pip install apscheduler ``` #### 选择合适的调度器 根据不同的应用场景和开发环境,可以选择不同类型的调度器来满足需求: - **BlockingScheduler**: 当调度程序作为进程中的唯一活动部分时适用。 - **BackgroundScheduler**: 如果不需要依赖特定框架,并期望调度程序在应用内部后台运行的情况下选用。 - **AsyncIOScheduler**: 对于基于 `asyncio` 模块的应用来说是一个理想的选择。 - **GeventScheduler**: 针对采用 gevent 的项目设计。 - **TornadoScheduler**, **TwistedScheduler**, 和 **QtScheduler** 分别适用于各自的异步网络框架或图形界面平台上的任务调度[^3]. #### 创建与管理任务 通过定义触发条件(Trigger),可以指定何时执行某个函数。常见的触发方式包括但不限于定时间隔、具体日期时间点或是遵循 Cron 表达式的复杂模式[^2]. 下面展示了一个简单的例子,说明如何创建一个每秒钟打印消息的任务: ```python from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print('Hello, world.') scheduler = BackgroundScheduler() scheduler.add_job(job, 'interval', seconds=1) scheduler.start() try: while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): scheduler.shutdown() ``` #### 处理误失任务(Misfires) 当由于某些原因导致原本应该被执行的任务未能按时启动时,这类情况被称为“误失”。默认情况下,APScheduler会尝试补偿这些未及时处理的工作项;然而,在某些场景下这样的机制可能会引起不必要的重复调用。为此,提供了两种策略用于控制此类情形下的行为——调整宽限时间和启用合并功能[^4][^5]: - 设置合理的 `misfire_grace_time` 参数值可允许一定范围内的延迟容忍度; - 启动 merge 功能则能确保即使存在多个待处理实例也只会实际触发一次执行过程。
阅读全文

相关推荐

#定时任务 : (任意APP下的views.py下或者urls.py下) from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore, register_job from apps.intelligent_draw.models import Machines from datetime import datetime, timedelta from utils.const import AllConsts from django.core.mail import send_mail #将已经过时的工作都删除 __lt小于 # from django_apscheduler.models import DjangoJob # DjangoJob.objects.filter(next_run_time__lte=datetime.now()).delete() # 将数据库中已经过时的工作删除 # 开启定时配置 # 实例化调度器 scheduler = BackgroundScheduler() # 调度器使用默认的DjangoJobStore() scheduler.add_jobstore(DjangoJobStore(), 'default') # trigger: 任务执行的方式,共有三种:'date':一次性任务、'interval':循环任务、'cron':定时任务。 # @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_test') #注册一个定时任务 (replace_existing:重新启动时替换现有作业;防止id冲突报错) # 新增一个定时任务,每隔5分钟检测一次,如果发现机器的last_updated比现在的时间晚了半个小时,则更新机器状态为异常,并且发送一封邮件进行告警。 @register_job(scheduler, 'cron', minute='*/5', id='job', replace_existing=True) # 这里写你要执行的任务 def job_task(): print("定时任务开始:") half_hour_ago = datetime.now() - timedelta(minutes=30) machines = Machines.objects.filter(last_updated__lte=half_hour_ago) machine_list = [] for machine in machines: machine_list.append(machine.id) machines.update(status=AllConsts.MACHINE_STATUS_ILLEGAL) send_mail('邮件标题', '邮件内容', '[email protected]', ['[email protected]', '[email protected]']) # 定时任务开始 scheduler.start()这段代码有没有什么问题?

from django.apps import AppConfig class HrunnerConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "hrunner" def ready(self): # 仅在应用加载完成后初始化调度器 if not hasattr(self, 'scheduler_started'): from .task import scheduler # 导入当前应用的调度器 # 启动调度器 scheduler.start() self.scheduler_started = True 另一个文件如下# tasks.py from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore # 初始化调度器 scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore(), "default") # --- 定义定时任务函数 --- def cleanup_temp_data(): print("清理临时数据...") def send_daily_report(): print("发送日报...") # --- 将任务添加到调度器 --- scheduler.add_job( cleanup_temp_data, "interval", # 间隔性任务 seconds=10, # 每隔 60 秒执行一次 # days=1, # 每天执行一次 id="cleanup_job", # replace_existing=True, # 允许覆盖同名任务 (即数据库已有id为 cleanup_job,不添加这个就会报错) ) scheduler.add_job( send_daily_report, "cron", # 定时任务(类似 crontab) hour=10, # 每天 9 点执行 minute=26, # 加了这行就表示,每天9点 10 分钟 id="report_job", # replace_existing=True, # 允许覆盖同名任务 ) # 注意:这里暂时不启动调度器!在 apps.py 中启动。 # 启动调度器 (这里也启动不了呢) # scheduler.start() 此时数据库里,没有任务,你看看为什么我启动后,会提示我'Job identifier (cleanup_job) conflicts with an existing job'

import os import sys import json import logging import time from apscheduler.schedulers.background import BackgroundScheduler from multiprocessing import cpu_count, Pool BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "teachingReform.settings") import django django.setup() from data_platform_integration.tasks import fetch_and_save_data, init_global_token, load_config logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler("logs/sync.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # def start_sync_process(table_info): # api_url = table_info.get('api_url') # model_class_name = table_info.get('model_class') # if not api_url or not model_class_name: # logger.warning(f"[-] 配置不完整,跳过表: {table_info}") # return # logger.info(f"[+] 开始处理表: {model_class_name}, API URL: {api_url}") # fetch_and_save_data(api_url, model_class_name, table_info) # logger.info(f"[+] 表 {model_class_name} 处理完成。") def start_sync_process(table_info): # 子进程中重新 setup Django 环境 import os import django os.environ.setdefault("DJANGO_SETTINGS_MODULE", "teachingReform.settings") django.setup() api_url = table_info.get('api_url') model_class_name = table_info.get('model_class') if not api_url or not model_class_name: logger.warning(f"[-] 配置不完整,跳过表: {table_info}") return logger.info(f"[+] 开始处理表: {model_class_name}, API URL: {api_url}") fetch_and_save_data(api_url, model_class_name, table_info) logger.info(f"[+] 表 {model_class_name} 处理完成。") def run_once_immediately(): config = load_config() logger.info("[+] 开始立即执行一次任务...") # 初始化全局 token cached_token = '' cached_expires_at = 0.0 init_global_token(cached_token, cached_expires_at) # 获取所有 enabled 表 tables = config.get('tables', []) filtered_tables = [t for t in tables if t.get('enabled', True)] # 按 last_page 降序排序,页数越大的排前面 sorted_tables = sorted( filtered_tables, key=lambda x: x.get("last_page", 1), reverse=True ) MAX_CONCURRENT_WORKERS = 3 # 固定最大并发数 max_workers = min(MAX_CONCURRENT_WORKERS, cpu_count(), len(sorted_tables)) logger.info(f"[+] 使用 {max_workers} 个工作进程来处理 {len(sorted_tables)} 个表。") with Pool(processes=max_workers) as pool: pool.map(start_sync_process, sorted_tables) # 使用排好序的列表 logger.info("[+] 立即执行任务已完成。") def main(): # 启动定时任务 config = load_config() scheduler = BackgroundScheduler() # 初始化全局 token cached_token = '' cached_expires_at = 0.0 init_global_token(cached_token, cached_expires_at) # 获取所有 enabled 表 tables = config.get('tables', []) filtered_tables = [t for t in tables if t.get('enabled', True)] # 按 last_page 降序排序,页数越大的排前面 sorted_tables = sorted( filtered_tables, key=lambda x: x.get("last_page", 1), reverse=True ) for table_info in sorted_tables: # ✅ 使用排序后的表列表 api_url = table_info.get('api_url') model_class = table_info.get('model_class') if not api_url or not model_class: logger.warning(f"[-] 配置不完整,跳过定时任务: {table_info}") continue job_id = f"sync_{model_class}" if scheduler.get_job(job_id): logger.warning(f"[!] 已存在任务 {job_id},跳过添加。") else: scheduler.add_job( fetch_and_save_data, 'interval', hours=48, args=[api_url, model_class, table_info], id=job_id, misfire_grace_time=30, coalesce=True, max_instances=1 ) logger.info(f"[+] 定时任务 {job_id} 已添加。") scheduler.start() logger.info("[+] 定时任务已启动,按 Ctrl+C 停止程序...") # 确保立即执行任务被调用 run_once_immediately() try: while True: time.sleep(1) except KeyboardInterrupt: scheduler.shutdown() logger.info("[-] 程序已终止。") if __name__ == "__main__": main()

# -*- coding: UTF-8 -*- import logging from datetime import datetime import cx_Oracle import numpy as np import requests from pymilvus import connections, Collection, utility, CollectionSchema, FieldSchema, DataType, MilvusException from apscheduler.schedulers.background import BackgroundScheduler import time import re import sys import os from pathlib import Path import json # 获取当前脚本的父目录(即项目根目录) current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(str(current_dir)) # 将项目根目录添加到 sys.path from config.config1 import LOGGING_CONFIG, ORACLE_CONFIG, MODEL_CONFIG, MILVUS_CONFIG # 初始化日志 log_file_path = LOGGING_CONFIG["log_file"] log_file_path = Path(log_file_path) log_file_path.parent.mkdir(exist_ok=True) logging.basicConfig( level=LOGGING_CONFIG["level"], format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(log_file_path), logging.StreamHandler() ] ) logger = logging.getLogger("MaterialSync") class OracleClient: """Oracle数据库客户端""" def __init__(self): self.conn = None self.connect() def connect(self): try: self.conn = cx_Oracle.connect('ecology/[email protected]/oadb', encoding='UTF-8', nencoding='UTF-8') # 指定编码 logger.info("Connected to Oracle database") except Exception as e: logger.error(f"Oracle connection failed: {str(e)}") raise def fetch_all_data(self): """从Oracle数据库中获取所有数据""" try: cursor = self.conn.cursor() query = """ SELECT TO_CHAR(matnr) AS matnr, TO_CHAR(matkl) AS matkl, TO_CHAR(maktx) AS maktx, TO_CHAR(classfication) AS classfication FROM Material_MASTER@ERPLINK WHERE mandt = '688' AND ( matnr like 'DJ%' OR matnr like 'DY%' ) ORDER BY matnr """ cursor.execute(query) columns = [col[0].lower() for col in cursor.description] return [dict(zip(columns, row)) for row in cursor] except Exception as e: logger.error(f"Oracle query failed: {str(e)}") return [] finally: cursor.close() class VectorServiceClient: """HTTP调用模型服务进行向量编码""" def __init__(self): self.service_url = MODEL_CONFIG["model_service_url"] self.timeout = 120 # 请求超时时间(秒) logger.info(f"Using vector service: {self.service_url}") def batch_encode_dense(self, texts): """批量生成密集向量""" return self._call_vector_service(texts, "dense") def batch_encode_sparse(self, texts): """批量生成稀疏向量""" return self._call_vector_service(texts, "sparse") def _call_vector_service(self, texts, vector_type): """调用向量服务通用方法""" try: if not texts: return [] # 准备请求数据 payload = { "texts": texts, "type": vector_type # 添加向量类型参数 } # 配置请求头 headers = { "Content-Type": "application/json; charset=utf-8", "Accept": "application/json" } # 详细记录请求格式 logger.debug(f"Request payload details:") logger.debug(f" Vector type: {vector_type}") logger.debug(f" Text count: {len(texts)}") # 记录前3条文本的详细信息 for i, text in enumerate(texts[:3]): logger.debug(f" Text #{i + 1} (length={len(text)}): {text[:100]}{'...' if len(text) > 100 else ''}") # 记录整个请求体(限制长度) payload_json = json.dumps(payload, ensure_ascii=False) if len(payload_json) > 1000: logger.debug(f" Full request body (truncated): {payload_json[:1000]}...") else: logger.debug(f" Full request body: {payload_json}") # 发送请求到模型服务 response = requests.post( self.service_url, json=payload, headers=headers , timeout=self.timeout ) # 检查响应状态 response.raise_for_status() # 解析响应数据 result = response.json() if "error" in result: logger.error(f"Vector service error ({vector_type}): {result['error']}") raise ValueError(result["error"]) if "vectors" not in result: logger.error(f"Invalid response from {vector_type} service: vectors not found") logger.error(f"Response: {json.dumps(result, ensure_ascii=False)[:500]}") raise ValueError(f"Invalid response from {vector_type} service") logger.info(f"Successfully encoded {len(texts)} texts for {vector_type} vectors") # 对于密集向量,转换为numpy数组 if vector_type == "dense": vectors = np.array(result["vectors"]) # 验证向量维度 expected_dim = MILVUS_CONFIG["vector_dim"] if vectors.shape[1] != expected_dim: logger.error(f"Vector dimension mismatch: expected {expected_dim}, got {vectors.shape[1]}") raise ValueError("Vector dimension mismatch") return vectors else: # 稀疏向量直接返回字典列表 return result["vectors"] except requests.exceptions.RequestException as e: logger.error(f"Request to {vector_type} service failed: {str(e)}") raise except Exception as e: logger.error(f"Encoding via {vector_type} service failed: {str(e)}") raise class MilvusHandler: """Milvus数据库处理器""" def __init__(self): self.collection = None self.vector_service = VectorServiceClient() self.connect() self.prepare_collection() def connect(self): try: connections.connect( host=MILVUS_CONFIG["host"], port=MILVUS_CONFIG["port"] ) logger.info(f"Connected to Milvus: {MILVUS_CONFIG['host']}") except Exception as e: logger.error(f"Milvus connection failed: {str(e)}") raise def prepare_collection(self): """准备集合(自动创建)""" collection_name = MILVUS_CONFIG["collection_name"] if not utility.has_collection(collection_name): fields = [ FieldSchema(name="matnr", dtype=DataType.VARCHAR, is_primary=True, max_length=100), FieldSchema(name="matkl", dtype=DataType.VARCHAR, max_length=50), FieldSchema(name="maktx", dtype=DataType.VARCHAR, max_length=1024), FieldSchema(name="classfication", dtype=DataType.VARCHAR, max_length=1024), FieldSchema(name="maktx_vector", dtype=DataType.FLOAT_VECTOR, dim=MILVUS_CONFIG["vector_dim"]), FieldSchema(name="classfication_vector", dtype=DataType.SPARSE_FLOAT_VECTOR) ] schema = CollectionSchema(fields, "Material vector storage") self.collection = Collection(collection_name, schema) # 创建稀疏向量索引 self.collection.create_index( "classfication_vector", {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"} ) # 创建密集向量索引 self.collection.create_index( "maktx_vector", {"index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 1024}} ) logger.info(f"Created collection with both vector types: {collection_name}") else: self.collection = Collection(collection_name) logger.info(f"Loaded collection schema: {collection_name}") # 确保集合已加载 self.ensure_collection_loaded() def ensure_collection_loaded(self): """确保集合已加载到内存""" try: collection_name = self.collection.name load_state = utility.load_state(collection_name) # 检查集合是否已加载 if load_state != "Loaded": logger.info(f"Collection state is {load_state}, loading now...") self.collection.load() logger.info("Collection loaded successfully") else: logger.info(f"Collection is already loaded (state: {load_state})") except MilvusException as e: logger.error(f"Failed to load collection: {str(e)}") raise except Exception as e: logger.error(f"Error checking collection state: {str(e)}") # 如果无法检查状态,尝试直接加载 try: self.collection.load() logger.info("Collection loaded successfully (using fallback)") except Exception as e2: logger.error(f"Fallback loading failed: {str(e2)}") raise def batch_upsert(self, data, batch_size=500): """分批次插入或更新数据""" total_records = len(data) processed_count = 0 for i in range(0, total_records, batch_size): batch_data = data[i:i + batch_size] # 确保集合已加载 self.ensure_collection_loaded() # 数据清洗 valid_batch_data = [] for item in batch_data: try: cleaned_item = { "matnr": self.clean_utf8(item["matnr"], 'matnr', item['matnr']), "matkl": self.clean_utf8(item["matkl"], 'matkl', item['matnr']), "maktx": self.clean_utf8(item["maktx"], 'maktx', item['matnr']), "classfication": self.clean_utf8(item.get("classfication", ""), 'classfication', item['matnr']) } # 验证UTF-8 if all(self.validate_utf8_string(v) for k, v in cleaned_item.items()): valid_batch_data.append(cleaned_item) else: logger.warning(f"Invalid UTF-8 data skipped: {cleaned_item}") except Exception as e: logger.error(f"Error cleaning item: {str(e)}") if not valid_batch_data: logger.info(f"No valid data in batch {i // batch_size + 1}") continue logger.info(f"Processing batch {i // batch_size + 1} with {len(valid_batch_data)} items") # 查询当前批次中已存在的物料编码 matnr_list = [item['matnr'] for item in valid_batch_data] existing_data = [] try: # 构建安全的查询表达式 safe_matnrs = [f"'{matnr}'" for matnr in matnr_list] expr = f"matnr in [{','.join(safe_matnrs)}]" logger.debug(f"Querying Milvus with expression: {expr}") existing_data = self.collection.query( expr=expr, output_fields=["matnr", "maktx", "classfication", "maktx_vector", "classfication_vector"] ) logger.debug(f"Found {len(existing_data)} existing records") except MilvusException as e: logger.error(f"Milvus query failed: {str(e)}") # 回退方案:逐个查询 logger.warning("Falling back to individual queries") for matnr in matnr_list: try: expr = f"matnr == '{matnr}'" item_data = self.collection.query(expr, output_fields=["matnr", "maktx", "classfication", "maktx_vector", "classfication_vector"]) if item_data: existing_data.extend(item_data) except Exception as e: logger.error(f"Failed to query matnr {matnr}: {str(e)}") existing_dict = {item["matnr"]: item for item in existing_data} # 准备需要重新生成向量的数据 maktx_to_encode = [] # 需要生成密集向量的物料描述 class_to_encode = [] # 需要生成稀疏向量的特征值 maktx_indices = [] # 需要更新密集向量的索引 class_indices = [] # 需要更新稀疏向量的索引 # 准备upsert数据 upsert_data = [] for idx, item in enumerate(valid_batch_data): matnr = item["matnr"] existing = existing_dict.get(matnr, {}) # 检查物料描述是否变化 if matnr in existing_dict: if item["maktx"] == existing.get("maktx", ""): # 物料描述相同,复用现有向量 item["maktx_vector"] = existing.get("maktx_vector") else: # 物料描述变化,需要重新生成 maktx_to_encode.append(item["maktx"]) maktx_indices.append(idx) else: # 新记录,需要生成向量 maktx_to_encode.append(item["maktx"]) maktx_indices.append(idx) # 处理特征值向量 class_value = item["classfication"] # 特征值为空的情况 if not class_value or class_value.isspace(): item["classfication_vector"] = None else: # 特征值不为空 if matnr in existing_dict: if class_value == existing.get("classfication", ""): # 特征值相同,复用现有向量 item["classfication_vector"] = existing.get("classfication_vector") else: # 特征值变化,需要重新生成 class_to_encode.append(class_value) class_indices.append(idx) else: # 新记录,需要生成向量 class_to_encode.append(class_value) class_indices.append(idx) upsert_data.append(item) # 批量生成物料描述向量(密集) if maktx_to_encode: try: logger.info(f"Encoding {len(maktx_to_encode)} dense vectors for maktx...") dense_vectors = self.vector_service.batch_encode_dense(maktx_to_encode) # 将向量分配给对应的记录 for vec_idx, data_idx in enumerate(maktx_indices): upsert_data[data_idx]["maktx_vector"] = dense_vectors[vec_idx] except Exception as e: logger.error(f"Failed to encode dense vectors: {str(e)}") # 跳过这个批次 continue # 批量生成特征值向量(稀疏) if class_to_encode: try: logger.info(f"Encoding {len(class_to_encode)} sparse vectors for classfication...") sparse_vectors = self.vector_service.batch_encode_sparse(class_to_encode) # 将向量分配给对应的记录 for vec_idx, data_idx in enumerate(class_indices): # 确保索引在范围内 if vec_idx < len(sparse_vectors): upsert_data[data_idx]["classfication_vector"] = sparse_vectors[vec_idx] except Exception as e: logger.error(f"Failed to encode sparse vectors: {str(e)}") # 跳过这个批次 continue # 准备Milvus实体数据 entities = [ [item["matnr"] for item in upsert_data], [item["matkl"] for item in upsert_data], [item["maktx"] for item in upsert_data], [item["classfication"] for item in upsert_data], [item.get("maktx_vector", []).tolist() if hasattr(item.get("maktx_vector", None), 'tolist') else [] for item in upsert_data], [self.format_sparse_vector(item.get("classfication_vector")) for item in upsert_data] ] # 执行upsert操作 if upsert_data: try: logger.info(f"Upserting {len(upsert_data)} records to Milvus...") self.collection.upsert(entities) self.collection.flush() # 统计空特征值数量 empty_class_count = sum(1 for item in upsert_data if not item["classfication"] or item["classfication"].isspace()) logger.info(f"Upserted batch {i // batch_size + 1}: " f"{len(upsert_data)} records ({empty_class_count} empty classfication)") processed_count += len(upsert_data) except MilvusException as e: logger.error(f"Milvus upsert failed: {str(e)}") # 记录前3条失败数据 for j in range(min(3, len(upsert_data))): sample = upsert_data[j] logger.error(f"Failed sample {j + 1}: matnr={sample['matnr']}, " f"maktx_len={len(sample['maktx'])}, " f"class_len={len(sample['classfication']) if sample['classfication'] else 0}") return processed_count def format_sparse_vector(self, vec): """格式化稀疏向量为Milvus兼容格式""" if vec is None: return {} # FlagEmbedding 返回的是 {token: weight} 格式 if isinstance(vec, dict): # 转换为 {index: weight} 格式 # 这里我们不需要实际索引,只需确保键是整数 # 使用枚举创建新索引,因为原始token字符串Milvus无法处理 formatted = {} for idx, (token, weight) in enumerate(vec.items()): # 确保权重非负 if float(weight) > 0: formatted[int(idx)] = float(weight) return formatted # 如果传入的是列表或其他格式,转换为字典 try: if isinstance(vec, (list, tuple, np.ndarray)): # 转换为稀疏字典格式,只保留正值 return {i: float(val) for i, val in enumerate(vec) if float(val) > 0} return {} except Exception as e: logger.error(f"Failed to format sparse vector: {str(e)}") return {} @staticmethod def validate_utf8_string(s): try: s.encode('utf-8').decode('utf-8') return True except (UnicodeEncodeError, UnicodeDecodeError): return False @staticmethod def clean_utf8(value, field_name, item_id): """强化 UTF-8 清洗逻辑""" if value is None: return '' try: value_str = str(value) cleaned = re.sub(r'\\u[0-9a-fA-F]{4}', '', value_str) cleaned = cleaned.replace('\xa0', ' ') cleaned = cleaned.encode('utf-8', errors='replace').decode('utf-8') return cleaned except Exception as e: logger.warning(f"Failed to clean UTF-8 for [{field_name}] ({item_id}): {str(e)}") return '' class SyncScheduler: """同步调度器""" def __init__(self): self.oracle = OracleClient() self.milvus = MilvusHandler() def execute_sync(self): """执行同步任务""" logger.info("Starting sync job...") start_time = time.time() try: # 从Oracle获取所有数据 logger.info("Fetching data from Oracle...") all_data = self.oracle.fetch_all_data() if not all_data: logger.info("No data found in Oracle") return logger.info(f"Retrieved {len(all_data)} records from Oracle") # 数据校验和清理 cleaned_data = [] invalid_count = 0 empty_class_count = 0 for item in all_data: try: # 处理可能的键名变化 class_value = item.get('classfication', item.get('classfication', '')) # 数据清洗 cleaned_item = { "matnr": self.clean_utf8(item['matnr'], 'matnr', item['matnr']), "matkl": self.clean_utf8(item['matkl'], 'matkl', item['matnr']), "maktx": self.clean_utf8(item['maktx'], 'maktx', item['matnr']), "classfication": self.clean_utf8(class_value, 'classfication', item['matnr']) } # 统计空特征值 if not cleaned_item["classfication"] or cleaned_item["classfication"].isspace(): empty_class_count += 1 # 验证UTF-8 if all(self.is_valid_utf8(v) for v in cleaned_item.values()): cleaned_data.append(cleaned_item) else: invalid_count += 1 logger.warning(f"Invalid UTF-8 data skipped: matnr={item['matnr']}") except Exception as e: invalid_count += 1 logger.error(f"Error processing item: {item}, error: {str(e)}") if invalid_count > 0: logger.warning(f"Skipped {invalid_count} invalid records") if cleaned_data: processed_count = self.milvus.batch_upsert(cleaned_data) logger.info(f"Successfully processed {processed_count}/{len(cleaned_data)} records") else: logger.warning("No valid data to sync") duration = time.time() - start_time logger.info(f"Sync job completed in {duration:.2f} seconds") except Exception as e: logger.error(f"Sync failed: {str(e)}") duration = time.time() - start_time logger.error(f"Sync job failed after {duration:.2f} seconds") # 尝试重新连接Milvus try: logger.info("Attempting to reconnect to Milvus...") self.milvus = MilvusHandler() logger.info("Milvus reconnected successfully") except Exception as reconnect_error: logger.error(f"Reconnection failed: {str(reconnect_error)}") @staticmethod def clean_utf8(value, field_name, item_id): """强化 UTF-8 清洗逻辑""" if value is None: return '' try: value_str = str(value) cleaned = re.sub(r'\\u[0-9a-fA-F]{4}', '', value_str) cleaned = cleaned.replace('\xa0', ' ') cleaned = cleaned.encode('utf-8', errors='replace').decode('utf-8') return cleaned except Exception as e: logger.warning(f"Failed to clean UTF-8 for [{field_name}] ({item_id}): {str(e)}") return '' @staticmethod def is_valid_utf8(s): try: s.encode('utf-8').decode('utf-8') return True except UnicodeError: return False if __name__ == "__main__": scheduler = BackgroundScheduler() sync = SyncScheduler() # 立即执行一次同步 logger.info("Executing initial sync...") sync.execute_sync() # 每天凌晨2点执行 scheduler.add_job(sync.execute_sync, "cron", hour=10, minute=58) try: logger.info("Scheduler started with HTTP vector services") scheduler.start() # 保持主程序运行 while True: time.sleep(60) except (KeyboardInterrupt, SystemExit): logger.info("Scheduler stopped") scheduler.shutdown() except Exception as e: logger.error(f"Unexpected error: {str(e)}") import traceback logger.error(traceback.format_exc()) 这个代码报错2025-07-05 10:58:16,307 - ERROR - Request to dense service failed: 400 Client Error: Bad Request for url: http://10.162.244.27:8088/encode 2025-07-05 10:58:16,307 - ERROR - Failed to encode dense vectors: 400 Client Error: Bad Request for url: http://10.162.244.27:8088/encode

import re import time import json import threading from selenium import webdriver from datetime import datetime, timedelta from selenium.webdriver.edge.service import Service as EdgeService from selenium.webdriver.edge.options import Options as EdgeOptions from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By from apscheduler.schedulers.background import BackgroundScheduler from pytz import timezone import FreeSimpleGUI as sg BEIJING_TZ = timezone('Asia/Shanghai') CONFIG_FILE = 'browser_config.json' ELEMENT_TYPES = { "按钮": "button", "复选框": "checkbox", "单选按钮": "radio", "下拉框": "dropdown", "标签页": "tab", "时间": "time", "文本": "text" } WEEKDAYS = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] SCHEDULER = None def load_config(): default_config = { "order": ["选项1", "选项2", "选项3", "选项4", "选项5", "选项6", "选项7"], "elements": { "target_url": "http://www.igs.gnsswhu.cn/index.php", "选项1": {"type": "标签页", "identifier": "OBS观测值"}, "选项2": {"type": "下拉框", "identifier": "IGS测站列表"}, "选项3": {"type": "复选框", "identifier": "ABMF00GLP"}, "选项4": {"type": "单选按钮", "identifier": "d文件"}, "选项5": {"type": "时间", "identifier": "开始时间"}, "选项6": {"type": "时间", "identifier": "结束时间"}, "选项7": {"type": "按钮", "identifier": "检索"}, }, "start_date": "", "end_date": "", "schedule": { "start_preset": "--", "end_preset": "--", "schedule_type": "--", "schedule_time": "00:00:00" }, "stay_duration": 60 } try: with open(CONFIG_FILE, 'r') as f: user_config = json.load(f) merged_config = default_config.copy() merged_config.update(user_config) # 用户配置覆盖默认 merged_config['order'] = user_config.get('order', default_config['order']) # 确保顺序正确 # 处理日期格式 if 'start_date' in merged_config and ' ' not in merged_config.get('start_date', ''): merged_config['start_date'] += " 00:00:00" if 'end_date' in merged_config and ' ' not in merged_config.get('end_date', ''): merged_config['end_date'] += " 00:00:00" return merged_config except: return default_config def save_config(values, current_order): try: elements = {} for label in current_order: elements[label] = { "type": values[f'-{label}_TYPE-'], "identifier": values[f'-{label}_ID-'] } if values[f'-{label}_TYPE-'] == "文本": elements[label]["text"] = values[f'-{label}_TEXT-'] def get_full_datetime(date_str, h, m, s): if not date_str: return "" # 格式化为两位数 h = f"{int(h):02d}" if h else "00" m = f"{int(m):02d}" if m else "00" s = f"{int(s):02d}" if s else "00" return f"{date_str} {h}:{m}:{s}" start_time = get_full_datetime(values['-START_DATE-'], values['-START_HOUR-'], values['-START_MINUTE-'], values['-START_SECOND-']) end_time = get_full_datetime(values['-END_DATE-'], values['-END_HOUR-'], values['-END_MINUTE-'], values['-END_SECOND-']) config = { "order": current_order, "elements": { "target_url": values['-URL-'], **elements }, "start_identifier": load_config().get('start_identifier', '开始时间'), "end_identifier": load_config().get('end_identifier', '结束时间'), "start_date": start_time, "end_date": end_time, "schedule": load_config().get('schedule', {}), "stay_duration": load_config().get('stay_duration', 60) } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, ensure_ascii=False) return True except Exception as e: print(f"保存配置失败: {str(e)}") return False def create_element_row(label, element_type='', identifier='', text_value=''): return [ sg.Text(label, size=(12, 1)), sg.Combo( list(ELEMENT_TYPES.keys()), default_value=element_type, key=f'-{label}_TYPE-', size=(15, 1), enable_events=True ), sg.Input( identifier, key=f'-{label}_ID-', size=(20, 1) ), sg.Input( text_value, key=f'-{label}_TEXT-', size=(20, 1), visible=(element_type == "文本") ), sg.Button('↑', key=f'-{label}_UP-'), sg.Button('↓', key=f'-{label}_DOWN-'), sg.Button('-', key=f'-{label}_DEL-') ] def create_elements_column(current_order): config = load_config() elements = [] for label in current_order: element_config = config['elements'].get(label, {}) elements.append(create_element_row( label, element_config.get('type', ''), element_config.get('identifier', ''), element_config.get('text', '') )) return sg.Column( elements, scrollable=True, vertical_scroll_only=True, size=(800, 300), key='-ELEMENTS_COL-' ) def show_settings_window(parent_window): """显示停留时间设置窗口""" config = load_config() start_id = config.get('start_identifier', '开始时间') end_id = config.get('end_identifier', '结束时间') layout = [ [sg.Text('网页停留时间(秒):'), sg.Input(config.get('stay_duration', 60), key='-STAY_DURATION-')], [sg.HorizontalSeparator()], [sg.Text('开始时间字段标识:'), sg.Input(start_id, key='-START_ID-', size=15)], [sg.Text('结束时间字段标识:'), sg.Input(end_id, key='-END_ID-', size=15)], [sg.Button('保存'), sg.Button('取消')] ] window = sg.Window('设置', layout, modal=True) while True: event, values = window.read() if event in (None, '取消'): break if event == '保存': try: stay_duration = int(values['-STAY_DURATION-']) if stay_duration <= 0: raise ValueError("停留时间必须大于0") config['stay_duration'] = stay_duration config['start_identifier'] = values['-START_ID-'].strip() config['end_identifier'] = values['-END_ID-'].strip() with open(CONFIG_FILE, 'w') as f: json.dump(config, f, ensure_ascii=False) sg.popup('设置已保存!') break except Exception as e: sg.popup_error(f'输入无效: {str(e)}') window.close() def create_gui(): config = load_config() current_order = config["order"].copy() elements_config = config["elements"] schedule_config = config.get('schedule', {}) # 时间处理函数 def split_datetime(dt_str): if not dt_str: return "", "00", "00", "00" if ' ' in dt_str: date_part, time_part = dt_str.split(' ', 1) h, m, s = time_part.split(':')[:3] else: date_part, h, m, s = dt_str, '00', '00', '00' return date_part, h, m, s # 初始化日期时间 start_date, start_h, start_m, start_s = split_datetime(config.get('start_date', '')) end_date, end_h, end_m, end_s = split_datetime(config.get('end_date', '')) # 主布局 layout = [ [sg.Text('目标网址'), sg.Input(config['elements']['target_url'], key='-URL-')], [sg.HorizontalSeparator()], [create_elements_column(current_order)], [sg.Button('+ 添加选项', key='-ADD_OPTION-')], [sg.HorizontalSeparator()], # 开始时间选择 [sg.Frame('开始时间', [ [sg.CalendarButton('选择日期', target='-START_DATE-', format='%Y-%m-%d'), sg.Input(start_date, key='-START_DATE-', size=(12, 1)), sg.Spin([f"{i:02}" for i in range(24)], start_h, key='-START_HOUR-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], start_m, key='-START_MINUTE-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], start_s, key='-START_SECOND-', size=3)] ])], # 结束时间选择 [sg.Frame('结束时间', [ [sg.CalendarButton('选择日期', target='-END_DATE-', format='%Y-%m-%d'), sg.Input(end_date, key='-END_DATE-', size=(12, 1)), sg.Spin([f"{i:02}" for i in range(24)], end_h, key='-END_HOUR-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], end_m, key='-END_MINUTE-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], end_s, key='-END_SECOND-', size=3)] ])], # 控制按钮 [sg.HorizontalSeparator()], [sg.Button('运行', size=10), sg.Button('定时', key='-SCHEDULE-', size=10), sg.Button('保存配置', size=10), sg.Button('设置', size=10), sg.Button('退出', size=10)], [sg.Output(size=(90, 10), echo_stdout_stderr=True, key='-OUTPUT-')] ] window = sg.Window('浏览器自动化工具', layout, finalize=True) setup_scheduler(window) # 动态更新元素可见性 def update_element_visibility(): for label in current_order: element_type = values.get(f'-{label}_TYPE-', '') window[f'-{label}_TEXT-'].update(visible=(element_type == "文本")) # 主事件循环 while True: event, values = window.read() if event in (None, '退出'): break # 元素类型变化事件 if '_TYPE-' in event: update_element_visibility() # 添加新选项 if event == '-ADD_OPTION-': new_label = f'选项{len(current_order) + 1}' current_order.append(new_label) window.extend_layout(window['-ELEMENTS_COL-'], [create_element_row(new_label)]) window['-ELEMENTS_COL-'].contents_changed() # 删除选项 if '_DEL-' in event: label = event.split('_')[1] if label in current_order: current_order.remove(label) # 删除对应的UI元素 for element in window['-ELEMENTS_COL-'].Widget.winfo_children(): if f'_{label}_' in str(element): element.destroy() # 重新编号剩余选项 new_order = [f"选项{i + 1}" for i in range(len(current_order))] config_changes = {} for old, new in zip(current_order, new_order): config_changes[old] = new # 更新配置和当前顺序 current_order = new_order.copy() window['-ELEMENTS_COL-'].update(visible=False) window['-ELEMENTS_COL-'].update(visible=True) # 移动选项位置 if '_UP-' in event or '_DOWN-' in event: direction = -1 if '_UP-' in event else 1 label = event.split('_')[1] index = current_order.index(label) new_index = index + direction if 0 <= new_index < len(current_order): # 交换顺序 current_order.insert(new_index, current_order.pop(index)) # 重新排列UI元素 elements = [create_element_row(lbl) for lbl in current_order] window['-ELEMENTS_COL-'].update(visible=False) window['-ELEMENTS_COL-'].update(elements) window['-ELEMENTS_COL-'].contents_changed() window['-ELEMENTS_COL-'].update(visible=True) # 保存配置 if event == '保存配置': elements = {} for label in current_order: elements[label] = { "type": values[f'-{label}_TYPE-'], "identifier": values[f'-{label}_ID-'] } if values[f'-{label}_TYPE-'] == "文本": elements[label]["text"] = values[f'-{label}_TEXT-'] # 构建时间字符串 def build_time_str(date_part, h, m, s): return f"{date_part} {h}:{m}:{s}" if date_part else "" new_config = { "order": current_order, "elements": { "target_url": values['-URL-'], ** elements }, "start_date": build_time_str( values['-START_DATE-'], values['-START_HOUR-'], values['-START_MINUTE-'], values['-START_SECOND-'] ), "end_date": build_time_str( values['-END_DATE-'], values['-END_HOUR-'], values['-END_MINUTE-'], values['-END_SECOND-'] ), "schedule": config.get('schedule', {}), "stay_duration": config.get('stay_duration', 60) } try: with open(CONFIG_FILE, 'w') as f: json.dump(new_config, f, indent=4, ensure_ascii=False) sg.popup('配置保存成功!', title='保存结果') except Exception as e: sg.popup_error(f'保存失败: {str(e)}') # 运行自动化 if event == '运行': print("启动浏览器自动化...") threading.Thread( target=browser_automation, args=(values, current_order), daemon=True ).start() # 打开设置窗口 if event == '设置': show_settings_window(window) # 定时设置 if event == '-SCHEDULE-': show_schedule_settings(window, config) # 清理资源 window.close() if SCHEDULER and SCHEDULER.running: SCHEDULER.shutdown() def calculate_dynamic_date(preset): if not preset or preset == "--": return None week_type, weekday_str = preset[:1], preset[1:] week_offset = -1 if week_type == "上" else 0 try: weekday_index = WEEKDAYS.index(weekday_str) except ValueError: return None today = datetime.now(BEIJING_TZ) current_week_monday = today - timedelta(days=today.weekday()) target_week_monday = current_week_monday + timedelta(weeks=week_offset) return target_week_monday + timedelta(days=weekday_index) def show_schedule_settings(parent_window, config): schedule = config.get('schedule', {}) def split_preset_time(time_str): if time_str and re.match(r"\d{2}:\d{2}:\d{2}", time_str): return time_str.split(":") return ["00", "00", "00"] start_time_parts = split_preset_time(schedule.get('start_preset_time', "00:00:00")) end_time_parts = split_preset_time(schedule.get('end_preset_time', "00:00:00")) layout = [ [sg.Text('开始日期预设:'), sg.Combo(["--"] + [f"上{day}" for day in WEEKDAYS] + [f"本{day}" for day in WEEKDAYS], default_value=schedule.get('start_preset', '--'), key='-START_PRESET-', enable_events=True, size=(12, 1)), sg.Text("时间:"), sg.Spin([f"{i:02}" for i in range(24)], start_time_parts[0], key='-START_PRESET_H-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], start_time_parts[1], key='-START_PRESET_M-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], start_time_parts[2], key='-START_PRESET_S-', size=3), sg.Text("实际日期:", size=(10, 1)), sg.Text("", key='-REAL_START-', size=15)], [sg.Text('结束日期预设:'), sg.Combo(["--"] + [f"上{day}" for day in WEEKDAYS] + [f"本{day}" for day in WEEKDAYS], default_value=schedule.get('end_preset', '--'), key='-END_PRESET-', enable_events=True, size=(12, 1)), sg.Text("时间:"), sg.Spin([f"{i:02}" for i in range(24)], end_time_parts[0], key='-END_PRESET_H-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], end_time_parts[1], key='-END_PRESET_M-', size=3), sg.Text(':'), sg.Spin([f"{i:02}" for i in range(60)], end_time_parts[2], key='-END_PRESET_S-', size=3), sg.Text("实际日期:", size=(10, 1)), sg.Text("", key='-REAL_END-', size=15)], [sg.Text('定时执行:'), sg.Combo(["--"] + WEEKDAYS, default_value=schedule.get('schedule_type', '--'), key='-SCHEDULE_TYPE-', size=(8, 1)), sg.Input(schedule.get('schedule_time', '00:00:00'), key='-SCHEDULE_TIME-', size=(8, 1), tooltip="格式: HH:MM:SS")], [sg.Button('保存'), sg.Button('清除定时'), sg.Button('取消')], ] window = sg.Window('定时设置', layout, finalize=True) def update_dates(): start_date = calculate_dynamic_date(window['-START_PRESET-'].get()) end_date = calculate_dynamic_date(window['-END_PRESET-'].get()) window['-REAL_START-'].update(start_date.strftime('%Y-%m-%d') if start_date else "") window['-REAL_END-'].update(end_date.strftime('%Y-%m-%d') if end_date else "") update_dates() while True: event, values = window.read(timeout=500) if event in (None, '取消'): break if event == '清除定时': window['-START_PRESET-'].update('--') window['-END_PRESET-'].update('--') window['-SCHEDULE_TYPE-'].update('--') window['-SCHEDULE_TIME-'].update('00:00:00') window['-REAL_START-'].update('') window['-REAL_END-'].update('') config['schedule'] = { "start_preset": "--", "end_preset": "--", "schedule_type": "--", "schedule_time": "00:00:00" } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, ensure_ascii=False) global SCHEDULER if SCHEDULER and SCHEDULER.running: SCHEDULER.shutdown() SCHEDULER = None sg.popup("定时设置已重置") if event in ('-START_PRESET-', '-END_PRESET-', '__TIMEOUT__'): update_dates() if event == '保存': try: start_h = int(values['-START_PRESET_H-']) start_m = int(values['-START_PRESET_M-']) start_s = int(values['-START_PRESET_S-']) end_h = int(values['-END_PRESET_H-']) end_m = int(values['-END_PRESET_M-']) end_s = int(values['-END_PRESET_S-']) if not (0 <= start_h <= 23 and 0 <= end_h <= 23): raise ValueError("小时需在00-23之间") if not (0 <= start_m <= 59 and 0 <= end_m <= 59): raise ValueError("分钟需在00-59之间") if not (0 <= start_s <= 59 and 0 <= end_s <= 59): raise ValueError("秒数需在00-59之间") except ValueError as e: sg.popup_error(f"时间输入错误: {str(e)}") continue config['schedule']['start_preset_time'] = f"{values['-START_PRESET_H-']}:{values['-START_PRESET_M-']}:{values['-START_PRESET_S-']}" config['schedule']['end_preset_time'] = f"{values['-END_PRESET_H-']}:{values['-END_PRESET_M-']}:{values['-END_PRESET_S-']}" if values['-START_PRESET-'] != "--": parent_window['-START_HOUR-'].update(values['-START_PRESET_H-']) parent_window['-START_MINUTE-'].update(values['-START_PRESET_M-']) parent_window['-START_SECOND-'].update(values['-START_PRESET_S-']) if values['-END_PRESET-'] != "--": parent_window['-END_HOUR-'].update(values['-END_PRESET_H-']) parent_window['-END_MINUTE-'].update(values['-END_PRESET_M-']) parent_window['-END_SECOND-'].update(values['-END_PRESET_S-']) if not re.match(r'^([0-1]\d|2[0-3]):[0-5]\d:[0-5]\d$', values['-SCHEDULE_TIME-']): sg.popup_error("时间格式应为HH:MM:SS") continue start_preset_val = values['-START_PRESET-'] end_preset_val = values['-END_PRESET-'] calculated_start = calculate_dynamic_date(start_preset_val) calculated_end = calculate_dynamic_date(end_preset_val) if start_preset_val != "--" and calculated_start: date_part = calculated_start.strftime('%Y-%m-%d') # 更新主窗口的开始日期输入框 parent_window['-START_DATE-'].update(date_part) # 同时更新配置中的日期部分 config[ 'start_date'] = f"{date_part} {values['-START_PRESET_H-']}:{values['-START_PRESET_M-']}:{values['-START_PRESET_S-']}" if end_preset_val != "--" and calculated_end: date_part = calculated_end.strftime('%Y-%m-%d') # 更新主窗口的结束日期输入框 parent_window['-END_DATE-'].update(date_part) # 同时更新配置中的日期部分 config[ 'end_date'] = f"{date_part} {values['-END_PRESET_H-']}:{values['-END_PRESET_M-']}:{values['-END_PRESET_S-']}" config['schedule'] = { "start_preset": start_preset_val, "end_preset": end_preset_val, "schedule_type": values['-SCHEDULE_TYPE-'], "schedule_time": values['-SCHEDULE_TIME-'] } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, ensure_ascii=False) setup_scheduler(parent_window) sg.popup("定时设置已保存!") window.close() def setup_scheduler(parent_window): global SCHEDULER if SCHEDULER and SCHEDULER.running: SCHEDULER.shutdown() config = load_config() schedule = config.get('schedule', {}) if schedule.get('schedule_type') == "--" or not schedule.get('schedule_time'): return SCHEDULER = BackgroundScheduler(timezone=BEIJING_TZ) def scheduled_task(): try: now = datetime.now(BEIJING_TZ) print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] 定时任务启动") parent_window.write_event_value('执行定时任务', None) except Exception as e: print(f"定时任务异常: {str(e)}") try: h, m, s = map(int, schedule['schedule_time'].split(':')) weekday_index = WEEKDAYS.index(schedule['schedule_type']) SCHEDULER.add_job( scheduled_task, 'cron', day_of_week=weekday_index, hour=h, minute=m, second=s, misfire_grace_time=60 ) SCHEDULER.start() next_run = SCHEDULER.get_jobs()[0].next_run_time.astimezone(BEIJING_TZ) print(f"定时任务已激活,每周{WEEKDAYS[weekday_index]} {schedule['schedule_time']} 执行") print(f"下次执行时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}") except Exception as e: sg.popup_error(f"定时设置错误: {str(e)}") def browser_automation(values, current_order): try: if current_order is None: config = load_config() current_order = config["order"] config = load_config() STAY_DURATION = config.get('stay_duration', 60) schedule = config.get('schedule', {}) start_date = values['-START_DATE-'] start_time = f"{values['-START_HOUR-']}:{values['-START_MINUTE-']}:{values['-START_SECOND-']}" full_start = f"{start_date} {start_time}" if start_date else "" end_date = values['-END_DATE-'] end_time = f"{values['-END_HOUR-']}:{values['-END_MINUTE-']}:{values['-END_SECOND-']}" full_end = f"{end_date} {end_time}" if end_date else "" if not start_date or not end_date: print("错误:日期预设配置无效") return print(f"当前日期范围: {full_start} 至 {full_end}") service = EdgeService(r"C:\Program Files (x86)\Microsoft\Edge\Application\msedgedriver.exe") options = EdgeOptions() options.use_chromium = True driver = webdriver.Edge(service=service, options=options) modified_values = values.copy() modified_values['-FULL_START-'] = full_start modified_values['-FULL_END-'] = full_end driver.get(values['-URL-']) time.sleep(2) for label in current_order: element_config = config['elements'].get(label, {}) if element_config and element_config.get('identifier'): handle_element(driver, label, element_config, modified_values) time.sleep(0.5) print("自动化操作成功完成!") print(f"网页将保持打开状态{STAY_DURATION}秒...") time.sleep(STAY_DURATION) except Exception as e: print(f"执行过程中发生错误: {str(e)}") if 'driver' in locals(): driver.save_screenshot(f'error_{int(time.time())}.png') finally: if 'driver' in locals(): driver.quit() print("浏览器已关闭") def handle_element(driver, label, element_config, config): element_type = ELEMENT_TYPES[element_config['type']] identifier = element_config['identifier'] clean_identifier = identifier.replace(' ', '') schedule_config = config start_identifier = schedule_config.get('start_identifier', '开始时间') end_identifier = schedule_config.get('end_identifier', '结束时间') try: if element_type == 'button': xpath = f"//div[translate(normalize-space(), ' ', '') = '{clean_identifier}']" element = WebDriverWait(driver, 20).until(EC.element_to_be_clickable((By.XPATH, xpath))) driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element) driver.execute_script("arguments[0].click();", element) elif element_type == 'checkbox': xpath = f"//span[translate(normalize-space(), ' ', '')='{clean_identifier}']/preceding-sibling::input[@type='checkbox']" checkbox = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) if not checkbox.is_selected(): driver.execute_script("arguments[0].click();", checkbox) elif element_type == 'radio': xpath = f"//input[@type='radio']/following-sibling::text()[contains(., '{identifier}')]/preceding::input[1]" radio = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) driver.execute_script("arguments[0].click();", radio) elif element_type == 'dropdown': xpath = f"//span[translate(normalize-space(), ' ', '')='{clean_identifier}']/following::select[1]" select_element = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) Select(select_element).select_by_index(0) elif element_type == 'tab': xpath = f"//div[contains(@class,'tab')]/span[translate(normalize-space(), ' ', '')='{clean_identifier}']" tab = WebDriverWait(driver, 20).until(EC.element_to_be_clickable((By.XPATH, xpath))) driver.execute_script("arguments[0].click();", tab) time.sleep(1) elif element_type == 'time': xpath = f"//span[translate(normalize-space(), ' ', '')='{clean_identifier}']/following::input[1]" field = WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.XPATH, xpath))) schedule_config = config.get('schedule', {}) start_id = schedule_config.get('start_identifier', '开始时间') end_id = schedule_config.get('end_identifier', '结束时间') if element_config['identifier'] == start_id: full_value = config.get('-FULL_START-', '') elif element_config['identifier'] == end_id: full_value = config.get('-FULL_END-', '') else: print(f"未配置的时间字段: {element_config['identifier']}") return attempts = [] if full_value: attempts.append(full_value) if ' ' in full_value: attempts.append(full_value.split(' ')[0]) attempts.append(full_value.split(' ')[1]) success = False for attempt in attempts: try: field.clear() field.send_keys(attempt) time.sleep(0.5) current_value = field.get_attribute('value') if current_value.strip() == attempt.strip(): success = True break except Exception as e: print(f"尝试输入 '{attempt}' 失败: {str(e)}") continue if not success: raise ValueError(f"无法输入日期时间: {full_value}") elif element_type == 'text': try: # 查找包含标识文本的元素 base_element = WebDriverWait(driver, 20).until( EC.presence_of_element_located( (By.XPATH, f"//*[contains(text(), '{identifier}')]")) ) # 查找最近的输入框(前、后或父级相邻) input_element = base_element.find_element(By.XPATH, "./following-sibling::input | " + "./preceding-sibling::input | " + "../following-sibling::input | " + "ancestor::div/following-sibling::input" ) # 清空并输入文本 input_element.clear() input_element.send_keys(element_config.get('text', '')) print(f"成功输入文本: {element_config['text']}") except Exception as e: print(f"查找输入框失败: {str(e)}") raise return print(f"成功处理: {label}") except Exception as e: print(f"处理 {label} 时出错: {str(e)}") raise if __name__ == "__main__": create_gui() 以上代码的删除功能键点击无效,点击上下移动界面闪退,修改

最新推荐

recommend-type

基于QT的调色板

【基于QT的调色板】是一个使用Qt框架开发的色彩选择工具,类似于Windows操作系统中常见的颜色选取器。Qt是一个跨平台的应用程序开发框架,广泛应用于桌面、移动和嵌入式设备,支持C++和QML语言。这个调色板功能提供了横竖两种渐变模式,用户可以方便地选取所需的颜色值。 在Qt中,调色板(QPalette)是一个关键的类,用于管理应用程序的视觉样式。QPalette包含了一系列的颜色角色,如背景色、前景色、文本色、高亮色等,这些颜色可以根据用户的系统设置或应用程序的需求进行定制。通过自定义QPalette,开发者可以创建具有独特视觉风格的应用程序。 该调色板功能可能使用了QColorDialog,这是一个标准的Qt对话框,允许用户选择颜色。QColorDialog提供了一种简单的方式来获取用户的颜色选择,通常包括一个调色板界面,用户可以通过滑动或点击来选择RGB、HSV或其他色彩模型中的颜色。 横渐变取色可能通过QGradient实现,QGradient允许开发者创建线性或径向的色彩渐变。线性渐变(QLinearGradient)沿直线从一个点到另一个点过渡颜色,而径向渐变(QRadialGradient)则以圆心为中心向外扩散颜色。在调色板中,用户可能可以通过滑动条或鼠标拖动来改变渐变的位置,从而选取不同位置的颜色。 竖渐变取色则可能是通过调整QGradient的方向来实现的,将原本水平的渐变方向改为垂直。这种设计可以提供另一种方式来探索颜色空间,使得选取颜色更为直观和便捷。 在【colorpanelhsb】这个文件名中,我们可以推测这是与HSB(色相、饱和度、亮度)色彩模型相关的代码或资源。HSB模型是另一种常见且直观的颜色表示方式,与RGB或CMYK模型不同,它以人的感知为基础,更容易理解。在这个调色板中,用户可能可以通过调整H、S、B三个参数来选取所需的颜色。 基于QT的调色板是一个利用Qt框架和其提供的色彩管理工具,如QPalette、QColorDialog、QGradient等,构建的交互式颜色选择组件。它不仅提供了横竖渐变的色彩选取方式,还可能支持HSB色彩模型,使得用户在开发图形用户界面时能更加灵活和精准地控制色彩。
recommend-type

基于springboot二手物品交易网站系统【附万字论文+PPT+包部署+录制讲解视频】.zip

标题基于Spring Boot的二手物品交易网站系统研究AI更换标题第1章引言阐述基于Spring Boot开发二手物品交易网站的研究背景、意义、现状及本文方法与创新点。1.1研究背景与意义介绍二手物品交易的市场需求和Spring Boot技术的适用性。1.2国内外研究现状概述当前二手物品交易网站的发展现状和趋势。1.3论文方法与创新点说明本文采用的研究方法和在系统设计中的创新之处。第2章相关理论与技术介绍开发二手物品交易网站所涉及的相关理论和关键技术。2.1Spring Boot框架解释Spring Boot的核心概念和主要特性。2.2数据库技术讨论适用的数据库技术及其在系统中的角色。2.3前端技术阐述与后端配合的前端技术及其在系统中的应用。第3章系统需求分析详细分析二手物品交易网站系统的功能需求和性能需求。3.1功能需求列举系统应实现的主要功能模块。3.2性能需求明确系统应满足的性能指标和安全性要求。第4章系统设计与实现具体描述基于Spring Boot的二手物品交易网站系统的设计和实现过程。4.1系统架构设计给出系统的整体架构设计和各模块间的交互方式。4.2数据库设计详细阐述数据库的结构设计和数据操作流程。4.3界面设计与实现介绍系统的界面设计和用户交互的实现细节。第5章系统测试与优化说明对系统进行测试的方法和性能优化的措施。5.1测试方法与步骤测试环境的搭建、测试数据的准备及测试流程。5.2测试结果分析对测试结果进行详细分析,验证系统是否满足需求。5.3性能优化措施提出针对系统性能瓶颈的优化建议和实施方案。第6章结论与展望总结研究成果,并展望未来可能的研究方向和改进空间。6.1研究结论概括本文基于Spring Boot开发二手物品交易网站的主要发现和成果。6.2展望与改进讨论未来可能的系统改进方向和新的功能拓展。
recommend-type

基于Python的学生宿舍管理系统的设计与实现+数据库文档

1. 用户与权限管理模块 角色管理: 学生:查看个人住宿信息、提交报修申请、查看卫生检查结果、请假外出登记 宿管人员:分配宿舍床位、处理报修申请、记录卫生检查结果、登记晚归情况 管理员:维护楼栋与房间信息、管理用户账号、统计住宿数据、发布宿舍通知 用户操作: 登录认证:对接学校统一身份认证(模拟实现,用学号 / 工号作为账号),支持密码重置 信息管理:学生完善个人信息(院系、专业、联系电话),管理员维护所有用户信息 权限控制:不同角色仅可见对应功能(如学生无法修改床位分配信息) 2. 宿舍信息管理模块 楼栋与房间管理: 楼栋信息:名称(如 "1 号宿舍楼")、层数、性别限制(男 / 女 / 混合)、管理员(宿管) 房间信息:房间号(如 "101")、户型(4 人间 / 6 人间)、床位数量、已住人数、可用状态 设施信息:记录房间内设施(如空调、热水器、桌椅)的配置与完好状态 床位管理: 床位编号:为每个床位设置唯一编号(如 "101-1" 表示 101 房间 1 号床) 状态标记:标记床位为 "空闲 / 已分配 / 维修中",支持批量查询空闲床位 历史记录:保存床位的分配变更记录(如从学生 A 调换到学生 B 的时间与原因) 3. 住宿分配与调整模块 住宿分配: 新生分配:管理员导入新生名单后,宿管可按专业集中、性别匹配等规则批量分配床位 手动分配:针对转专业、复学学生,宿管手动指定空闲床位并记录分配时间 分配结果公示:学生登录后可查看自己的宿舍信息(楼栋、房间号、床位号、室友列表) 调整管理: 调宿申请:学生提交调宿原因(如室友矛盾、身体原因),选择意向宿舍(需有空位) 审批流程:宿管审核申请,通过后执行床位调换,更新双方住宿信息 换宿记录:保存调宿历史(申请人、原床位、新床位、审批人、时间) 4. 报修与安全管理模块 报修管理: 报修提交:学生选择宿舍、设施类型(如 "
recommend-type

深入学习循环神经网络(RNN)的方法与技巧

资源下载链接为: https://pan.quark.cn/s/8a7ca10dbd74 深入学习循环神经网络(RNN)的方法与技巧(最新、最全版本!打开链接下载即可用!)
recommend-type

MATLAB神经网络优化算法

资源下载链接为: https://pan.quark.cn/s/dd0f9ae8530e MATLAB神经网络优化算法(最新、最全版本!打开链接下载即可用!)
recommend-type

美国国际航空交通数据分析报告(1990-2020)

根据给定的信息,我们可以从中提取和分析以下知识点: 1. 数据集概述: 该数据集名为“U.S. International Air Traffic data(1990-2020)”,记录了美国与国际间航空客运和货运的详细统计信息。数据集涵盖的时间范围从1990年至2020年,这说明它包含了长达30年的时间序列数据,对于进行长期趋势分析非常有价值。 2. 数据来源及意义: 此数据来源于《美国国际航空客运和货运统计报告》,该报告是美国运输部(USDOT)所管理的T-100计划的一部分。T-100计划旨在收集和发布美国和国际航空公司在美国机场的出入境交通报告,这表明数据的权威性和可靠性较高,适用于政府、企业和学术研究等领域。 3. 数据内容及应用: 数据集包含两个主要的CSV文件,分别是“International_Report_Departures.csv”和“International_Report_Passengers.csv”。 a. International_Report_Departures.csv文件可能包含了以下内容: - 离港航班信息:记录了各航空公司的航班号、起飞和到达时间、起飞和到达机场的代码以及国际地区等信息。 - 航空公司信息:可能包括航空公司代码、名称以及所属国家等。 - 飞机机型信息:如飞机类型、座位容量等,这有助于分析不同机型的使用频率和趋势。 - 航线信息:包括航线的起始和目的国家及城市,对于研究航线网络和优化航班计划具有参考价值。 这些数据可以用于航空交通流量分析、机场运营效率评估、航空市场分析等。 b. International_Report_Passengers.csv文件可能包含了以下内容: - 航班乘客信息:可能包括乘客的国籍、年龄、性别等信息。 - 航班类型:如全客机、全货机或混合型航班,可以分析乘客运输和货物运输的比例。 - 乘客数量:记录了各航班或航线的乘客数量,对于分析航空市场容量和增长趋势很有帮助。 - 飞行里程信息:有助于了解国际间不同航线的长度和飞行距离,为票价设置和燃油成本分析提供数据支持。 这些数据可以用于航空客运市场分析、需求预测、收益管理等方面。 4. 数据分析和应用实例: - 航空流量分析:通过分析离港航班数据,可以观察到哪些航线最为繁忙,哪些机场的国际航空流量最大,这有助于航空公司调整航班时刻表和运力分配。 - 市场研究:乘客数据可以揭示不同国家和地区之间的人口流动趋势,帮助航空公司和政府机构了解国际旅行市场的需求变化。 - 飞机利用率:结合飞机机型和飞行频率信息,可以对特定机型的使用率进行分析,评估飞机维护需求和燃油效率。 - 安全监管:通过对比不同航空公司和航班的安全记录,监管机构可以更有效地评估航空公司的安全性能,并采取必要的监管措施。 5. 技术和方法论: 分析此类数据通常涉及数据清洗、数据整合、统计分析、时间序列分析、预测建模等数据科学方法。使用Excel、SQL、R、Python等工具进行数据处理和分析是常见的做法。例如,可以使用Python的Pandas库来清洗和准备数据,使用Matplotlib和Seaborn库来可视化数据,然后利用Scikit-learn或Statsmodels库来构建预测模型。 通过以上知识点的提取和分析,我们可以理解到“U.S. International Air Traffic data(1990-2020)-数据集”的重要性,它不仅记录了跨越30年的航空交通数据,还为各种分析和应用提供了详实的基础信息。对于航空业从业者、政策制定者、研究人员以及数据分析师来说,这是一个极具价值的数据资源。
recommend-type

统计学视角:深入理解最小二乘法的概率论基础

# 1. 最小二乘法的基本概念 最小二乘法(Least Squares Method, LSM)是统计学和数据分析中广泛使用的一种数学优化技术。其主要目的是通过最小化误差的平方和来寻找数据的最佳函数匹配。这种方法最早由高斯提出,但在工程、物理、经济和其他领域中有着广泛的应用。 在本章中,我们将首先了
recommend-type

vscode中使用Codeium

<think>好的,我需要回答用户如何在VSCode中使用Codeium插件的问题。首先,我需要回顾用户提供的引用内容,看看是否有相关信息。用户提供的引用[1]提到了Codeium Chat可以解释代码中的问题,引用[2]提到了在IDEA中的一些问题,可能对VSCode的安装有帮助。用户还提到了安装步骤、基本功能、常见问题等。 首先,我应该分步骤说明安装过程,包括在VSCode扩展商店搜索Codeium并安装。然后,登录部分可能需要用户访问仪表板获取API密钥,引用[2]中提到登录问题,可能需要提醒用户注意网络或权限设置。 接下来是基本功能,比如代码自动补全和Chat功能。引用[1]提到C
recommend-type

UniMoCo:统一框架下的多监督视觉学习方法

在详细解析“unimoco”这个概念之前,我们需要明确几个关键点。首先,“unimoco”代表的是一种视觉表示学习方法,它在机器学习尤其是深度学习领域中扮演着重要角色。其次,文章作者通过这篇论文介绍了UniMoCo的全称,即“Unsupervised, Semi-Supervised and Full-Supervised Visual Representation Learning”,其背后的含义是在于UniMoCo框架整合了无监督学习、半监督学习和全监督学习三种不同的学习策略。最后,该框架被官方用PyTorch库实现,并被提供给了研究者和开发者社区。 ### 1. 对比学习(Contrastive Learning) UniMoCo的概念根植于对比学习的思想,这是一种无监督学习的范式。对比学习的核心在于让模型学会区分不同的样本,通过将相似的样本拉近,将不相似的样本推远,从而学习到有效的数据表示。对比学习与传统的分类任务最大的不同在于不需要手动标注的标签来指导学习过程,取而代之的是从数据自身结构中挖掘信息。 ### 2. MoCo(Momentum Contrast) UniMoCo的实现基于MoCo框架,MoCo是一种基于队列(queue)的对比学习方法,它在训练过程中维持一个动态的队列,其中包含了成对的负样本。MoCo通过 Momentum Encoder(动量编码器)和一个队列来保持稳定和历史性的负样本信息,使得模型能够持续地进行对比学习,即使是在没有足够负样本的情况下。 ### 3. 无监督学习(Unsupervised Learning) 在无监督学习场景中,数据样本没有被标记任何类别或标签,算法需自行发现数据中的模式和结构。UniMoCo框架中,无监督学习的关键在于使用没有标签的数据进行训练,其目的是让模型学习到数据的基础特征表示,这对于那些标注资源稀缺的领域具有重要意义。 ### 4. 半监督学习(Semi-Supervised Learning) 半监督学习结合了无监督和有监督学习的优势,它使用少量的标注数据与大量的未标注数据进行训练。UniMoCo中实现半监督学习的方式,可能是通过将已标注的数据作为对比学习的一部分,以此来指导模型学习到更精准的特征表示。这对于那些拥有少量标注数据的场景尤为有用。 ### 5. 全监督学习(Full-Supervised Learning) 在全监督学习中,所有的训练样本都有相应的标签,这种学习方式的目的是让模型学习到映射关系,从输入到输出。在UniMoCo中,全监督学习用于训练阶段,让模型在有明确指示的学习目标下进行优化,学习到的任务相关的特征表示。这通常用于有充足标注数据的场景,比如图像分类任务。 ### 6. PyTorch PyTorch是一个开源机器学习库,由Facebook的人工智能研究团队开发,主要用于计算机视觉和自然语言处理等任务。它被广泛用于研究和生产环境,并且因其易用性、灵活性和动态计算图等特性受到研究人员的青睐。UniMoCo官方实现选择PyTorch作为开发平台,说明了其对科研社区的支持和对易于实现的重视。 ### 7. 可视化表示学习(Visual Representation Learning) 可视化表示学习的目的是从原始视觉数据中提取特征,并将它们转换为能够反映重要信息且更易于处理的形式。在UniMoCo中,无论是无监督、半监督还是全监督学习,最终的目标都是让模型学习到有效的视觉表示,这些表示可以用于下游任务,如图像分类、目标检测、图像分割等。 ### 8. 标签队列(Label Queue) UniMoCo通过标签队列维护受监管的标签,这可能意味着对于那些半监督或全监督学习的任务,模型在进行对比学习时,会参考这些来自标签队列的数据。标签队列机制能帮助模型更好地利用有限的标注数据,增强模型的泛化能力。 ### 结论 UniMoCo的提出,以及其官方PyTorch实现的发布,将对计算机视觉领域产生深远影响。它不仅提供了一个统一的对比学习框架,使得从无监督到全监督的学习过程更加灵活和高效,而且为研究者们提供了一个强力的工具,以便更好地探索和实现各种视觉任务。UniMoCo的研究和应用前景,为机器学习尤其是深度学习在视觉领域的研究和实践提供了新的视角和可能。
recommend-type

【MATLAB算法精讲】:最小二乘法的实现与案例深度分析

# 1. 最小二乘法的基本原理 最小二乘法是一种数学优化技术,它通过最小化误差的平方和来寻找数据的最佳函数匹配。其核心思想是选择一条曲线,使得所有观察点到这条曲线的距离之和最小。这种方法广泛应用于统计学、信号处理、工程学和经济学等领域,尤其适用于需要通过一组数据点来确定函数参数的情况。 ## 1.1 统计学视角下的最小二乘法 在统计学中,最小二乘法经常用于