## 基于神经网络深度学习的中文分词,采用预训练大模型BERT进行微调完成中文分词
import paddle
import warnings
import os
import paddle.nn as nn
import paddle.optimizer as opt
import matplotlib.pyplot as plt
from paddlenlp.transformers import AutoTokenizer, AutoModelForTokenClassification
from paddlenlp.data import DataCollatorForTokenClassification
from paddlenlp.metrics import ChunkEvaluator
from paddle.io import DataLoader, BatchSampler
from datasets import load_dataset
from pprint import pprint
from functools import partial
from seqeval.metrics.sequence_labeling import get_entities
warnings.filterwarnings('ignore')
def convert_example_to_feature(example, tokenizer, label2id, max_seq_len=512, is_infer=False):
    """Convert one raw sample into BERT input features.

    Args:
        example: a single sample; 'text' is a space-separated token string,
            and 'label' (when present) is a space-separated tag string.
        tokenizer: BERT tokenizer instance.
        label2id: mapping from label string to integer id.
        max_seq_len: maximum sequence length the model can handle.
        is_infer: when True this is prediction data, so labels are skipped.

    Returns:
        encoded_inputs: the tokenized feature dict (with a 'label' id list
        added for training/eval data).
    """
    tokens = example['text'].strip().split(' ')
    encoded_inputs = tokenizer(tokens, max_seq_len=max_seq_len, is_split_into_words='token', return_length=True)
    if is_infer:
        return encoded_inputs
    # Align the label sequence with the tokenized input: reserve two slots
    # for [CLS]/[SEP], then pad front and back with the neutral 'O' id so
    # special tokens ([CLS], [SEP], [PAD]) all carry a harmless label.
    label_ids = [label2id[tag] for tag in example['label'].split(' ')]
    label_ids = label_ids[:max_seq_len - 2]
    tail_pad = len(encoded_inputs['input_ids']) - len(label_ids) - 1
    encoded_inputs['label'] = [label2id['O']] + label_ids + [label2id['O']] * tail_pad
    assert len(encoded_inputs['label']) == len(encoded_inputs['input_ids'])
    return encoded_inputs
def collate_fn(batch_data, pad_token_id=0, pad_token_type_id=0, pad_label_id=0):
    """Pad every sequence in a batch to the batch maximum and stack tensors.

    Args:
        batch_data: list of feature dicts for the current batch, each with
            'input_ids', 'token_type_ids' and 'label' lists.
        pad_token_id: filler value for input_ids.
        pad_token_type_id: filler value for token_type_ids.
        pad_label_id: filler value for label sequences.

    Returns:
        Tuple of paddle tensors: (input_ids, token_type_ids, labels).
    """
    # Longest input in the batch; padding length is derived from input_ids
    # for all three sequences (they are assumed aligned per example).
    max_len = max((len(item['input_ids']) for item in batch_data), default=0)
    input_ids_list, token_type_ids_list, label_list = [], [], []
    for item in batch_data:
        n_pad = max_len - len(item['input_ids'])
        input_ids_list.append(item['input_ids'] + [pad_token_id] * n_pad)
        token_type_ids_list.append(item['token_type_ids'] + [pad_token_type_id] * n_pad)
        label_list.append(item['label'] + [pad_label_id] * n_pad)
    return paddle.to_tensor(input_ids_list), paddle.to_tensor(token_type_ids_list), paddle.to_tensor(label_list)
class BertForTokenClassification(nn.Layer):
    """Token-level classifier: a linear head stacked on top of BERT.

    Args:
        bert: BERT model instance.
        num_classes: number of target classes.
        dropout: dropout probability applied to BERT's output; when None,
            BERT's own hidden_dropout_prob setting is used instead.

    Forward output:
        logits: per-token classification scores.
    """
    def __init__(self, bert, num_classes, dropout=None):
        super(BertForTokenClassification, self).__init__()
        self.num_classes = num_classes
        self.bert = bert
        # Fall back to the dropout rate configured inside BERT itself.
        if dropout is None:
            dropout = self.bert.config['hidden_dropout_prob']
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.bert.config['hidden_size'], num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        """Run BERT, then map every token's hidden state to class logits."""
        bert_outputs = self.bert(
            input_ids,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            attention_mask=attention_mask,
        )
        # The first element of BERT's output is the per-token sequence output.
        hidden_states = self.dropout(bert_outputs[0])
        logits = self.classifier(hidden_states)
        return logits
def evaluate(model, data_loader, metric):
    """Evaluate a token-classification model over a whole dataset.

    Args:
        model: the model instance to evaluate.
        data_loader: iterable of batches; each batch is a dict containing
            'input_ids', 'token_type_ids', 'label' and 'seq_len'.
        metric: chunk-evaluation metric instance (ChunkEvaluator-style:
            compute/update/accumulate/reset).

    Returns:
        (precision, recall, f1_score) accumulated over all batches.
    """
    model.eval()
    metric.reset()
    precision, recall, f1_score = 0.0, 0.0, 0.0
    # Evaluation never back-propagates, so disable gradient tracking to
    # save memory and time (the original built a graph for every batch).
    with paddle.no_grad():
        for batch_data in data_loader:
            input_ids = batch_data['input_ids']
            token_type_ids = batch_data['token_type_ids']
            labels = batch_data['label']
            seq_lens = batch_data['seq_len']
            # Predicted tag id per token = argmax over the class dimension.
            logits = model(input_ids, token_type_ids)
            predictions = logits.argmax(axis=-1)
            # Fold this batch's chunk counts into the running totals.
            num_infer_chunks, num_label_chunks, num_correct_chunks = metric.compute(seq_lens, predictions, labels)
            metric.update(num_infer_chunks.numpy(), num_label_chunks.numpy(), num_correct_chunks.numpy())
            # Accumulate once per batch kept for parity with the original;
            # the final values after the loop are what gets returned.
            precision, recall, f1_score = metric.accumulate()
    return precision, recall, f1_score
def train(model, num_epochs, train_loader, dev_loader, loss_fn, optimizer, metric, eval_steps, log_steps, num_training_steps, save_dir):
"""模型训练函数\n
输入:
- model:待训练的模型实例
- num_epochs:训练回合数
- train_loader:训练集批迭代器
- dev_loader:验证集批迭代器
- loss_fn:损失函数
- optimizer:优化器
- metric:评估器
- eval_steps:评估步数
- log_steps:日志打印步数
- num_training_steps:总的训练步数
- save_dir:模型保存路径\n
输出:
- train_loss_record:损失值记录
- train_score_record:评估得分记录
"""
# 开启训练模式
model.train()
global_step = 0
best_score = 0.0
# 记录训练过程中的损失和在验证集上模型评估的分数
train_loss_record = []
train_score_record = []
# 进行num_epochs轮训练
for epoch in range(num_epochs):
for step, batch_data in enumerate(train_loader):
inputs, token_type_ids, labels = batch_data['input_ids'], batch_data['token_type_ids'], batch_data['label']
# 获取模型预测
logits = model(inputs, token_type_ids)
loss = loss_fn(logits, labels)
train_loss_record.append((global_step, loss.item()))
# 梯度反向传播
loss.backward()
optimizer.step()
optimizer.clear_grad()
if global_step % log_steps == 0:
print(f'[训练阶段] 训练进度:{epoch}/{num_epochs}\t{global_step}/{num_training_steps}\t损失值:{loss.item():.5f}')
if global_step != 0 and (global_step % eval_steps == 0 or global_step == (num_training_steps-1)):
precision, recall, F1 = evaluate(model, dev_loader, metric)
train_score_record.append((global_step, F1))
model.train()
# 如果当前指标为最优指