Bertcrf实体识别
作者:互联网
作者:昆特Alex 链接:https://www.zhihu.com/question/455063660/answer/2570541435 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 import torch import torch.nn as nn from transformers import BertModel, BertConfig from torchcrf import CRF import os class Bert_CRF(nn.Module): # BiLSTM加上并无多大用处,速度还慢了,可去掉LSTM层 def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256): super(Bert_CRF, self).__init__() self.tag_to_ix = tag_to_ix self.tagset_size = len(tag_to_ix) self.hidden_dim = hidden_dim self.embedding_dim = embedding_dim self.bert = BertModel.from_pretrained("hfl/chinese-roberta-wwm-ext") self.dropout = nn.Dropout(p=0.1) self.linear = nn.Linear(embedding_dim, self.tagset_size) self.crf = CRF(self.tagset_size, batch_first=True) def _get_features(self, sentence): with torch.no_grad(): outputs = self.bert(sentence) enc = outputs.last_hidden_state enc = self.dropout(enc) feats = self.linear(enc) return feats def forward(self, sentence, tags, mask, is_test=False): emissions = self._get_features(sentence) if not is_test: # Training,validation return loss loss=-self.crf.forward(emissions, tags, mask, reduction='mean') return loss else: # Testing,return decoding decode=self.crf.decode(emissions, mask) return decode #工具类 作者:昆特Alex 链接:https://www.zhihu.com/question/455063660/answer/2570541435 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 import torch from torch.utils.data import Dataset from transformers import BertTokenizer import pandas as pd tokenizer = BertTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext") ner_type = pd.read_csv("model_data/bio_type.txt") # 包含ner所有类别的txt文件 ners = ner_type["label"].tolist() VOCAB = [] for n in ners: VOCAB.extend(["B-" + n, "I-"+ n]) VOCAB.extend(['<PAD>', '[CLS]', '[SEP]', "O"]) tag2idx = {tag: idx for idx, tag in enumerate(VOCAB)} idx2tag = {idx: tag for idx, tag in enumerate(VOCAB)} MAX_LEN = 256 class NerDataset(Dataset): ''' Generate our dataset ''' def __init__(self, f_path, inference_df = None): self.sents = [] self.tags_li = [] if inference_df is not None: data = inference_df else: data = pd.read_csv(f_path) tags = data["label"].to_list() words = data["word"].to_list() print("f_path is {} len_word is {} len tag is {}".format(f_path, len(words), len(tags))) word, tag = [], [] for char, t in zip(words, tags): if char != '。': word.append(char) tag.append(t) else: if len(word) >= MAX_LEN-2: self.sents.append(['[CLS]'] + word[:MAX_LEN] +[char] + ['[SEP]']) self.tags_li.append(['[CLS]'] + tag[:MAX_LEN] + [t] + ['[SEP]']) else: self.sents.append(['[CLS]'] + word + [char] + ['[SEP]']) self.tags_li.append(['[CLS]'] + tag + [t] + ['[SEP]']) word, tag = [], [] if word: if len(word) >= MAX_LEN-2: self.sents.append(['[CLS]'] + word[:MAX_LEN] + ['[SEP]']) self.tags_li.append(['[CLS]'] + tag[:MAX_LEN] + ['[SEP]']) else: self.sents.append(['[CLS]'] + word + ['[SEP]']) self.tags_li.append(['[CLS]'] + tag + ['[SEP]']) word, tag = [], [] def __getitem__(self, idx): words, tags = self.sents[idx], self.tags_li[idx] token_ids = tokenizer.convert_tokens_to_ids(words) laebl_ids = [tag2idx[tag] for tag in tags] seqlen = len(laebl_ids) return token_ids, laebl_ids, seqlen def __len__(self): return len(self.sents) def PadBatch(batch): maxlen = max([i[2] for i in batch]) token_tensors = torch.LongTensor([i[0] + [0] * (maxlen - len(i[0])) for i in batch]) label_tensors = torch.LongTensor([i[1] + [0] * (maxlen - len(i[1])) for i in batch]) mask = (token_tensors > 0) return token_tensors, label_tensors, mask #训练 作者:昆特Alex 链接:https://www.zhihu.com/question/455063660/answer/2570541435 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 import torch import torch.nn as nn import torch.optim as optim from torch.utils import data import os import warnings import argparse import numpy as np from sklearn import metrics from transformers import AdamW, get_linear_schedule_with_warmup import pandas as pd from models import Bert_CRF from utils import NerDataset, PadBatch, VOCAB, tokenizer, tag2idx, idx2tag def train(e, model, iterator, optimizer, scheduler, criterion, device): model.train() losses = 0.0 step = 0 for i, batch in enumerate(iterator): step += 1 x, y, z = batch x = x.to(device) y = y.to(device) z = z.to(device) loss = model(x, y, z) losses += loss.item() loss.backward() optimizer.step() scheduler.step() optimizer.zero_grad() print("Epoch: {}, Loss:{:.4f}".format(e, losses/step)) def validate(e, model, iterator, device): model.eval() Y, Y_hat = [], [] losses = 0 step = 0 with torch.no_grad(): for i, batch in enumerate(iterator): step += 1 x, y, z = batch x = x.to(device) y = y.to(device) z = z.to(device) y_hat = model(x, y, z, is_test=True) loss = model(x, y, z) losses += loss.item() # Save prediction for j in y_hat: Y_hat.extend(j) # Save labels mask = (z==1) y_orig = torch.masked_select(y, mask) Y.append(y_orig.cpu()) Y = torch.cat(Y, dim=0).numpy() Y_hat = np.array(Y_hat) acc = (Y_hat == Y).mean()*100 print("Epoch: {}, Val Loss:{:.4f}, Val Acc:{:.3f}%".format(e, losses/step, acc)) return model, losses/step, acc def test(model, iterator, device): model.eval() Y, Y_hat = [], [] with torch.no_grad(): for i, batch in enumerate(iterator): x, y, z = batch x = x.to(device) z = z.to(device) y_hat = model(x, y, z, is_test=True) # Save prediction for j in y_hat: Y_hat.extend(j) # Save labels mask = (z==1).cpu() y_orig = torch.masked_select(y, mask) Y.append(y_orig) Y = torch.cat(Y, dim=0).numpy() y_true = [idx2tag[i] for i in Y] y_pred = [idx2tag[i] for i in Y_hat] return y_true, y_pred if __name__=="__main__": ner_type = pd.read_csv("model_data/type.txt") ners = ner_type["label"].tolist() labels = [] for n in ners: labels.extend(["B-" + n, "I-"+ n]) print("all type len is {}".format(len(labels))) best_model = None _best_val_loss = np.inf _best_val_acc = -np.inf parser = argparse.ArgumentParser() parser.add_argument("--batch_size", type=int, default=256) parser.add_argument("--lr", type=float, default=0.0005) parser.add_argument("--n_epochs", type=int, default=40) parser.add_argument("--trainset", type=str, default="model_data/train.csv") parser.add_argument("--validset", type=str, default="model_data/valid.csv") parser.add_argument("--testset", type=str, default="model_data/test.csv") ner = parser.parse_args() model = Bert_CRF(tag2idx).cuda() print('Initial model Done.') train_dataset = NerDataset(ner.trainset) print("train data len is {}".format(len(train_dataset))) eval_dataset = NerDataset(ner.validset) print("validset data len is {}".format(len(eval_dataset))) test_dataset = NerDataset(ner.testset) print("test_dataset len is {}".format(len(test_dataset))) print('Load Data Done.') train_iter = data.DataLoader(dataset=train_dataset, batch_size=ner.batch_size, shuffle=True, num_workers=4, collate_fn=PadBatch) eval_iter = data.DataLoader(dataset=eval_dataset, batch_size=ner.batch_size, shuffle=False, num_workers=4, collate_fn=PadBatch) test_iter = data.DataLoader(dataset=test_dataset, batch_size=ner.batch_size, shuffle=False, num_workers=4, collate_fn=PadBatch) optimizer = AdamW(model.parameters(), lr=ner.lr, eps=1e-6) len_dataset = len(train_dataset) epoch = ner.n_epochs batch_size = ner.batch_size total_steps = (len_dataset // batch_size) * epoch if len_dataset % batch_size == 0 else (len_dataset // batch_size + 1) * epoch warm_up_ratio = 0.1 # Define 10% steps scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = warm_up_ratio * total_steps, num_training_steps = total_steps) criterion = nn.CrossEntropyLoss(ignore_index=0) for epoch in range(1, ner.n_epochs+1): train(epoch, model, train_iter, optimizer, scheduler, criterion, device) candidate_model, loss, acc = validate(epoch, model, eval_iter, device) if loss < _best_val_loss and acc > _best_val_acc: best_model = candidate_model _best_val_loss = loss _best_val_acc = acc y_test, y_pred = test(best_model, test_iter, device) print(metrics.classification_report(y_test, y_pred, labels=labels, digits=3)) torch.save(best_model.state_dict(), "checkpoint/0704_ner.pt") test_data = pd.read_csv("model_data/0704_bio_test.csv") y_test_useful = [] y_pred_useful = [] for a, b in zip(y_test, y_pred): if a not in ['[CLS]', '[SEP]']: y_test_useful.append(a) y_pred_useful.append(b) test_data["labeled"] = y_test_useful test_data["pred"] = y_pred_useful test_data.to_csv("result_files/bio_test_result.csv", index=False)
标签:Bertcrf,self,实体,batch,len,test,import,model,识别 来源: https://www.cnblogs.com/qiaoqifa/p/16502373.html