Task Description

Use pandas to import the data from 'basketball.csv' into a DataFrame object named dataset, then use the drop_duplicates() function to deduplicate the Date column of dataset without keeping duplicate copies. Return both the deduplicated result dealed and dataset.

Test description: if the CSV file is read correctly, 'Correctly read CSV file!' is printed; if the deduplication result is correct, 'Correctly drop duplicated parts!' is printed.

Date: 2025-06-24 15:44:13
### Using Pandas to Read a CSV File and Drop Duplicates

This task can be completed with the `pandas` library in three steps:

1. **Read the CSV data**: use `pd.read_csv()` to load the contents of `'basketball.csv'` into a DataFrame.
2. **Deduplicate the Date column**: call `drop_duplicates()` on the DataFrame, passing `subset='Date'` so that only the `Date` column is considered and `keep='first'` so that the first occurrence of each date is retained.
3. **Return the results**: return both the deduplicated result and the original dataset.

The complete implementation:

```python
import pandas as pd


def duplicate():
    '''
    Returns:
        dealed: DataFrame obtained by deduplicating the Date column of dataset
        dataset: the data initially read from basketball.csv
    '''
    # Load the CSV file into a DataFrame
    dataset = pd.read_csv('basketball.csv')

    # Deduplicate the Date column without keeping duplicate copies
    dealed = dataset.drop_duplicates(subset='Date', keep='first', inplace=False)

    # Return the deduplicated result along with the original data
    return dealed, dataset
```

#### Key points

- **`pd.read_csv('basketball.csv')`**: loads the CSV data into a DataFrame object.
- **`drop_duplicates(subset='Date', keep='first', inplace=False)`**:
  - `subset='Date'`: restricts deduplication to the `Date` column.
  - `keep='first'`: keeps the first occurrence of each duplicated value.
  - `inplace=False`: leaves the original DataFrame untouched and returns a new one.

---

### Notes

To verify or debug the code, add print statements to inspect intermediate results:

```python
print(dataset.head())  # first few rows of the original data
print(dealed.head())   # first few rows of the deduplicated data
```

Also make sure `pandas` is installed in the runtime environment. If it is not, install it with:

```bash
pip install pandas
```
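One subtlety in the task statement is the phrase "without keeping duplicate copies" (不保留副本). The answer above reads it as `keep='first'` (retain one row per date); pandas also supports `keep=False`, which drops every row whose date is duplicated at all. A minimal sketch of the difference, using a small made-up frame rather than the real `basketball.csv`:

```python
import pandas as pd

# Hypothetical data: two games share the date 2024-01-01
df = pd.DataFrame({
    'Date': ['2024-01-01', '2024-01-01', '2024-01-02'],
    'Team': ['A', 'B', 'C'],
})

# keep='first' -> one row per date (first 2024-01-01 row kept, plus 2024-01-02)
print(df.drop_duplicates(subset='Date', keep='first'))

# keep=False -> duplicated dates dropped entirely (only 2024-01-02 remains)
print(df.drop_duplicates(subset='Date', keep=False))
```

If the grader expects the stricter reading, swapping `keep='first'` for `keep=False` is a one-line change in `duplicate()`.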
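The grader's own checks are not shown on this page, but a local self-check in the same spirit is easy to sketch. It assumes `duplicate()` from the answer above is in scope and that `basketball.csv` sits in the working directory:

```python
import pandas as pd

dealed, dataset = duplicate()

# The CSV was read into a non-empty DataFrame
if isinstance(dataset, pd.DataFrame) and not dataset.empty:
    print('Correctly read CSV file!')

# After deduplication, no date appears more than once
if not dealed['Date'].duplicated().any():
    print('Correctly drop duplicated parts!')
```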

相关推荐

不是用的xgb吗:import pandas as pd import xgboost as xgb from sklearn.preprocessing import MinMaxScaler dataset1 = pd.read_csv('data/dataset1.csv') dataset1.label.replace(-1,0,inplace=True) dataset2 = pd.read_csv('data/dataset2.csv') dataset2.label.replace(-1,0,inplace=True) dataset3 = pd.read_csv('data/dataset3.csv') dataset1.drop_duplicates(inplace=True) dataset2.drop_duplicates(inplace=True) dataset3.drop_duplicates(inplace=True) dataset12 = pd.concat([dataset1,dataset2],axis=0) dataset1_y = dataset1.label dataset1_x = dataset1.drop(['user_id','label','day_gap_before','day_gap_after'],axis=1) # 'day_gap_before','day_gap_after' cause overfitting, 0.77 dataset2_y = dataset2.label dataset2_x = dataset2.drop(['user_id','label','day_gap_before','day_gap_after'],axis=1) dataset12_y = dataset12.label dataset12_x = dataset12.drop(['user_id','label','day_gap_before','day_gap_after'],axis=1) dataset3_preds = dataset3[['user_id','coupon_id','date_received']] dataset3_x = dataset3.drop(['user_id','coupon_id','date_received','day_gap_before','day_gap_after'],axis=1) print dataset1_x.shape,dataset2_x.shape,dataset3_x.shape dataset1 = xgb.DMatrix(dataset1_x,label=dataset1_y) dataset2 = xgb.DMatrix(dataset2_x,label=dataset2_y) dataset12 = xgb.DMatrix(dataset12_x,label=dataset12_y) dataset3 = xgb.DMatrix(dataset3_x) params={'booster':'gbtree', 'objective': 'rank:pairwise', 'eval_metric':'auc', 'gamma':0.1, 'min_child_weight':1.1, 'max_depth':5, 'lambda':10, 'subsample':0.7, 'colsample_bytree':0.7, 'colsample_bylevel':0.7, 'eta': 0.01, 'tree_method':'exact', 'seed':0, 'nthread':12 } #train on dataset1, evaluate on dataset2 #watchlist = [(dataset1,'train'),(dataset2,'val')] #model = xgb.train(params,dataset1,num_boost_round=3000,evals=watchlist,early_stopping_rounds=300) watchlist = [(dataset12,'train')] model = xgb.train(params,dataset12,num_boost_round=3500,evals=watchlist) #predict test set dataset3_preds['label'] = model.predict(dataset3) dataset3_preds.label = MinMaxScaler().fit_transform(dataset3_preds.label.reshape(-1, 1)) dataset3_preds.sort_values(by=['coupon_id','label'],inplace=True) dataset3_preds.to_csv("xgb_preds.csv",index=None,header=None) print dataset3_preds.describe() #save feature score feature_score = model.get_fscore() feature_score = sorted(feature_score.items(), key=lambda x:x[1],reverse=True) fs = [] for (key,value) in feature_score: fs.append("{0},{1}\n".format(key,value)) with open('xgb_feature_score.csv','w') as f: f.writelines("feature,score\n") f.writelines(fs)

帮我修改代码,一是原始数据只有30天,不是32天,二是完播率预测结果数值一样,需要调优模型;三是预测结果的行数应该与testA_pred_did.csv保持一致。import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime from sklearn.metrics import roc_auc_score # 添加AUC计算 # 修复:在函数定义后添加缩进的代码块 def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 if dtype: # 确保dtype不为空 for col in chunk.columns: if col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() chunks.append(chunk) if chunks: return pd.concat(chunks, ignore_index=True) return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 修复:确保所有函数都有缩进的代码块 def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame,即使某些为空 return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 修复:添加缺失的函数定义 def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本(使用集合操作) exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) 
negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 修复:添加缺失的函数定义 def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征(使用索引加速合并) if not did_features.empty and 'did' in did_features.columns: df = df.merge(did_features, on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: df = df.merge(vid_info, on='vid', how='left') # 用户行为统计 user_click_count = pd.Series(dtype='int') if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') if not user_click_count.empty: df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 user_total_play = pd.Series(dtype='float') if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') if not user_total_play.empty: df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 # 视频热度统计 video_click_count = pd.Series(dtype='int') if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') if not video_click_count.empty: df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 avg_play_time = pd.Series(dtype='float') if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') if not avg_play_time.empty: df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 修复:添加缺失的函数定义 def get_categorical_features(df, base_features): """动态获取存在的分类特征""" existing_features = [] for feature in base_features: if feature in df.columns: try: # 尝试转换为数值,如果是数值则跳过 pd.to_numeric(df[feature], errors='raise') except: existing_features.append(feature) # 确保转换为category类型 df[feature] = df[feature].astype('category').cat.as_ordered() return existing_features # 修复:添加缺失的函数定义 def build_play_dataset(hist_play, vid_info, did_features, hist_click): """构建完播率数据集,优化内存使用""" if hist_play.empty: print("⚠️ 历史播放数据为空,无法构建完播率数据集") return pd.DataFrame() # 基础数据 play_data = hist_play[['did', 'vid', 'play_time']].copy() # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: play_data = play_data.merge( vid_info[['vid', 'item_duration']], on='vid', how='left' ) else: play_data['item_duration'] = 1.0 # 默认值 # 计算完播率 play_data['completion_rate'] = play_data['play_time'] 
/ play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 添加用户特征 if not did_features.empty and 'did' in did_features.columns: play_data = play_data.merge( did_features, on='did', how='left' ) # 添加视频特征 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col != 'item_duration'] play_data = play_data.merge( vid_info[vid_cols], on='vid', how='left' ) # 用户平均完播率 play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean') play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count') # 视频平均完播率 play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean') play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std') # 用户-视频互动特征 if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns: user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks') play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left') else: play_data['user_vid_clicks'] = 0 # 填充缺失值 play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True) play_data['user_play_count'].fillna(1, inplace=True) play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True) play_data['video_completion_std'].fillna(0, inplace=True) play_data['user_vid_clicks'].fillna(0, inplace=True) return play_data # 修复:添加缺失的函数定义 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果 - 修改为只保留点击概率最高的vid""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 存储预测结果 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates # 修改:只保留每个did点击概率最高的vid result = test_data.sort_values('click_prob', ascending=False).groupby('did').head(1) # 选择需要的列 result = result[['did', 'vid', 'completion_rate']].copy() # 重命名列 result.columns = ['did', 'vid', 'predicted_completion_rate'] return result # 主程序流程 if __name__ == "__main__": # 定义内存优化的数据类型 dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 - 只有在数据中存在时才添加 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = 
load_data_safely('vid_info_table.csv', dtype=dtypes) # 添加可选特征到dtypes(仅当列存在时) for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() # 加载历史数据 - 确保所有变量都被定义 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) # 打印历史数据状态 print(f"历史曝光数据形状: {hist_exposure.shape if not hist_exposure.empty else '空'}") print(f"历史点击数据形状: {hist_click.shape if not hist_click.empty else '空'}") print(f"历史播放数据形状: {hist_play.shape if not hist_play.empty else '空'}") # 构建点击数据集 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集,因为历史曝光或点击数据为空") click_train_data = pd.DataFrame() # 添加特征 - 确保所有参数都已定义 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, # 确保hist_click已定义 hist_play # 确保hist_play已定义 ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 基础分类特征列表 base_categorical_features = [ 'item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour' ] # 动态获取存在的分类特征 categorical_features = [] if not click_train_data.empty: categorical_features = get_categorical_features(click_train_data, base_categorical_features) print(f"使用的分类特征: {categorical_features}") else: print("⚠️ 点击训练数据为空,无法获取分类特征") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练模型(优化参数) params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } model_click = None if not X_train.empty: train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features) print("开始训练点击预测模型...") model_click = lgb.train( params, train_data, num_boost_round=1500, valid_sets=[val_data], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) # 计算并输出AUC if not X_val.empty and not y_val.empty and model_click: y_val_pred = model_click.predict(X_val) auc_score = roc_auc_score(y_val, y_val_pred) print(f"📊 点击率模型在验证集上的AUC: {auc_score:.6f}") with open('model_metrics.txt', 'w') as f: f.write(f"点击率模型AUC: {auc_score:.6f}\n") else: print("⚠️ 训练数据为空,跳过点击预测模型训练") # 构建完播率数据集 print("开始构建完播率数据集...") if 'hist_play' in globals() and 'vid_info' in globals() and 'did_features' in globals() and 
'hist_click' in globals(): play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) else: print("⚠️ 无法构建完播率数据集,因为所需变量未定义") play_train_data = pd.DataFrame() # 训练完播率模型 model_play = None if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 获取完播率模型的分类特征 play_categorical_features = [] if not play_train_data.empty: play_categorical_features = get_categorical_features(play_train_data, base_categorical_features) print(f"完播率模型使用的分类特征: {play_categorical_features}") else: print("⚠️ 完播率训练数据为空,无法获取分类特征") # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } if not X_train_play.empty: train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features) print("开始训练完播率模型...") model_play = lgb.train( params_reg, train_data_play, num_boost_round=2000, valid_sets=[val_data_play], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: print("⚠️ 训练数据为空,跳过完播率模型训练") # 保存模型 if model_click: model_click.save_model('click_model.txt') if model_play: model_play.save_model('play_model.txt') joblib.dump(base_categorical_features, 'categorical_features.pkl') # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' # 修改:保存为无表头CSV submission.to_csv(output_file, index=False, header=False) print(f"预测结果已保存至: {output_file}") print(f"结果格式: 共 {len(submission)} 行") print(f"列信息: [did, vid, predicted_completion_rate]") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果")

import pandas as pd import math as mt import numpy as np from sklearn.model_selection import train_test_split from Recommenders import SVDRecommender triplet_dataset_sub_song_merged = triplet_dataset_sub_song_mergedpd triplet_dataset_sub_song_merged_sum_df = triplet_dataset_sub_song_merged[['user','listen_count']].groupby('user').sum().reset_index() triplet_dataset_sub_song_merged_sum_df.rename(columns={'listen_count':'total_listen_count'},inplace=True) triplet_dataset_sub_song_merged = pd.merge(triplet_dataset_sub_song_merged,triplet_dataset_sub_song_merged_sum_df) triplet_dataset_sub_song_merged['fractional_play_count'] = triplet_dataset_sub_song_merged['listen_count']/triplet_dataset_sub_song_merged small_set = triplet_dataset_sub_song_merged user_codes = small_set.user.drop_duplicates().reset_index() song_codes = small_set.song.drop_duplicates().reset_index() user_codes.rename(columns={'index':'user_index'}, inplace=True) song_codes.rename(columns={'index':'song_index'}, inplace=True) song_codes['so_index_value'] = list(song_codes.index) user_codes['us_index_value'] = list(user_codes.index) small_set = pd.merge(small_set,song_codes,how='left') small_set = pd.merge(small_set,user_codes,how='left') mat_candidate = small_set[['us_index_value','so_index_value','fractional_play_count']] data_array = mat_candidate.fractional_play_count.values row_array = mat_candidate.us_index_value.values col_array = mat_candidate.so_index_value.values data_sparse = coo_matrix((data_array, (row_array, col_array)),dtype=float) K=50 urm = data_sparse MAX_PID = urm.shape[1] MAX_UID = urm.shape[0] recommender = SVDRecommender(K) U, S, Vt = recommender.fit(urm) Compute recommendations for test users uTest = [1,6,7,8,23] uTest_recommended_items = recommender.recommend(uTest, urm, 10) Output recommended songs in a dataframe recommendations = pd.DataFrame(columns=['user','song', 'score','rank']) for user in uTest: rank = 1 for song_index in uTest_recommended_items[user, 0:10]: song = small_set.loc[small_set['so_index_value'] == song_index].iloc[0] # Get song details recommendations = recommendations.append({'user': user, 'song': song['title'], 'score': song['fractional_play_count'], 'rank': rank}, ignore_index=True) rank += 1 display(recommendations)这段代码报错了,为什么?给出修改后的 代码

1.给出模型的auc;2.模型的预测结果,第一列是did,第二列只保留点击概率最高的vid,第三列是预计的完播率,按照上面三个要求调整代码并检查无误:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime # 1. 增强数据加载函数(添加列存在性检查) def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() chunks.append(chunk) if chunks: return pd.concat(chunks, ignore_index=True) return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 2. 优化历史数据加载(添加列存在性检查) def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame,即使某些为空 return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 定义内存优化的数据类型(添加列存在性检查) dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 - 只有在数据中存在时才添加 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) # 添加可选特征到dtypes(仅当列存在时) for feature, dtype in optional_features.items(): if not 
vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() # 加载历史数据 - 确保所有变量都被定义 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) # 打印历史数据状态 print(f"历史曝光数据形状: {hist_exposure.shape if not hist_exposure.empty else '空'}") print(f"历史点击数据形状: {hist_click.shape if not hist_click.empty else '空'}") print(f"历史播放数据形状: {hist_play.shape if not hist_play.empty else '空'}") # 3. 优化点击数据集构建(添加空数据检查) def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本(使用集合操作) exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 使用10%负样本比例 - 确保hist_click已定义 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集,因为历史曝光或点击数据为空") click_train_data = pd.DataFrame() # 4. 
优化特征工程(解决分类特征问题) def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征(使用索引加速合并) if not did_features.empty and 'did' in did_features.columns: df = df.merge(did_features, on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: df = df.merge(vid_info, on='vid', how='left') # 用户行为统计 user_click_count = pd.Series(dtype='int') if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') if not user_click_count.empty: df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 user_total_play = pd.Series(dtype='float') if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') if not user_total_play.empty: df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 # 视频热度统计 video_click_count = pd.Series(dtype='int') if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') if not video_click_count.empty: df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 avg_play_time = pd.Series(dtype='float') if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') if not avg_play_time.empty: df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 添加特征 - 确保所有参数都已定义 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, # 确保hist_click已定义 hist_play # 确保hist_play已定义 ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 5. 
修复分类特征问题 def get_categorical_features(df, base_features): """动态获取存在的分类特征""" existing_features = [] for feature in base_features: if feature in df.columns: try: # 尝试转换为数值,如果是数值则跳过 pd.to_numeric(df[feature], errors='raise') except: existing_features.append(feature) # 确保转换为category类型 df[feature] = df[feature].astype('category').cat.as_ordered() return existing_features # 基础分类特征列表 base_categorical_features = [ 'item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour' ] # 动态获取存在的分类特征 categorical_features = [] if not click_train_data.empty: categorical_features = get_categorical_features(click_train_data, base_categorical_features) print(f"使用的分类特征: {categorical_features}") else: print("⚠️ 点击训练数据为空,无法获取分类特征") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练模型(优化参数) params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } if not X_train.empty: train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features) print("开始训练点击预测模型...") model_click = lgb.train( params, train_data, num_boost_round=1500, valid_sets=[val_data], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") # 6. 
优化完播率数据集构建 def build_play_dataset(hist_play, vid_info, did_features, hist_click): """构建完播率数据集,优化内存使用""" if hist_play.empty: print("⚠️ 历史播放数据为空,无法构建完播率数据集") return pd.DataFrame() # 基础数据 play_data = hist_play[['did', 'vid', 'play_time']].copy() # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: play_data = play_data.merge( vid_info[['vid', 'item_duration']], on='vid', how='left' ) else: play_data['item_duration'] = 1.0 # 默认值 # 计算完播率 play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 添加用户特征 if not did_features.empty and 'did' in did_features.columns: play_data = play_data.merge( did_features, on='did', how='left' ) # 添加视频特征 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col != 'item_duration'] play_data = play_data.merge( vid_info[vid_cols], on='vid', how='left' ) # 用户平均完播率 play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean') play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count') # 视频平均完播率 play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean') play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std') # 用户-视频互动特征 if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns: user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks') play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left') else: play_data['user_vid_clicks'] = 0 # 填充缺失值 play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True) play_data['user_play_count'].fillna(1, inplace=True) play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True) play_data['video_completion_std'].fillna(0, inplace=True) play_data['user_vid_clicks'].fillna(0, inplace=True) return play_data print("开始构建完播率数据集...") # 确保所有参数都已定义 if 'hist_play' in globals() and 'vid_info' in globals() and 'did_features' in globals() and 'hist_click' in globals(): play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) else: print("⚠️ 无法构建完播率数据集,因为所需变量未定义") play_train_data = pd.DataFrame() # 7. 
训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 获取完播率模型的分类特征 if not play_train_data.empty: play_categorical_features = get_categorical_features(play_train_data, base_categorical_features) print(f"完播率模型使用的分类特征: {play_categorical_features}") else: play_categorical_features = [] print("⚠️ 完播率训练数据为空,无法获取分类特征") # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } if not X_train_play.empty: train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features) print("开始训练完播率模型...") model_play = lgb.train( params_reg, train_data_play, num_boost_round=2000, valid_sets=[val_data_play], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") # 保存模型 if model_click: model_click.save_model('click_model.txt') if model_play: model_play.save_model('play_model.txt') joblib.dump(base_categorical_features, 'categorical_features.pkl') # 8. 
添加预测流程 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 生成提交格式 submission = test_data.groupby('did').apply( lambda x: ' '.join(x.sort_values('score', ascending=False)['vid'].astype(str)[:100]) ).reset_index(name='vid_list') return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果")

帮我检查代码:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime # 1. 增强数据加载函数(添加列存在性检查) def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() chunks.append(chunk) if chunks: return pd.concat(chunks, ignore_index=True) return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 2. 优化历史数据加载(添加列存在性检查) def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame,即使某些为空 return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 定义内存优化的数据类型(添加列存在性检查) dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 - 只有在数据中存在时才添加 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) # 添加可选特征到dtypes(仅当列存在时) for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = 
dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() # 加载历史数据 - 确保所有变量都被定义 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) # 打印历史数据状态 print(f"历史曝光数据形状: {hist_exposure.shape if not hist_exposure.empty else '空'}") print(f"历史点击数据形状: {hist_click.shape if not hist_click.empty else '空'}") print(f"历史播放数据形状: {hist_play.shape if not hist_play.empty else '空'}") # 3. 优化点击数据集构建(添加空数据检查) def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本(使用集合操作) exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 使用10%负样本比例 - 确保hist_click已定义 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集,因为历史曝光或点击数据为空") click_train_data = pd.DataFrame() # 4. 
优化特征工程(解决分类特征问题) def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征(使用索引加速合并) if not did_features.empty and 'did' in did_features.columns: df = df.merge(did_features, on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: df = df.merge(vid_info, on='vid', how='left') # 用户行为统计 user_click_count = pd.Series(dtype='int') if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') if not user_click_count.empty: df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 user_total_play = pd.Series(dtype='float') if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') if not user_total_play.empty: df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 # 视频热度统计 video_click_count = pd.Series(dtype='int') if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') if not video_click_count.empty: df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 avg_play_time = pd.Series(dtype='float') if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') if not avg_play_time.empty: df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 添加特征 - 确保所有参数都已定义 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, # 确保hist_click已定义 hist_play # 确保hist_play已定义 ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 5. 
修复分类特征问题 def get_categorical_features(df, base_features): """动态获取存在的分类特征""" existing_features = [] for feature in base_features: if feature in df.columns: try: # 尝试转换为数值,如果是数值则跳过 pd.to_numeric(df[feature], errors='raise') except: existing_features.append(feature) # 确保转换为category类型 df[feature] = df[feature].astype('category').cat.as_ordered() return existing_features # 基础分类特征列表 base_categorical_features = [ 'item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour' ] # 动态获取存在的分类特征 categorical_features = [] if not click_train_data.empty: categorical_features = get_categorical_features(click_train_data, base_categorical_features) print(f"使用的分类特征: {categorical_features}") else: print("⚠️ 点击训练数据为空,无法获取分类特征") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练模型(优化参数) params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } if not X_train.empty: train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features) print("开始训练点击预测模型...") model_click = lgb.train( params, train_data, num_boost_round=1500, valid_sets=[val_data], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") # 6. 
优化完播率数据集构建 def build_play_dataset(hist_play, vid_info, did_features, hist_click): """构建完播率数据集,优化内存使用""" if hist_play.empty: print("⚠️ 历史播放数据为空,无法构建完播率数据集") return pd.DataFrame() # 基础数据 play_data = hist_play[['did', 'vid', 'play_time']].copy() # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: play_data = play_data.merge( vid_info[['vid', 'item_duration']], on='vid', how='left' ) else: play_data['item_duration'] = 1.0 # 默认值 # 计算完播率 play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 添加用户特征 if not did_features.empty and 'did' in did_features.columns: play_data = play_data.merge( did_features, on='did', how='left' ) # 添加视频特征 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col != 'item_duration'] play_data = play_data.merge( vid_info[vid_cols], on='vid', how='left' ) # 用户平均完播率 play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean') play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count') # 视频平均完播率 play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean') play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std') # 用户-视频互动特征 if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns: user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks') play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left') else: play_data['user_vid_clicks'] = 0 # 填充缺失值 play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True) play_data['user_play_count'].fillna(1, inplace=True) play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True) play_data['video_completion_std'].fillna(0, inplace=True) play_data['user_vid_clicks'].fillna(0, inplace=True) return play_data print("开始构建完播率数据集...") # 确保所有参数都已定义 if 'hist_play' in globals() and 'vid_info' in globals() and 'did_features' in globals() and 'hist_click' in globals(): play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) else: print("⚠️ 无法构建完播率数据集,因为所需变量未定义") play_train_data = pd.DataFrame() # 7. 
训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 获取完播率模型的分类特征 if not play_train_data.empty: play_categorical_features = get_categorical_features(play_train_data, base_categorical_features) print(f"完播率模型使用的分类特征: {play_categorical_features}") else: play_categorical_features = [] print("⚠️ 完播率训练数据为空,无法获取分类特征") # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } if not X_train_play.empty: train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features) print("开始训练完播率模型...") model_play = lgb.train( params_reg, train_data_play, num_boost_round=2000, valid_sets=[val_data_play], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") # 保存模型 if model_click: model_click.save_model('click_model.txt') if model_play: model_play.save_model('play_model.txt') joblib.dump(base_categorical_features, 'categorical_features.pkl') # 8. 
添加预测流程 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 生成提交格式 submission = test_data.groupby('did').apply( lambda x: ' '.join(x.sort_values('score', ascending=False)['vid'].astype(str)[:100]) ).reset_index(name='vid_list') return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果") # 8. 
调整预测流程以满足新格式要求 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果,格式为:did, vid, completion_rate""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 为每个用户选择得分最高的视频 submission = test_data.sort_values('score', ascending=False).groupby('did').head(1) # 选择需要的列 submission = submission[['did', 'vid', 'completion_rate']].copy() # 重命名列以符合要求 submission.columns = ['did', 'vid', 'completion_rate'] # 确保数据格式正确 submission['did'] = submission['did'].astype(str) submission['vid'] = submission['vid'].astype(str) submission['completion_rate'] = submission['completion_rate'].round(4) # 保留4位小数 return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' # 确保输出格式正确:did, vid, completion_rate submission.to_csv(output_file, index=False, header=True) print(f"预测结果已保存至: {output_file}") # 打印前5行示例 print("\n预测结果示例:") print(submission.head()) else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果")

Please review and optimize this code for me, especially to reduce memory usage:

```python
import pandas as pd
import numpy as np
import lightgbm as lgb
from lightgbm import early_stopping, log_evaluation
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import joblib
from datetime import datetime
from scipy.sparse import hstack, csr_matrix, save_npz, load_npz
import sys
import psutil
from sklearn.metrics import log_loss, mean_absolute_error

# Memory optimization helper
def optimize_dtypes(df):
    """Optimize a DataFrame's dtypes to reduce memory usage"""
    if df.empty:
        return df

    # Downcast integer columns to the smallest viable type
    int_cols = df.select_dtypes(include=['int']).columns
    if not int_cols.empty:
        df[int_cols] = df[int_cols].apply(pd.to_numeric, downcast='integer')

    # Downcast float columns to the smallest viable type
    float_cols = df.select_dtypes(include=['float']).columns
    if not float_cols.empty:
        df[float_cols] = df[float_cols].apply(pd.to_numeric, downcast='float')

    # Convert object columns to categoricals
    obj_cols = df.select_dtypes(include=['object']).columns
    for col in obj_cols:
        num_unique = df[col].nunique()
        num_total = len(df)
        if num_unique / num_total < 0.5:  # if fewer than 50% of values are unique
            df[col] = df[col].astype('category')

    return df

# Memory monitoring helper
def memory_monitor(step_name=""):
    """Report current memory usage"""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    print(f"{step_name} memory usage: {mem_info.rss / (1024 ** 2):.2f} MB")
    return mem_info.rss / (1024 ** 2)  # return MB

# Robust data loading
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000):
    """Safely load a large CSV file with optimized memory usage"""
    try:
        if not os.path.exists(file_path):
            print(f"⚠️ File not found: {file_path}")
            return pd.DataFrame()

        # Auto-detect the encoding
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(100000))
        encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'

        # Read in chunks and optimize memory
        chunks = []
        reader = pd.read_csv(
            file_path,
            encoding=encoding,
            usecols=usecols,
            dtype=dtype,
            chunksize=chunksize,
            low_memory=False
        )

        for chunk in tqdm(reader, desc=f"Loading {os.path.basename(file_path)}"):
            # Optimize categorical columns
            for col in chunk.columns:
                if dtype and col in dtype and dtype[col] == 'category':
                    chunk[col] = chunk[col].astype('category').cat.as_ordered()
            # Optimize dtypes
            chunk = optimize_dtypes(chunk)
            chunks.append(chunk)

        if chunks:
            result = pd.concat(chunks, ignore_index=True)
            # Optimize once more on the whole frame
            result = optimize_dtypes(result)
            return result
        return pd.DataFrame()
    except Exception as e:
        print(f"⚠️ Failed to load {file_path}: {str(e)}")
        return pd.DataFrame()

# Sparse matrix conversion
def to_sparse_matrix(df, columns):
    """Convert categorical features to a sparse matrix representation"""
    sparse_matrices = []
    for col in columns:
        if col in df.columns:
            # Handle NaN values
            df[col] = df[col].fillna('MISSING')
            # Build a sparse matrix
            sparse_mat = csr_matrix(pd.get_dummies(df[col], sparse=True).values)
            sparse_matrices.append(sparse_mat)
    # Stack all sparse matrices horizontally
    if sparse_matrices:
        return hstack(sparse_matrices)
    return None

# Incremental training
def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000):
    """Train the model incrementally in chunks to reduce memory usage"""
    model = None
    for i in tqdm(range(0, len(X), chunk_size), desc="Incremental training"):
        chunk_end = min(i + chunk_size, len(X))
        X_chunk = X.iloc[i:chunk_end]
        y_chunk = y.iloc[i:chunk_end]

        train_data = lgb.Dataset(
            X_chunk,
            label=y_chunk,
            categorical_feature=categorical_features
        )

        if model is None:
            model = lgb.train(
                params,
                train_data,
                num_boost_round=num_rounds,
                keep_training_booster=True
            )
        else:
            model = lgb.train(
                params,
                train_data,
                num_boost_round=num_rounds,
                init_model=model,
                keep_training_booster=True
            )
    return model

# Main pipeline
def main():
    """Main pipeline with the full memory-optimization strategy"""
    # Initial memory reading
    start_mem = memory_monitor("Initial memory")

    # Memory-friendly dtypes
    dtypes = {
        'did': 'category',
        'vid': 'category',
        'play_time': 'float32'
    }

    # Optional features
    optional_features = {
        'item_cid': 'category',
        'item_type': 'category',
        'item_assetSource': 'category',
        'item_classify': 'category',
        'item_isIntact': 'category',
        'sid': 'category',
        'stype': 'category'
    }

    # Feature columns
    for i in range(88):
        dtypes[f'f{i}'] = 'float32'

    # Load the core data
    print("Loading core data...")
    did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
    memory_monitor("After loading core data")

    # Register optional features in dtypes
    for feature, dtype in optional_features.items():
        if not vid_info.empty and feature in vid_info.columns:
            dtypes[feature] = dtype

    # Reload so every column uses the right dtype
    if os.path.exists('did_features_table.csv'):
        did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    else:
        print("⚠️ did_features_table.csv not found")
        did_features = pd.DataFrame()

    if os.path.exists('vid_info_table.csv'):
        vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
    else:
        print("⚠️ vid_info_table.csv not found")
        vid_info = pd.DataFrame()

    memory_monitor("After reloading data")

    # Load historical data
    print("Loading historical data...")
    hist_exposure, hist_click, hist_play = load_historical_data(days=32)
    memory_monitor("After loading historical data")

    # Build the click dataset
    if not hist_exposure.empty and not hist_click.empty:
        print("Building click dataset...")
        click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
    else:
        print("⚠️ Cannot build the click dataset")
        click_train_data = pd.DataFrame()
    memory_monitor("After building click dataset")

    # Add features
    if not click_train_data.empty:
        print("Building click features...")
        click_train_data = add_click_features(
            click_train_data, did_features, vid_info, hist_click, hist_play
        )
    else:
        print("⚠️ Click dataset is empty, skipping feature building")
    memory_monitor("After adding features")

    # Prepare the training data
    if not click_train_data.empty:
        if 'date' in click_train_data.columns:
            X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore')
        else:
            X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore')
        y = click_train_data['label']
    else:
        X, y = pd.DataFrame(), pd.Series()
        print("⚠️ Click training data is empty")

    # Split the dataset
    if not X.empty and not y.empty:
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
    else:
        print("⚠️ Training data is empty, cannot train models")
        X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()
    memory_monitor("After splitting dataset")

    # Training parameters
    params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 63,
        'learning_rate': 0.05,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'min_child_samples': 100,
        'verbosity': -1
    }

    # Incrementally train the click model
    if not X_train.empty:
        print("Training click prediction model...")
        model_click = train_incremental(X_train, y_train, categorical_features, params,
                                        num_rounds=1500, chunk_size=100000)

        # Evaluate on the validation set
        val_preds = model_click.predict(X_val)
        val_logloss = log_loss(y_val, val_preds)
        print(f"Validation LogLoss: {val_logloss:.4f}")
    else:
        model_click = None
        print("⚠️ Training data is empty, skipping click model training")
    memory_monitor("After training click model")

    # Build the completion-rate dataset
    print("Building completion-rate dataset...")
    play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click)
    memory_monitor("After building completion-rate dataset")

    # Train the completion-rate model
    if not play_train_data.empty:
        X_play = play_train_data.drop(
            columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'],
            errors='ignore'
        )
        y_play = play_train_data['completion_rate']
    else:
        X_play, y_play = pd.DataFrame(), pd.Series()
        print("⚠️ Completion-rate training data is empty")

    if not X_play.empty and not y_play.empty:
        X_train_play, X_val_play, y_train_play, y_val_play = train_test_split(
            X_play, y_play, test_size=0.2, random_state=42
        )
    else:
        print("⚠️ Completion-rate training data is empty, cannot train the model")
        X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Regression training parameters
    params_reg = {
        'objective': 'regression',
        'metric': 'mae',
        'boosting_type': 'gbdt',
        'num_leaves': 63,
        'learning_rate': 0.03,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'lambda_l1': 0.1,
        'lambda_l2': 0.1,
        'min_data_in_leaf': 50,
        'verbosity': -1
    }

    # Incrementally train the completion-rate model
    if not X_train_play.empty:
        print("Training completion-rate model...")
        model_play = train_incremental(X_train_play, y_train_play, play_categorical_features,
                                       params_reg, num_rounds=2000, chunk_size=100000)

        # Evaluate on the validation set
        val_preds = model_play.predict(X_val_play)
        val_mae = mean_absolute_error(y_val_play, val_preds)
        print(f"Validation MAE: {val_mae:.4f}")
    else:
        model_play = None
        print("⚠️ Training data is empty, skipping completion-rate model training")
    memory_monitor("After training completion-rate model")

    # Save the models
    if model_click:
        model_click.save_model('click_model.txt')
        print("Click prediction model saved")
    if model_play:
        model_play.save_model('play_model.txt')
        print("Completion-rate model saved")

    # Prediction flow
    print("Loading prediction data...")
    to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'})
    to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'})

    # Run the prediction
    if not to_predict_users.empty and not to_predict_exposure.empty:
        print("Generating predictions...")
        submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info)

        # Save the results
        if not submission.empty:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f'submission_{timestamp}.csv'
            submission.to_csv(output_file, index=False)
            print(f"Predictions saved to: {output_file}")
        else:
            print("⚠️ Prediction result is empty, no file saved")
    else:
        print("⚠️ Failed to load prediction data, cannot generate results")

    # Final memory report
    end_mem = memory_monitor("Finished")
    print(f"Total memory consumed: {end_mem - start_mem:.2f} MB")

# Historical data loading
def load_historical_data(days=32):
    """Efficiently load historical data in batches"""
    see_list, click_list, play_list = [], [], []

    for day in tqdm(range(1, days + 1), desc="Loading historical data"):
        day_str = f"{day:02d}"

        # Exposure data
        see_path = f'see_{day_str}.csv'
        if os.path.exists(see_path):
            see = load_data_safely(see_path, usecols=['did', 'vid'],
                                   dtype={'did': 'category', 'vid': 'category'})
            if not see.empty and 'did' in see.columns and 'vid' in see.columns:
                see_list.append(see)
            del see
            gc.collect()

        # Click data
        click_path = f'click_{day_str}.csv'
        if os.path.exists(click_path):
            click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'],
                                     dtype={'did': 'category', 'vid': 'category'})
            if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns:
                # Efficient date handling
                click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date
                click = click.drop(columns=['click_time'], errors='ignore')
                click_list.append(click[['did', 'vid', 'date']])
            del click
            gc.collect()

        # Play data
        play_path = f'playplus_{day_str}.csv'
        if os.path.exists(play_path):
            play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'],
                                    dtype={'did': 'category', 'vid': 'category'})
            if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns:
                play_list.append(play)
            del play
            gc.collect()

        gc.collect()

    # Always return three DataFrames
    return (
        pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(),
        pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(),
        pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
    )

# Click dataset construction
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1):
    """Build the click dataset with negative sampling"""
    if hist_exposure.empty or hist_click.empty:
        print("⚠️ Historical exposure or click data is empty, cannot build the dataset")
        return pd.DataFrame()

    # Mark positive samples
    hist_click = hist_click.copy()
    hist_click['label'] = 1

    # Efficiently mark negative samples
    exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid']))
    click_set = set(zip(hist_click['did'], hist_click['vid']))

    # Exposures without a click
    negative_set = exposure_set - click_set

    # Build the negative-sample DataFrame
    if negative_set:
        negative_dids, negative_vids = zip(*negative_set)
        negative_samples = pd.DataFrame({
            'did': list(negative_dids),
            'vid': list(negative_vids),
            'label': 0
        })
        # Sample the negatives
        if sample_ratio < 1.0:
            negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42)
    else:
        negative_samples = pd.DataFrame(columns=['did', 'vid', 'label'])

    # Merge positives and negatives
    click_data = pd.concat([
        hist_click[['did', 'vid', 'label']],
        negative_samples
    ], ignore_index=True)

    # Free memory
    del exposure_set, click_set, negative_set, negative_samples
    gc.collect()

    return click_data

# Feature engineering
def add_click_features(df, did_features, vid_info, hist_click, hist_play):
    """Add the key features while avoiding memory blow-ups"""
    if df.empty:
        return df

    # Base features
    if not did_features.empty and 'did' in did_features.columns:
        # Only keep the needed columns
        did_cols = [col for col in did_features.columns if col not in ['did'] or col == 'did']
        df = df.merge(did_features[did_cols], on='did', how='left')

    if not vid_info.empty and 'vid' in vid_info.columns:
        vid_cols = [col for col in vid_info.columns if col not in ['vid'] or col == 'vid']
        df = df.merge(vid_info[vid_cols], on='vid', how='left')

    # User behaviour statistics
    if not hist_click.empty and 'did' in hist_click.columns:
        user_click_count = hist_click.groupby('did').size().rename('user_click_count')
        df = df.merge(user_click_count, on='did', how='left')
    else:
        df['user_click_count'] = 0

    if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns:
        user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play')
        df = df.merge(user_total_play, on='did', how='left')
    else:
        df['user_total_play'] = 0

    if not hist_click.empty and 'vid' in hist_click.columns:
        video_click_count = hist_click.groupby('vid').size().rename('video_click_count')
        df = df.merge(video_click_count, on='vid', how='left')
    else:
        df['video_click_count'] = 0

    if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns:
        avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time')
        df = df.merge(avg_play_time, on='vid', how='left')
    else:
        df['avg_play_time'] = 0

    # Fill missing values
    fill_values = {
        'user_click_count': 0,
        'user_total_play': 0,
        'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
        'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0
    }
    for col, value in fill_values.items():
        if col in df:
            df[col] = df[col].fillna(value)

    # Time-related features
    if 'date' in df:
        df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8')
        df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8')

    return df

# Prediction
def predict_for_test_data(test_users, test_exposure, did_features, vid_info):
    """Generate predictions for the test data"""
    if test_users.empty or test_exposure.empty:
        print("⚠️ Test data is empty, cannot predict")
        return pd.DataFrame()

    # Merge the test data
    test_data = test_exposure.merge(test_users, on='did', how='left')

    # Add features
    test_data = add_click_features(
        test_data, did_features, vid_info,
        pd.DataFrame(),  # no historical clicks
        pd.DataFrame()   # no historical plays
    )

    # Predict click-through rate
    X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore')
    if model_click and not X_test.empty:
        # Chunked prediction to avoid memory issues
        click_probs = []
        chunk_size = 100000
        for i in range(0, len(X_test), chunk_size):
            chunk = X_test.iloc[i:i+chunk_size]
            click_probs.extend(model_click.predict(chunk))
    else:
        click_probs = [0.5] * len(test_data)  # default value

    # Predict completion rate
    if model_play and not X_test.empty:
        # Attach the video duration
        if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
            test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left')
        else:
            test_data['item_duration'] = 1.0
        # Chunked prediction
        completion_rates = []
        for i in range(0, len(X_test), chunk_size):
            chunk = X_test.iloc[i:i+chunk_size]
            completion_rates.extend(model_play.predict(chunk))
    else:
        completion_rates = [0.7] * len(test_data)  # default value

    # Combined score
    test_data['click_prob'] = click_probs
    test_data['completion_rate'] = completion_rates
    test_data['score'] = test_data['click_prob'] * test_data['completion_rate']

    # Keep the highest-scoring video per user
    submission = test_data.sort_values('score', ascending=False).groupby('did').head(1)

    # Select and rename the required columns
    submission = submission[['did', 'vid', 'completion_rate']].copy()
    submission.columns = ['did', 'vid', 'completion_rate']

    # Normalize the output types
    submission['did'] = submission['did'].astype(str)
    submission['vid'] = submission['vid'].astype(str)
    submission['completion_rate'] = submission['completion_rate'].round(4)

    return submission

# Entry point
if __name__ == "__main__":
    main()
```
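As a side note on what the dtype downcasting in `optimize_dtypes` actually buys: the self-contained sketch below measures the saving on a toy frame (the column names and data are made up for illustration):

```python
import numpy as np
import pandas as pd

# Toy frame: pandas defaults to int64 / float64 / object
df = pd.DataFrame({
    'clicks': np.random.randint(0, 100, 1_000_000),
    'rate': np.random.rand(1_000_000),
    'city': np.random.choice(['bj', 'sh', 'gz'], 1_000_000),
})
before = df.memory_usage(deep=True).sum() / 1024**2

df['clicks'] = pd.to_numeric(df['clicks'], downcast='integer')  # int64 -> int8
df['rate'] = df['rate'].astype(np.float32)                      # float64 -> float32
df['city'] = df['city'].astype('category')                      # object -> category
after = df.memory_usage(deep=True).sum() / 1024**2

print(f"{before:.1f} MB -> {after:.1f} MB")  # typically a 5-10x reduction here
```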

Please review and optimize this code, and output the complete result:

```python
import pandas as pd
import numpy as np
import lightgbm as lgb
from lightgbm import early_stopping, log_evaluation
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read(10000))
    return result['encoding'], result['confidence']

def load_all_data(days=32):
    see_list, click_list, play_list = [], [], []
    dtypes = {'did': 'category', 'vid': 'category'}

    for i in range(1, days + 1):
        day = f"{i:02d}"

        # Load the see data
        see = pd.read_csv(f'see_{day}.csv', encoding='latin1', dtype=dtypes)
        if 'did' not in see.columns or 'vid' not in see.columns:
            raise ValueError(f"see_{day}.csv is missing required columns")
        see['day'] = day
        see_list.append(see)

        # Load the click data
        click = pd.read_csv(
            f'click_{day}.csv',
            encoding='ISO-8859-1',
            on_bad_lines='skip',
            dtype=dtypes
        )
        if 'click_time' not in click.columns:
            raise ValueError(f"click_{day}.csv is missing the click_time column")
        click['date'] = pd.to_datetime(click['click_time']).dt.date
        click_list.append(click[['did', 'vid', 'date']])

        # Load the play data
        play = pd.read_csv(
            f'playplus_{day}.csv',
            engine='python',
            encoding_errors='ignore',
            dtype=dtypes
        )
        if 'play_time' not in play.columns:
            raise ValueError(f"playplus_{day}.csv is missing the play_time column")
        play_list.append(play[['did', 'vid', 'play_time']])

    all_see = pd.concat(see_list).drop_duplicates(['did', 'vid'])
    all_click = pd.concat(click_list).drop_duplicates(['did', 'vid'])
    all_play = pd.concat(play_list).groupby(['did', 'vid'], observed=True).sum().reset_index()
    return all_see, all_click, all_play

def prepare_samples(all_see, all_click, all_play):
    video_info = pd.read_csv('vid_info_table.csv', encoding='gbk', dtype={'vid': 'category'})

    # Merge the base tables
    samples = all_see.merge(all_play, on=['did', 'vid'], how='left').fillna({'play_time': 0})
    samples = samples.merge(video_info, on='vid', how='left')

    # Completion rate (analysis only, not used for prediction)
    samples['completion_rate'] = (samples['play_time'] / samples['item_duration']).clip(0, 1).astype(np.float32)

    # Click flag
    click_flag = all_click.groupby(['did', 'vid']).size().reset_index(name='clicked')
    click_flag['clicked'] = 1
    samples = samples.merge(click_flag, on=['did', 'vid'], how='left').fillna({'clicked': 0})
    samples['clicked'] = samples['clicked'].astype(np.int8)

    # Label definition
    samples['label'] = np.select(
        [
            (samples['completion_rate'] > 0.9),
            (samples['clicked'] == 1)
        ],
        [2, 1],      # 2 = completed, 1 = clicked
        default=0    # 0 = exposed, not clicked
    )

    # Binary target (clicked or completed counts as positive)
    samples['binary_label'] = samples['label'].apply(lambda x: 1 if x >= 1 else 0).astype(int)

    # User click rate (corrected version)
    user_exposure = all_see.groupby('did').size().rename('exposure_count')
    user_click_count = all_click.groupby('did').size().rename('click_count')
    user_click_rate = (user_click_count / user_exposure).fillna(0).astype(np.float32)

    # Video popularity
    video_popularity = all_click.groupby('vid').size().rename('video_popularity')

    # Map the features
    samples['user_click_rate'] = samples['did'].map(user_click_rate).fillna(0)
    samples['video_popularity'] = samples['vid'].map(video_popularity).fillna(0)

    # Fix: persist unique per-user click rates (key fix)
    user_click_rate_df = pd.DataFrame({
        'did': user_click_rate.index,
        'user_click_rate': user_click_rate.values
    }).drop_duplicates('did')

    # Fix: persist unique per-video popularity
    video_popularity_df = pd.DataFrame({
        'vid': video_popularity.index,
        'video_popularity': video_popularity.values
    }).drop_duplicates('vid')

    # Save the features
    user_click_rate_df.to_csv('user_click_rate.csv', index=False)
    video_popularity_df.to_csv('video_popularity.csv', index=False)

    return samples, user_click_rate, video_popularity

def train_model(samples):
    # Use only reproducible features
    features = ['user_click_rate', 'video_popularity']
    X = samples[features]
    y = samples['binary_label']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)

    params = {
        'boosting_type': 'gbdt',
        'objective': 'binary',
        'metric': 'auc',
        'num_leaves': 31,
        'learning_rate': 0.05,
        'feature_fraction': 0.9,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'verbose': -1
    }

    model = lgb.train(
        params,
        lgb_train,
        num_boost_round=100,
        valid_sets=[lgb_train, lgb_eval],
        callbacks=[
            early_stopping(stopping_rounds=20),
            log_evaluation(period=50)
        ]
    )

    y_pred = model.predict(X_test)
    auc_score = roc_auc_score(y_test, y_pred)
    print(f"Validation AUC: {auc_score:.4f}")

    return model, features, auc_score

def predict_new_data(model, feature_columns, test_file):
    # Read the test data
    test_data = pd.read_csv(test_file, dtype={'did': 'category', 'vid': 'category'})

    # Fix: read the feature mappings correctly
    user_click_rate_df = pd.read_csv('user_click_rate.csv')
    video_popularity_df = pd.read_csv('video_popularity.csv')

    # Global means for filling in new users / new videos
    global_user_rate = user_click_rate_df['user_click_rate'].mean()
    global_video_pop = video_popularity_df['video_popularity'].mean()

    # Build lookup dicts
    user_click_map = user_click_rate_df.set_index('did')['user_click_rate'].to_dict()
    video_pop_map = video_popularity_df.set_index('vid')['video_popularity'].to_dict()

    # Map the features
    test_data['user_click_rate'] = test_data['did'].map(user_click_map).fillna(global_user_rate)
    test_data['video_popularity'] = test_data['vid'].map(video_pop_map).fillna(global_video_pop)

    # Predict
    test_data['click_prob'] = model.predict(test_data[feature_columns])

    # Build the result
    top_predictions = test_data.sort_values('click_prob', ascending=False).groupby('did').head(1)
    result = top_predictions[['did', 'vid', 'click_prob']].copy()
    result.columns = ['did', 'vid', 'click_prob']
    result.to_csv('prediction_result.csv', index=False)
    return result

if __name__ == '__main__':
    encoding, confidence = detect_encoding('see_01.csv')
    print(f"Encoding: {encoding}, confidence: {confidence:.2f}")

    all_see, all_click, all_play = load_all_data()
    samples, _, _ = prepare_samples(all_see, all_click, all_play)
    model, features, auc_score = train_model(samples)
    result = predict_new_data(model, features, 'testA_did_show.csv')
```
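The cold-start handling in `predict_new_data` boils down to a map-with-fallback pattern: look up the training-time statistic and fall back to a global mean for ids the model has never seen. A minimal sketch with toy data (the names are illustrative):

```python
import pandas as pd

rates = pd.Series({'u1': 0.20, 'u2': 0.05})  # learned at training time
global_mean = rates.mean()                   # fallback for unseen users

test = pd.DataFrame({'did': ['u1', 'u3']})   # 'u3' never appeared in training
test['user_click_rate'] = test['did'].map(rates).fillna(global_mean)
print(test)  # u1 -> 0.20, u3 -> 0.125
```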

Review and optimize:

```python
import pandas as pd
import numpy as np
import lightgbm as lgb
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import psutil
from sklearn.metrics import log_loss, mean_absolute_error
from scipy.sparse import hstack, csr_matrix, save_npz, load_npz
import warnings
warnings.filterwarnings('ignore')

# Memory optimization helper - enhanced version
def optimize_dtypes(df, downcast_int=True, downcast_float=True, category_threshold=0.5):
    """Optimize a DataFrame's dtypes to reduce memory usage"""
    if df.empty:
        return df

    # Downcast integer columns to the smallest viable type
    if downcast_int:
        int_cols = df.select_dtypes(include=['int']).columns
        for col in int_cols:
            df[col] = pd.to_numeric(df[col], downcast='integer')

    # Convert float columns to float32
    if downcast_float:
        float_cols = df.select_dtypes(include=['float']).columns
        for col in float_cols:
            # Prefer float32 over downcast='float' for tighter control
            df[col] = df[col].astype(np.float32)

    # Convert object columns to categoricals
    obj_cols = df.select_dtypes(include=['object']).columns
    for col in obj_cols:
        num_unique = df[col].nunique()
        num_total = len(df)
        if num_unique / num_total < category_threshold:
            df[col] = df[col].astype('category')

    return df

# Robust data loading
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=50000, verbose=True):
    """Safely load a large CSV file with optimized memory usage"""
    try:
        if not os.path.exists(file_path):
            print(f"⚠️ File not found: {file_path}")
            return pd.DataFrame()

        # Auto-detect the encoding
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(100000))
        encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'

        # File size for the progress bar
        file_size = os.path.getsize(file_path) / (1024 ** 2)  # MB
        desc = f"Loading {os.path.basename(file_path)} ({file_size:.1f}MB)"

        # Read in chunks and optimize memory
        chunks = []
        reader = pd.read_csv(
            file_path,
            encoding=encoding,
            usecols=usecols,
            dtype=dtype,
            chunksize=chunksize,
            low_memory=False
        )

        for chunk in tqdm(reader, desc=desc, disable=not verbose):
            chunk = optimize_dtypes(chunk)
            chunks.append(chunk)

        if chunks:
            result = pd.concat(chunks, ignore_index=True)
            result = optimize_dtypes(result)
            return result
        return pd.DataFrame()
    except Exception as e:
        print(f"⚠️ Failed to load {file_path}: {str(e)}")
        return pd.DataFrame()

# Sparse matrix conversion - optimized version
def to_sparse_matrix(df, columns, fillna='MISSING', dtype=np.int8):
    """Convert categorical features to a sparse matrix representation"""
    from sklearn.preprocessing import OneHotEncoder

    # Preprocess the data
    sparse_data = df[columns].fillna(fillna).astype(str)

    # Use OneHotEncoder instead of get_dummies for better performance
    encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=True, dtype=dtype)
    sparse_matrix = encoder.fit_transform(sparse_data)

    return sparse_matrix, encoder

# Incremental training - with better memory management
def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000):
    """Train the model incrementally in chunks to reduce memory usage"""
    model = None
    callbacks = [lgb.early_stopping(stopping_rounds=50, verbose=0), lgb.log_evaluation(period=100)]

    for i in tqdm(range(0, len(X), chunk_size), desc="Incremental training"):
        chunk_end = min(i + chunk_size, len(X))
        # Use views to avoid copying data
        X_chunk = X.iloc[i:chunk_end]
        y_chunk = y.iloc[i:chunk_end]

        # Release the raw data right after building the dataset
        train_data = lgb.Dataset(
            X_chunk,
            label=y_chunk,
            categorical_feature=categorical_features,
            free_raw_data=True  # release raw data after training
        )

        if model is None:
            model = lgb.train(
                params,
                train_data,
                num_boost_round=num_rounds,
                callbacks=callbacks,
                keep_training_booster=True
            )
        else:
            model = lgb.train(
                params,
                train_data,
                num_boost_round=num_rounds,
                init_model=model,
                callbacks=callbacks,
                keep_training_booster=True
            )

        # Explicitly free memory
        del train_data, X_chunk, y_chunk
        gc.collect()

    return model

# Historical data loading - memory optimized
def load_historical_data(days=32, verbose=True):
    """Efficiently load historical data in batches"""
    see_list, click_list, play_list = [], [], []

    for day in tqdm(range(1, days + 1), desc="Loading historical data", disable=not verbose):
        day_str = f"{day:02d}"

        # Exposure data - only the necessary columns
        see_path = f'see_{day_str}.csv'
        if os.path.exists(see_path):
            see = load_data_safely(
                see_path,
                usecols=['did', 'vid'],
                dtype={'did': 'category', 'vid': 'category'},
                verbose=verbose
            )
            if not see.empty:
                see_list.append(see)
            del see

        # Click data - optimized date handling
        click_path = f'click_{day_str}.csv'
        if os.path.exists(click_path):
            click = load_data_safely(
                click_path,
                usecols=['did', 'vid', 'click_time'],
                dtype={'did': 'category', 'vid': 'category'},
                verbose=verbose
            )
            if not click.empty and 'click_time' in click.columns:
                # Parse dates straight into a numeric type
                click_dates = pd.to_datetime(click['click_time'], errors='coerce')
                click['date'] = click_dates.dt.strftime('%Y%m%d').astype('int32')
                click = click.drop(columns=['click_time'])
                click_list.append(click[['did', 'vid', 'date']])
            del click, click_dates

        # Play data
        play_path = f'playplus_{day_str}.csv'
        if os.path.exists(play_path):
            play = load_data_safely(
                play_path,
                usecols=['did', 'vid', 'play_time'],
                dtype={'did': 'category', 'vid': 'category', 'play_time': 'float32'},
                verbose=verbose
            )
            if not play.empty:
                play_list.append(play)
            del play

        gc.collect()

    # Avoid intermediate objects when concatenating
    return (
        pd.concat(see_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(),
        pd.concat(click_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(),
        pd.concat(play_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
    )

# Click dataset construction - memory optimized
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1, verbose=True):
    """Build the click dataset with negative sampling - memory-optimized version"""
    if hist_exposure.empty or hist_click.empty:
        print("⚠️ Historical exposure or click data is empty, cannot build the dataset")
        return pd.DataFrame()

    # Mark positive samples - use a view to avoid copying
    pos_samples = hist_click[['did', 'vid']].copy()
    pos_samples['label'] = 1

    # Index over the exposure set for fast lookups
    exposure_index = hist_exposure.set_index(['did', 'vid']).index

    # Process negatives in chunks
    neg_chunks = []
    chunk_size = 500000
    total_rows = len(hist_exposure)

    for start in tqdm(range(0, total_rows, chunk_size), desc="Building negatives", disable=not verbose):
        end = min(start + chunk_size, total_rows)
        chunk = hist_exposure.iloc[start:end]

        # Use the index to find exposures without a click
        chunk['is_clicked'] = chunk.set_index(['did', 'vid']).index.isin(hist_click.set_index(['did', 'vid']).index)
        neg_chunk = chunk[~chunk['is_clicked']][['did', 'vid']]

        if not neg_chunk.empty and sample_ratio < 1.0:
            neg_chunk = neg_chunk.sample(frac=sample_ratio, random_state=42)

        neg_chunks.append(neg_chunk)
        del chunk, neg_chunk

    # Merge the negatives
    neg_samples = pd.concat(neg_chunks, ignore_index=True)
    neg_samples['label'] = 0

    # Merge positives and negatives
    click_data = pd.concat([pos_samples, neg_samples], ignore_index=True, copy=False)

    # Free memory
    del exposure_index, pos_samples, neg_samples, neg_chunks
    gc.collect()

    return click_data

# Check before the merge operation
if 'total' not in df.columns:
    print("Warning: the 'total' column is missing from the DataFrame")
    print("Available columns:", df.columns.tolist())
    # Try to spot a likely typo
    possible_matches = [col for col in df.columns if 'total' in col.lower()]
    if possible_matches:
        print("Possible matching columns:", possible_matches)

# Feature engineering - memory optimized
def add_click_features(df, did_features, vid_info, hist_click, hist_play, verbose=True):
    """Add the key features while avoiding memory blow-ups - optimized version"""
    if df.empty:
        return df

    # 1. Merge device features - only the necessary columns
    if not did_features.empty and 'did' in did_features.columns:
        did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f')]
        df = df.merge(did_features[did_cols], on='did', how='left')

    # 2. Merge video features - only the necessary columns
    if not vid_info.empty and 'vid' in vid_info.columns:
        vid_cols = ['vid', 'item_duration'] + [col for col in vid_info.columns if col in ['item_cid', 'item_type']]
        df = df.merge(vid_info[vid_cols], on='vid', how='left')

    # 3. Pre-aggregate statistics - avoid recomputation
    stats = {}

    # User behaviour statistics
    if not hist_click.empty:
        stats['user_click_count'] = hist_click.groupby('did').size().astype('int32')
        stats['video_click_count'] = hist_click.groupby('vid').size().astype('int32')
    if not hist_play.empty:
        stats['user_total_play'] = hist_play.groupby('did')['play_time'].sum().astype('float32')
        stats['avg_play_time'] = hist_play.groupby('vid')['play_time'].mean().astype('float32')

    # 4. Merge the statistics
    for name, stat_df in tqdm(stats.items(), desc="Adding statistics", disable=not verbose):
        if name in df.columns:
            continue
        df = df.merge(stat_df.rename(name), how='left', left_on=name.split('_')[1], right_index=True)

    # 5. Fill missing values - more efficiently
    fill_values = {
        'user_click_count': 0,
        'user_total_play': 0,
        'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
        'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0,
        'item_duration': df['item_duration'].median() if 'item_duration' in df else 30.0
    }
    for col, default in fill_values.items():
        if col in df:
            # In-place fill to cut down on allocations
            df[col].fillna(default, inplace=True)

    # 6. Time features - numeric instead of categorical
    if 'date' in df:
        # Compute numeric features directly, avoiding datetime objects
        df['day_of_week'] = (df['date'] % 7).astype('int8')
        df['is_weekend'] = (df['day_of_week'] >= 5).astype('int8')
        df.drop(columns=['date'], inplace=True, errors='ignore')

    return df

# Main pipeline - memory optimized
def main():
    """Main pipeline with the full memory-optimization strategy"""
    # Initial memory reading
    start_mem = memory_monitor("Initial memory")

    # Memory-friendly dtypes
    dtypes = {
        'did': 'category',
        'vid': 'category',
        'play_time': 'float32'
    }

    # Load the core data
    print("Loading core data...")
    did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
    memory_monitor("After loading core data")

    # Load historical data - fewer days
    print("Loading historical data...")
    hist_exposure, hist_click, hist_play = load_historical_data(days=14)  # reduced to 14 days
    memory_monitor("After loading historical data")

    # Build the click dataset
    if not hist_exposure.empty and not hist_click.empty:
        print("Building click dataset...")
        click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
        # Release data we no longer need
        del hist_exposure, hist_click
        gc.collect()
    else:
        print("⚠️ Cannot build the click dataset")
        click_train_data = pd.DataFrame()
    memory_monitor("After building click dataset")

    # Add features - incrementally
    if not click_train_data.empty:
        print("Building click features...")
        click_train_data = add_click_features(
            click_train_data,
            did_features,
            vid_info,
            hist_click if 'hist_click' in locals() else pd.DataFrame(),
            hist_play
        )
    else:
        print("⚠️ Click dataset is empty, skipping feature building")

    # Free memory immediately
    del hist_play
    gc.collect()
    memory_monitor("After adding features")

    # Prepare training data - views instead of copies
    if not click_train_data.empty:
        cols_to_drop = ['did', 'vid', 'label']
        if 'date' in click_train_data.columns:
            cols_to_drop.append('date')
        X = click_train_data.drop(columns=cols_to_drop, errors='ignore')
        y = click_train_data['label']
    else:
        X, y = pd.DataFrame(), pd.Series(dtype='float32')
        print("⚠️ Click training data is empty")

    # Split the dataset - indices instead of copies
    if len(X) > 0:
        indices = np.arange(len(X))
        train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42, stratify=y)
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
    else:
        print("⚠️ Training data is empty, cannot train models")
        X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Release click_train_data
    del click_train_data, X, y, indices
    gc.collect()
    memory_monitor("After splitting dataset")

    # Training parameters - tuned to save memory
    params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 31,          # fewer leaves
        'learning_rate': 0.05,
        'feature_fraction': 0.7,   # use fewer features per tree
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'min_child_samples': 200,  # larger minimum leaf size
        'verbosity': -1,
        'max_depth': -1,           # avoid overly deep trees
        'seed': 42
    }

    # Incrementally train the click model
    if len(X_train) > 0:
        print("Training click prediction model...")
        model_click = train_incremental(X_train, y_train, [], params, num_rounds=1000, chunk_size=100000)

        # Evaluate on the validation set
        if len(X_val) > 0:
            # Chunked prediction to avoid memory spikes
            chunk_size = 50000
            val_preds = []
            for i in range(0, len(X_val), chunk_size):
                chunk = X_val.iloc[i:i+chunk_size]
                val_preds.extend(model_click.predict(chunk))
            val_logloss = log_loss(y_val, val_preds)
            print(f"Validation LogLoss: {val_logloss:.4f}")
    else:
        model_click = None
        print("⚠️ Training data is empty, skipping click model training")

    # Release the training data
    del X_train, X_val, y_train, y_val
    gc.collect()
    memory_monitor("After training click model")

    # Final memory report
    end_mem = memory_monitor("Finished")
    print(f"Total memory consumed: {end_mem - start_mem:.2f} MB")

# Memory monitoring helper
def memory_monitor(step_name=""):
    """Report current memory usage"""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    print(f"{step_name} memory usage: {mem_info.rss / (1024 ** 2):.2f} MB")
    return mem_info.rss / (1024 ** 2)  # return MB

if __name__ == "__main__":
    main()
```
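For reference, the `init_model` / `keep_training_booster` pattern that `train_incremental` relies on can be exercised on a toy dataset like the sketch below (purely illustrative; data and round counts are made up):

```python
import numpy as np
import lightgbm as lgb

rng = np.random.default_rng(42)
X = rng.normal(size=(2000, 5))
y = (X[:, 0] + rng.normal(scale=0.5, size=2000) > 0).astype(int)

params = {'objective': 'binary', 'metric': 'binary_logloss', 'verbosity': -1}

# First chunk
booster = lgb.train(params, lgb.Dataset(X[:1000], label=y[:1000]),
                    num_boost_round=50, keep_training_booster=True)
# Continue training on the second chunk
booster = lgb.train(params, lgb.Dataset(X[1000:], label=y[1000:]),
                    num_boost_round=50, init_model=booster,
                    keep_training_booster=True)
print(booster.num_trees())  # trees accumulate across both calls
```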

Please review this code and optimize anything that could speed up the computation:

```python
import pandas as pd
import numpy as np
import lightgbm as lgb
from lightgbm import early_stopping, log_evaluation
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import joblib
from datetime import datetime
from sklearn.metrics import roc_auc_score, mean_absolute_error

# Global feature lists
click_features = []
play_features = []
base_categorical_features = []

# Optimize memory usage: smaller chunksize, more aggressive cleanup
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=50000):
    """Safely load a large CSV file with optimized memory usage"""
    try:
        if not os.path.exists(file_path):
            print(f"⚠️ File not found: {file_path}")
            return pd.DataFrame()

        # Auto-detect the encoding
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(100000))
        encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'

        # Read in chunks - smaller chunksize lowers the memory peak
        chunks = []
        reader = pd.read_csv(
            file_path,
            encoding=encoding,
            usecols=usecols,
            dtype=dtype,
            chunksize=chunksize,
            low_memory=False
        )

        for chunk in tqdm(reader, desc=f"Loading {os.path.basename(file_path)}"):
            # Optimize categorical columns
            if dtype:  # make sure dtype is not empty
                for col in chunk.columns:
                    if col in dtype and dtype[col] == 'category':
                        chunk[col] = chunk[col].astype('category').cat.as_ordered()
            chunks.append(chunk)

        if chunks:
            result_df = pd.concat(chunks, ignore_index=True)
            del chunks
            gc.collect()
            return result_df
        return pd.DataFrame()
    except Exception as e:
        print(f"⚠️ Failed to load {file_path}: {str(e)}")
        return pd.DataFrame()

# Fix the play-data loading problem
def load_historical_data(days=30):
    """Efficiently load historical data in batches"""
    see_list, click_list, play_list = [], [], []

    for day in tqdm(range(1, days + 1), desc="Loading historical data"):
        day_str = f"{day:02d}"

        # Exposure data
        see_path = f'see_{day_str}.csv'
        if os.path.exists(see_path):
            see = load_data_safely(see_path, usecols=['did', 'vid'],
                                   dtype={'did': 'category', 'vid': 'category'})
            if not see.empty and 'did' in see.columns and 'vid' in see.columns:
                see_list.append(see)
            del see
            gc.collect()
        else:
            print(f"⚠️ Exposure file not found: {see_path}")

        # Click data
        click_path = f'click_{day_str}.csv'
        if os.path.exists(click_path):
            click = load_data_safely(click_path, usecols=['did', 'vid'],
                                     dtype={'did': 'category', 'vid': 'category'})
            if not click.empty and 'did' in click.columns and 'vid' in click.columns:
                click_list.append(click[['did', 'vid']])
            del click
            gc.collect()
        else:
            print(f"⚠️ Click file not found: {click_path}")

        # Fix the play-data loading - try several possible file name formats
        play_paths = [
            f'playplus_{day_str}.csv',  # original name
            f'play_{day_str}.csv',      # possible alternative
            f'playplus_{day}.csv',      # no leading zero
            f'play_{day}.csv'           # no leading zero
        ]

        play_loaded = False
        for play_path in play_paths:
            if os.path.exists(play_path):
                play = load_data_safely(
                    play_path,
                    usecols=['did', 'vid', 'play_time'],
                    dtype={'did': 'category', 'vid': 'category'}
                )
                if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns:
                    play_list.append(play)
                    del play
                    play_loaded = True
                    print(f"✅ Loaded play data: {play_path}")
                    break

        if not play_loaded:
            print(f"⚠️ Play data not found: tried {play_paths}")

        # Clean up memory every 3 days of data
        if day % 3 == 0:
            gc.collect()

    # Always return three DataFrames, even if some are empty
    return (
        pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(),
        pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(),
        pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
    )

# Memory optimization: handle negatives more efficiently
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1):
    """Build the click dataset with negative sampling"""
    if hist_exposure.empty or hist_click.empty:
        print("⚠️ Historical exposure or click data is empty, cannot build the dataset")
        return pd.DataFrame()

    # Mark positive samples
    hist_click = hist_click.copy()
    hist_click['label'] = 1

    # Efficiently mark negatives (index operations are fast)
    exposure_index = hist_exposure.set_index(['did', 'vid']).index
    click_index = hist_click.set_index(['did', 'vid']).index

    # Exposures without a click
    negative_index = exposure_index.difference(click_index)

    # Build the negative-sample DataFrame
    if not negative_index.empty:
        negative_samples = pd.DataFrame(
            list(negative_index),
            columns=['did', 'vid']
        )
        negative_samples['label'] = 0

        # Sample the negatives
        if sample_ratio < 1.0:
            negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42)
    else:
        negative_samples = pd.DataFrame(columns=['did', 'vid', 'label'])

    # Merge positives and negatives
    click_data = pd.concat([
        hist_click[['did', 'vid', 'label']],
        negative_samples
    ], ignore_index=True)

    # Free memory
    del exposure_index, click_index, negative_index, negative_samples
    gc.collect()

    return click_data

# Memory optimization: fewer merges, more efficient feature addition
def add_click_features(df, did_features, vid_info, hist_click, hist_play):
    """Add the key features while avoiding memory blow-ups"""
    if df.empty:
        return df

    # Base features (index-accelerated merges)
    if not did_features.empty and 'did' in did_features.columns:
        # Only the needed columns
        did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f')]
        df = df.merge(did_features[did_cols], on='did', how='left')

    if not vid_info.empty and 'vid' in vid_info.columns:
        # Only the categorical features
        vid_cols = ['vid'] + [col for col in vid_info.columns
                              if col in ['item_cid', 'item_type', 'item_assetSource',
                                         'item_classify', 'item_isIntact']]
        df = df.merge(vid_info[vid_cols], on='vid', how='left')

    # Always create the 'user_click_count' column
    df['user_click_count'] = 0
    # User behaviour statistics - use precomputed aggregates
    if not hist_click.empty and 'did' in hist_click.columns:
        # Per-user click counts; join directly to avoid intermediates
        user_click_count = hist_click.groupby('did').size().rename('user_click_count')
        df = df.join(user_click_count, on='did', how='left')
        df['user_click_count'] = df['user_click_count'].fillna(0)

    # Always create the 'user_total_play' column
    df['user_total_play'] = 0
    if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns:
        # Per-user total play time
        user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play')
        df = df.join(user_total_play, on='did', how='left')
        df['user_total_play'] = df['user_total_play'].fillna(0)

    # Always create the 'video_click_count' column
    df['video_click_count'] = 0
    # Video popularity statistics
    if not hist_click.empty and 'vid' in hist_click.columns:
        video_click_count = hist_click.groupby('vid').size().rename('video_click_count')
        df = df.join(video_click_count, on='vid', how='left')
        df['video_click_count'] = df['video_click_count'].fillna(0)

    # Always create the 'avg_play_time' column
    df['avg_play_time'] = 0
    if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns:
        # Per-video mean play time
        avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time')
        df = df.join(avg_play_time, on='vid', how='left')
        df['avg_play_time'] = df['avg_play_time'].fillna(0)

    # Fill missing values
    fill_values = {
        'user_click_count': 0,
        'user_total_play': 0,
        'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
        'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0
    }
    for col, value in fill_values.items():
        if col in df:
            df[col] = df[col].fillna(value)

    # Drop date-related features
    if 'date' in df:
        df = df.drop(columns=['date'], errors='ignore')

    return df

# Memory optimization: smaller dtypes
def get_categorical_features(df, base_features):
    """Dynamically collect the categorical features that exist"""
    existing_features = []
    for feature in base_features:
        if feature in df.columns:
            try:
                # If it converts to numeric, skip it
                pd.to_numeric(df[feature], errors='raise')
            except:
                existing_features.append(feature)
                # Make sure it is a category dtype
                df[feature] = df[feature].astype('category').cat.as_ordered()
    return existing_features

# Memory optimization: fewer intermediates, transform instead of large temporaries
def build_play_dataset(hist_play, vid_info, did_features, hist_click):
    """Build the completion-rate dataset with optimized memory usage"""
    if hist_play.empty:
        print("⚠️ Historical play data is empty, cannot build the completion-rate dataset")
        return pd.DataFrame()

    # Base data - only the needed columns
    play_data = hist_play[['did', 'vid', 'play_time']].copy()

    # Attach the video duration
    if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
        play_data = play_data.merge(
            vid_info[['vid', 'item_duration']],
            on='vid',
            how='left'
        )
    else:
        play_data['item_duration'] = 1.0  # default value

    # Completion rate
    play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration']
    play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0)

    # User features - numeric only
    if not did_features.empty and 'did' in did_features.columns:
        did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f')]
        play_data = play_data.merge(
            did_features[did_cols],
            on='did',
            how='left'
        )

    # Video features - categorical only
    if not vid_info.empty and 'vid' in vid_info.columns:
        vid_cols = ['vid'] + [col for col in vid_info.columns
                              if col in ['item_cid', 'item_type', 'item_assetSource',
                                         'item_classify', 'item_isIntact']]
        play_data = play_data.merge(
            vid_info[vid_cols],
            on='vid',
            how='left'
        )

    # Per-user mean completion rate - transform avoids large temporaries
    play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean')
    play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count')

    # Per-video mean completion rate
    play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean')
    play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std')

    # User-video interaction features
    if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns:
        # transform-style aggregation avoids large temporaries
        user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks')
        play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left')
    else:
        play_data['user_vid_clicks'] = 0

    # Interaction feature - must match between training and prediction
    play_data['interaction_feature'] = (play_data['user_click_count'] * play_data['video_click_count']).astype('float32')

    # Fill missing values
    play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True)
    play_data['user_play_count'].fillna(1, inplace=True)
    play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True)
    play_data['video_completion_std'].fillna(0, inplace=True)
    play_data['user_vid_clicks'].fillna(0, inplace=True)

    return play_data

# Memory optimization: predict in batches instead of loading all test data at once
def predict_for_test_data(test_users, test_exposure, did_features, vid_info):
    """Generate predictions for the test data - the result must have one row per test user"""
    if test_users.empty:
        print("⚠️ Test user data is empty, cannot predict")
        return pd.DataFrame()

    # Make sure every test user has a record
    if test_exposure.empty:
        # No exposure data: fall back to a default video
        print("⚠️ Test exposure data is empty, using a default video")
        test_data = test_users.copy()
        test_data['vid'] = vid_info['vid'].iloc[0] if not vid_info.empty else 'default_vid'
    else:
        # Merge, keeping every test user
        test_data = test_users.merge(test_exposure, on='did', how='left')
        # Handle missing vids
        most_common_vid = test_exposure['vid'].mode()[0] if not test_exposure.empty else 'default_vid'
        test_data['vid'] = test_data['vid'].fillna(most_common_vid)

    # Process the test data in batches to avoid memory overflow
    chunk_size = 50000  # 50k rows per batch
    results = []

    for i in tqdm(range(0, len(test_data), chunk_size), desc="Batched prediction"):
        chunk = test_data.iloc[i:i+chunk_size].copy()

        # Add features
        chunk = add_click_features(
            chunk, did_features, vid_info,
            pd.DataFrame(),  # no historical clicks
            pd.DataFrame()   # no historical plays
        )

        # Dynamically collect the categorical features
        test_categorical_features = get_categorical_features(chunk, base_categorical_features)

        # Predict click-through rate
        X_chunk = chunk.drop(columns=['did', 'vid'], errors='ignore')
        if model_click and not X_chunk.empty:
            # Keep the feature set consistent with training
            if len(X_chunk.columns) != len(click_features):
                print(f"⚠️ Click model feature mismatch: {len(click_features)} at training time, {len(X_chunk.columns)} at prediction time")
                # Align the features
                missing_features = set(click_features) - set(X_chunk.columns)
                extra_features = set(X_chunk.columns) - set(click_features)
                # Add missing features
                for feature in missing_features:
                    X_chunk[feature] = 0
                # Drop extra features
                X_chunk = X_chunk[click_features]
            click_probs = model_click.predict(X_chunk)
        else:
            click_probs = [0.5] * len(chunk)  # default value

        # Predict completion rate
        if model_play and not X_chunk.empty:
            # Attach the video duration
            if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
                chunk = chunk.merge(vid_info[['vid', 'item_duration']], on='vid', how='left')
            else:
                chunk['item_duration'] = 1.0

            # Interaction feature - must match training
            chunk['interaction_feature'] = (chunk['user_click_count'] * chunk['video_click_count']).astype('float32')

            # Prepare the prediction frame
            X_play_chunk = chunk.drop(columns=['did', 'vid'], errors='ignore')

            # Keep the feature set consistent with training
            if len(X_play_chunk.columns) != len(play_features):
                print(f"⚠️ Completion model feature mismatch: {len(play_features)} at training time, {len(X_play_chunk.columns)} at prediction time")
                # Align the features
                missing_features = set(play_features) - set(X_play_chunk.columns)
                extra_features = set(X_play_chunk.columns) - set(play_features)
                # Add missing features
                for feature in missing_features:
                    X_play_chunk[feature] = 0
                # Drop extra features
                X_play_chunk = X_play_chunk[play_features]
            completion_rates = model_play.predict(X_play_chunk)
        else:
            completion_rates = [0.7] * len(chunk)  # default value

        # Store the predictions
        chunk['click_prob'] = click_probs
        chunk['completion_rate'] = completion_rates

        # One row per did: keep the vid with the highest click probability
        chunk_result = chunk.sort_values('click_prob', ascending=False).groupby('did').head(1)
        chunk_result = chunk_result[['did', 'vid', 'completion_rate']].copy()
        results.append(chunk_result)

        # Clean up memory
        del chunk, X_chunk, click_probs, completion_rates, chunk_result
        gc.collect()

    # Merge the batch results
    if results:
        result = pd.concat(results, ignore_index=True)
    else:
        result = pd.DataFrame(columns=['did', 'vid', 'completion_rate'])

    # Rename the columns
    result.columns = ['did', 'vid', 'predicted_completion_rate']

    # Make sure the row count matches the test users
    if len(result) != len(test_users):
        missing_dids = set(test_users['did']) - set(result['did'])
        print(f"⚠️ Warning: {len(missing_dids)} users have no prediction, filling with defaults")
        default_df = pd.DataFrame({
            'did': list(missing_dids),
            'vid': most_common_vid,
            'predicted_completion_rate': np.mean(result['predicted_completion_rate']) if not result.empty else 0.7
        })
        result = pd.concat([result, default_df], ignore_index=True)

    return result

# Main program
if __name__ == "__main__":
    # Memory-friendly dtypes
    dtypes = {
        'did': 'category',
        'vid': 'category',
        'play_time': 'float32'
    }

    # Optional features - only added when present in the data
    optional_features = {
        'item_cid': 'category',
        'item_type': 'category',
        'item_assetSource': 'category',
        'item_classify': 'category',
        'item_isIntact': 'category',
        'sid': 'category',
        'stype': 'category'
    }

    # Feature columns
    for i in range(88):
        dtypes[f'f{i}'] = 'float32'

    # Load the core data - in batches
    print("Loading core data...")
    did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)

    # Register optional features in dtypes (only when the column exists)
    for feature, dtype in optional_features.items():
        if not vid_info.empty and feature in vid_info.columns:
            dtypes[feature] = dtype

    # Reload so every column uses the right dtype
    if os.path.exists('did_features_table.csv'):
        did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    else:
        print("⚠️ did_features_table.csv not found")
        did_features = pd.DataFrame()

    if os.path.exists('vid_info_table.csv'):
        vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
    else:
        print("⚠️ vid_info_table.csv not found")
        vid_info = pd.DataFrame()

    # Load historical data - make sure all variables are defined
    print("Loading historical data...")
    hist_exposure, hist_click, hist_play = load_historical_data(days=30)

    # Report the state of the historical data
    print(f"Historical exposure shape: {hist_exposure.shape if not hist_exposure.empty else 'empty'}")
    print(f"Historical click shape: {hist_click.shape if not hist_click.empty else 'empty'}")
    print(f"Historical play shape: {hist_play.shape if not hist_play.empty else 'empty'}")

    # If play data is empty, try a fallback
    if hist_play.empty:
        print("⚠️ Warning: historical play data is empty, using click data as a substitute")
        hist_play = hist_click.copy()
        hist_play['play_time'] = 1.0  # default play time
        print(f"Substitute play data shape: {hist_play.shape}")

    # Build the click dataset
    if not hist_exposure.empty and not hist_click.empty:
        print("Building click dataset...")
        click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
    else:
        print("⚠️ Cannot build the click dataset: exposure or click data is empty")
        click_train_data = pd.DataFrame()

    # Add features - make sure all arguments are defined
    if not click_train_data.empty:
        print("Building click features...")
        click_train_data = add_click_features(
            click_train_data,
            did_features,
            vid_info,
            hist_click,  # hist_click is defined above
            hist_play    # hist_play is defined above
        )
    else:
        print("⚠️ Click dataset is empty, skipping feature building")

    # Base categorical feature list - no date features
    base_categorical_features = [
        'item_cid', 'item_type', 'item_assetSource',
        'item_classify', 'item_isIntact', 'sid', 'stype'
    ]

    # Dynamically collect the categorical features that exist
    categorical_features = []
    if not click_train_data.empty:
        categorical_features = get_categorical_features(click_train_data, base_categorical_features)
        print(f"Categorical features in use: {categorical_features}")
    else:
        print("⚠️ Click training data is empty, cannot collect categorical features")

    # Prepare the training data
    if not click_train_data.empty:
        # Drop all date-related columns
        X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore')
        y = click_train_data['label']
    else:
        X, y = pd.DataFrame(), pd.Series()
        print("⚠️ Click training data is empty")

    # Split the dataset
    if not X.empty and not y.empty:
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
    else:
        print("⚠️ Training data is empty, cannot train models")
        X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Training parameters (tuned)
    params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 63,
        'learning_rate': 0.05,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'min_child_samples': 100,
        'verbosity': -1,
        'max_bin': 255  # fewer bins to save memory
    }

    model_click = None
    if not X_train.empty:
        train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features)
        val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features)

        print("Training click prediction model...")
        model_click = lgb.train(
            params,
            train_data,
            num_boost_round=1500,
            valid_sets=[val_data],
            callbacks=[
                early_stopping(stopping_rounds=100, verbose=True),
                log_evaluation(period=50)
            ]
        )

        # Save the feature list the click model uses
        click_features = list(X_train.columns)
        joblib.dump(click_features, 'click_features.pkl')

        # Compute and report the AUC
        if not X_val.empty and not y_val.empty and model_click:
            y_val_pred = model_click.predict(X_val)
            auc_score = roc_auc_score(y_val, y_val_pred)
            print(f"📊 Click model validation AUC: {auc_score:.6f}")
            with open('model_metrics.txt', 'w') as f:
                f.write(f"Click model AUC: {auc_score:.6f}\n")

        # Clean up memory
        del X_train, X_val, y_train, y_val, train_data, val_data
        gc.collect()
    else:
        print("⚠️ Training data is empty, skipping click model training")

    # Build the completion-rate dataset
    print("Building completion-rate dataset...")
    if not hist_play.empty:
        play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click)
    else:
        print("⚠️ Cannot build the completion-rate dataset: play data is empty")
        play_train_data = pd.DataFrame()

    # Train the completion-rate model
    model_play = None
    if not play_train_data.empty:
        X_play = play_train_data.drop(
            columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'],
            errors='ignore'
        )
        y_play = play_train_data['completion_rate']
    else:
        X_play, y_play = pd.DataFrame(), pd.Series()
        print("⚠️ Completion-rate training data is empty")

    if not X_play.empty and not y_play.empty:
        X_train_play, X_val_play, y_train_play, y_val_play = train_test_split(
            X_play, y_play, test_size=0.2, random_state=42
        )
    else:
        print("⚠️ Completion-rate training data is empty, cannot train the model")
        X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Categorical features for the completion-rate model
    play_categorical_features = []
    if not play_train_data.empty:
        play_categorical_features = get_categorical_features(play_train_data, base_categorical_features)
        print(f"Completion model categorical features: {play_categorical_features}")
    else:
        print("⚠️ Completion-rate training data is empty, cannot collect categorical features")

    # Regression parameters - memory friendly
    params_reg = {
        'objective': 'regression',
        'metric': 'mae',
        'boosting_type': 'gbdt',
        'num_leaves': 63,        # lower tree complexity
        'learning_rate': 0.03,
        'feature_fraction': 0.7,
        'bagging_fraction': 0.7,
        'bagging_freq': 5,
        'lambda_l1': 0.1,
        'lambda_l2': 0.1,
        'min_data_in_leaf': 100,
        'verbosity': -1,
        'max_bin': 255           # fewer bins to save memory
    }

    if not X_train_play.empty:
        train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features)
        val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features)

        print("Training completion-rate model...")
        model_play = lgb.train(
            params_reg,
            train_data_play,
            num_boost_round=1000,  # fewer rounds
            valid_sets=[val_data_play],
            callbacks=[
                early_stopping(stopping_rounds=100, verbose=True),
                log_evaluation(period=50)
            ]
        )

        # Save the feature list the completion model uses
        play_features = list(X_train_play.columns)
        joblib.dump(play_features, 'play_features.pkl')

        # Evaluate the model
        y_pred_val = model_play.predict(X_val_play)
        mae = mean_absolute_error(y_val_play, y_pred_val)
        print(f"📊 Completion model validation MAE: {mae:.6f}")
        with open('model_metrics.txt', 'a') as f:
            f.write(f"Completion model MAE: {mae:.6f}\n")

        # Clean up memory
        del X_train_play, X_val_play, y_train_play, y_val_play, train_data_play, val_data_play
        gc.collect()
    else:
        print("⚠️ Training data is empty, skipping completion-rate model training")

    # Save the models
    if model_click:
        model_click.save_model('click_model.txt')
    if model_play:
        model_play.save_model('play_model.txt')
    joblib.dump(base_categorical_features, 'categorical_features.pkl')

    # When loading models from disk, the feature lists must be loaded too
    if not model_click:
        try:
            model_click = lgb.Booster(model_file='click_model.txt')
            click_features = joblib.load('click_features.pkl')
            print("✅ Loaded click model and features from disk")
        except:
            print("⚠️ Could not load the click model")

    if not model_play:
        try:
            model_play = lgb.Booster(model_file='play_model.txt')
            play_features = joblib.load('play_features.pkl')
            print("✅ Loaded completion model and features from disk")
        except:
            print("⚠️ Could not load the completion model")

    # Load the prediction data
    print("Loading prediction data...")
    to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'})
    to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'})

    # Run the prediction
    if not to_predict_users.empty:
        print("Generating predictions...")
        submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info)

        # Check row-count consistency
        if len(submission) != len(to_predict_users):
            print(f"⚠️ Row count mismatch: {len(submission)} predictions vs {len(to_predict_users)} test users")
            # Handle missing DIDs
            missing_dids = set(to_predict_users['did']) - set(submission['did'])
            if missing_dids:
                print(f"Adding {len(missing_dids)} missing users")
                default_vid = vid_info['vid'].iloc[0] if not vid_info.empty else 'default_vid'
                missing_df = pd.DataFrame({
                    'did': list(missing_dids),
                    'vid': default_vid,
                    'predicted_completion_rate': submission['predicted_completion_rate'].mean()
                })
                submission = pd.concat([submission, missing_df], ignore_index=True)

        # Save the results
        if not submission.empty:
            print(f"Prediction rows: {len(submission)} (should match the test-user count)")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f'submission_{timestamp}.csv'
            # Save as header-less CSV
            submission.to_csv(output_file, index=False, header=False)
            print(f"Predictions saved to: {output_file}")
            print(f"Result format: {len(submission)} rows")
            print("Columns: [did, vid, predicted_completion_rate]")

            # Check the distribution of the predictions
            print(f"Completion-rate prediction distribution: min={submission['predicted_completion_rate'].min():.4f}, "
                  f"max={submission['predicted_completion_rate'].max():.4f}, "
                  f"mean={submission['predicted_completion_rate'].mean():.4f}")
        else:
            print("⚠️ Prediction result is empty, no file saved")
    else:
        print("⚠️ Failed to load prediction data, cannot generate results")
```
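The feature-alignment logic that appears twice in `predict_for_test_data` above can be isolated into a small helper; a minimal sketch (the helper name is illustrative, not part of the original script):

```python
import pandas as pd

def align_features(X: pd.DataFrame, trained_features: list) -> pd.DataFrame:
    """Reindex a prediction frame onto the training-time feature list:
    missing columns are added as zeros, extra columns are dropped,
    and the column order matches training exactly."""
    return X.reindex(columns=trained_features, fill_value=0)

# Usage inside the prediction loop, e.g.:
#   X_chunk = align_features(X_chunk, click_features)
#   X_play_chunk = align_features(X_play_chunk, play_features)
```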

recommend-type

【PMSM建模与测试最佳实践】:MATLAB电机仿真模型的权威指导

# 1. PMSM电机模型的基础知识 在现代工业自动化和电动车辆领域,永磁同步电机(PMSM
recommend-type

如何通过四元数避免万向节死锁?

<think>好的,用户想了解如何使用四元数来避免万向节锁问题,尤其是在3D旋转中保持姿态的稳定性。首先,我需要回忆万向节锁的基本概念和四元数如何解决这个问题。万向节锁发生在使用欧拉角时,当两个旋转轴对齐导致失去一个自由度。而四元数通过四维空间的旋转避免了这种顺序依赖。 接下来,我应该解释万向节锁的原因,比如三个轴依次旋转时,某个轴可能与其他轴对齐,导致无法正确旋转。然后对比四元数的优势,比如四元数的连续性和无奇异性。需要提到四元数的数学表示,如单位四元数和旋转插值方法(如球面线性插值),以及它们如何避免万向节锁。 还要考虑用户可能的实际应用场景,比如游戏开发或机器人学,是否需要示例代码?
recommend-type

Python实现Couchbase大规模数据复制技术

标题中提到的技术“couchbase-massive-replication”是一种针对Couchbase数据库的开源Python开发工具,专门用于高效地实现跨集群的大量存储桶和索引的复制。Couchbase是一个高性能、可扩展、容错的NoSQL文档数据库,它支持同步分布式复制(XDCR),能够实现跨地域的数据复制。 描述部分详细阐述了该技术的主要用途和优势。它解决了一个常见问题:在进行XDCR复制时,迁移大量存储桶可能会遇到需要手动检查并迁移缺失存储桶的繁琐步骤。Couchbase-massive-replication技术则允许用户在源和目标集群之间无需进行存储桶配置,简化了迁移过程。开发者可以通过简单的curl请求,向集群发送命令,从而实现大规模存储桶的自动化迁移。 此外,为了帮助用户更容易部署和使用该技术,项目提供了一个Dockerfile,允许用户通过Docker容器来运行程序。Docker是一种流行的容器化平台,可以将应用及其依赖打包到一个可移植的容器中,便于部署和扩展。用户只需执行几个Docker命令,即可快速启动一个名为“cbmigrator”的容器,版本为0.1。启动容器后,可以通过发送简单的POST请求来操作迁移任务。 项目中还提到了Docker Hub,这是一个公共的Docker镜像注册中心,用户可以在其中找到并拉取其他用户分享的镜像,其中就包括了“cbmigrator”镜像,即demir94/cbmigrator:0.1。这大大降低了部署和使用该技术的门槛。 根据标签“Python”,我们可以推断出该项目是使用Python开发的。Python是一种广泛使用的高级编程语言,以其简洁的语法和强大的库支持而闻名。该项目中Python的使用意味着用户可能需要具备一定的Python基础知识,以便对项目进行定制或故障排除。Python的动态类型系统和解释执行机制,使得开发过程中可以快速迭代和测试。 最后,从提供的压缩包子文件的文件名称列表“couchbase-massive-replication-main”来看,该项目的源代码文件夹可能遵循了通用的开源项目结构,其中“main”文件夹通常包含了项目的主要代码和入口文件。用户在获取项目后,可以在这个文件夹中找到相关的代码文件,包括配置文件、数据库模型、业务逻辑实现以及API接口等。 综合来看,这个项目涉及的技术点包括: - Couchbase数据库:一种文档数据库,广泛用于构建可扩展的应用程序。 - XDCR(Cross-Datacenter Replication):Couchbase提供的跨数据中心数据复制机制,实现数据的无缝迁移和灾难恢复。 - Python编程语言:用来开发该项目的高级编程语言,以其易读性和简洁的语法著称。 - Docker容器化技术:用于打包、分发和运行应用程序的平台,提供了一种便捷的部署方式。 - Docker Hub:一个存放和分享Docker镜像的平台,可以简化镜像的查找、下载和管理过程。 这个项目对于需要在多个Couchbase集群间迁移大量数据的开发者和运维人员来说是一个宝贵的资源,因为它大大简化了存储桶迁移的过程,并提高了操作的便利性和效率。
recommend-type

【MATLAB电机性能评估案例】:仿真环境下的深度研究

# 1. MATLAB在电机性能评估中的应用概述 电机作为现代工业中不可或缺的电力传动设备,其性能优劣直接影响整个系统的可靠性和效率。在众多的电机性能评估工具中,MATLAB凭借其强大的数值计算能力和丰富的工具箱资源,成为该领域研究和工程实践中的有力工具。本章将对MATLAB在电机性能评估中的应用进行概述,并介绍其在电机仿真、故障诊断和性能优化等方面的具体应用前景和价值。MA