A Detailed Explanation of an MCU Serial-Port Smart Light Control Program

This section examines the resource referenced by the title "KWS.zip_KWS", which points to a specific program: a smart light controller based on a microcontroller (MCU) that communicates with a PC over a serial port. We also look at "KWS.c" in the archive's file list, which is most likely the program's source code file.

### Knowledge point 1: Communication between the MCU and the PC

First, it helps to understand the basic principles and methods of MCU-to-PC communication. In this case, serial communication is used: a common and simple way to transfer data between an MCU and a PC. The MCU typically provides a UART (Universal Asynchronous Receiver/Transmitter) that can be configured for serial communication, while the PC side can use a USB-to-serial adapter, a COM port, or another hardware interface to talk to the MCU.

### Knowledge point 2: Smart light control

Smart light control generally means controlling a lamp's brightness, color, and on/off state by electronic means. In this program, that may include brightness adjustment, color changes, and timed switching. Such control is implemented in software running on the MCU, which receives control commands from the PC over the serial link.

### Knowledge point 3: Programming language and development environment

The file name "KWS.c" indicates a source file written in C, a language widely used for embedded systems and hardware control. C offers flexible, low-level access to hardware, which makes it especially suitable for MCU programming. Developers typically use an embedded integrated development environment (IDE) such as Keil, IAR, or MPLAB X, which supports writing, compiling, debugging, and flashing the program onto the MCU.

### Knowledge point 4: MCU program design

MCU program design usually involves configuring and programming the chip's hardware resources, including but not limited to the UART, I/O ports, timers, and interrupts. Configuring the UART means setting the baud rate, data bits, stop bits, and parity. A smart light control program will likely include an interrupt service routine that responds to commands from the PC, together with a main loop that controls the lamp in real time.

### Knowledge point 5: Serial protocol design

Implementing the controller also requires designing a serial communication protocol. The protocol must define the data format, the command set, and an error detection and correction mechanism. For example, a simple protocol might consist of a start marker, an opcode (indicating the command type), the data payload, and an end marker. The PC-side software constructs commands according to the same protocol and sends them over the serial port; the MCU then parses each command and performs the corresponding action.

### Knowledge point 6: Debugging and testing

Once the program is written, an important step is debugging and testing. Developers often use a serial debugging assistant to simulate the control commands the PC would send and to observe the MCU's response. A logic analyzer or an oscilloscope can be used to monitor the serial signals and confirm that data is transmitted and parsed correctly. Testing should also verify that the light behaves as expected and runs stably under a variety of conditions.

### Knowledge point 7: Application areas

Smart light control programs are widely used in smart homes, intelligent buildings, and stage lighting. In a smart home system, users can control the lights from a PC or phone, enabling automated scenes and remote control. In commercial settings such as studios and stages, smart lighting control enhances the visual effect and provides rich, varied lighting.

### Summary

This section analyzed the "KWS.zip_KWS" resource, covering serial communication between an MCU and a PC, the principles of smart light control, C programming, MCU program design, serial protocol design, debugging and testing, and typical application areas of smart lighting. With these topics mastered, a developer can design and implement a full-featured smart lighting control system.
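To make the points about UART configuration, interrupts, and the main loop more concrete, here is a minimal firmware-side sketch. It assumes a classic 8051-compatible MCU with an 11.0592 MHz crystal and the Keil C51 toolchain (`reg52.h`); the register names belong to the standard 8051 peripheral set, while the light pin, the single-byte commands, and the active-low wiring are illustrative assumptions rather than details taken from KWS.c.

```c
/* Minimal sketch: UART at 9600 baud, receive ISR, and a main loop that
 * switches a lamp. Assumes an 8051-compatible MCU and Keil C51.        */
#include <reg52.h>

sbit LIGHT_PIN = P1^0;          /* hypothetical I/O pin driving the lamp  */

volatile unsigned char rx_byte; /* last byte received from the PC         */
volatile bit rx_ready = 0;      /* set by the ISR, cleared by main loop   */

void uart_init_9600(void)
{
    TMOD |= 0x20;   /* timer 1, mode 2 (8-bit auto-reload) as baud source */
    TH1   = 0xFD;   /* 9600 baud with an 11.0592 MHz crystal              */
    TL1   = 0xFD;
    TR1   = 1;      /* start timer 1                                      */
    SCON  = 0x50;   /* mode 1: 8 data bits, 1 stop bit, receiver enabled  */
    ES    = 1;      /* enable the serial interrupt                        */
    EA    = 1;      /* enable global interrupts                           */
}

void uart_isr(void) interrupt 4
{
    if (RI) {                   /* a byte arrived from the PC             */
        RI = 0;
        rx_byte  = SBUF;
        rx_ready = 1;
    }
    if (TI) {                   /* transmit complete (not used here)      */
        TI = 0;
    }
}

void main(void)
{
    uart_init_9600();
    while (1) {
        if (rx_ready) {         /* main loop reacts to the latest command */
            rx_ready = 0;
            switch (rx_byte) {
            case '1': LIGHT_PIN = 0; break;  /* active-low: light on      */
            case '0': LIGHT_PIN = 1; break;  /* light off                 */
            default:  break;                 /* ignore unknown bytes      */
            }
        }
    }
}
```

With this setup, any PC-side serial terminal configured for 9600 baud, 8 data bits, no parity, and 1 stop bit could send '1' or '0' to toggle the lamp, which matches the debugging workflow with a serial assistant described above.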
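The protocol-design discussion can likewise be illustrated with a small, host-runnable state machine. The frame layout used here (0xAA start byte, opcode, one data byte, additive checksum, 0x55 end byte), the opcodes, and `apply_command()` are assumptions made for the example; the real KWS.c may use a different format.

```c
/* Self-contained sketch of a simple command-frame parser:
 * 0xAA | opcode | data | checksum | 0x55. Compiles as a plain C program. */
#include <stdio.h>
#include <stdint.h>

#define FRAME_START 0xAA
#define FRAME_END   0x55

enum { OP_SWITCH = 0x01, OP_BRIGHTNESS = 0x02 };   /* illustrative opcodes */

typedef enum { WAIT_START, WAIT_OPCODE, WAIT_DATA, WAIT_CHECKSUM, WAIT_END } rx_state_t;

static rx_state_t state = WAIT_START;
static uint8_t opcode, value;

/* Act on a fully validated frame; real firmware would drive PWM or an I/O pin. */
static void apply_command(uint8_t op, uint8_t val)
{
    if (op == OP_SWITCH)
        printf("light %s\n", val ? "on" : "off");
    else if (op == OP_BRIGHTNESS)
        printf("brightness set to %u%%\n", (unsigned)val);
}

/* Feed one received byte at a time, e.g. from a UART ISR or a read loop. */
void protocol_feed(uint8_t byte)
{
    switch (state) {
    case WAIT_START:    if (byte == FRAME_START) state = WAIT_OPCODE;      break;
    case WAIT_OPCODE:   opcode = byte; state = WAIT_DATA;                  break;
    case WAIT_DATA:     value  = byte; state = WAIT_CHECKSUM;              break;
    case WAIT_CHECKSUM: /* simple additive checksum for error detection */
                        state = (byte == (uint8_t)(opcode + value)) ? WAIT_END
                                                                    : WAIT_START;
                        break;
    case WAIT_END:      if (byte == FRAME_END) apply_command(opcode, value);
                        state = WAIT_START;                                break;
    }
}

int main(void)
{
    /* "set brightness to 80%" then "switch off", as a PC tool might send them */
    uint8_t stream[] = {
        FRAME_START, OP_BRIGHTNESS, 80, (uint8_t)(OP_BRIGHTNESS + 80), FRAME_END,
        FRAME_START, OP_SWITCH,      0, OP_SWITCH,                     FRAME_END
    };
    for (size_t i = 0; i < sizeof stream; ++i)
        protocol_feed(stream[i]);
    return 0;
}
```

Because `protocol_feed()` consumes one byte at a time, the same routine could be called from the UART interrupt on the MCU or from a byte-read loop in a PC-side test tool, keeping both ends of the link aligned on one protocol definition.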
