NLP/Sequence to Sequence
Sequence to Sequence 모델
Sequence-to-Sequence
RNN의 구조 중 many to many의 구조. (입력도 sequence 출력도 sequence)
encoder와 decoder는 서로 파라미터를 공유하지 않는다.
-
encoder의 마지막 timestep의 hidden state vector는 decoder의 h_0의 역할을 한다.
-
decoder에서의 문장 생성시에, 첫 번째 단어로
<SoS>
토큰을 넣어준다. (Start of Sentence)
또한 마지막에는<EoS>
토큰을 넣어준다. (End of Sentence)
Seq2Seq model with Attention
Seq-to-Seq 모델에서 순차적으로 단어의 정보를 매 time step마다 축적해가면서 hidden state vector를 생성하는 과정
많은 Sequence를 거치면서 정보가 손실될 수 있다. 때문에 각각의 hidden state vector를 decoder에 넘겨줘서 정보 손실을 최소화 한다.
- encoder의 hidden state vector 와 decoder의 hidden state vector의 내적으로 attention output(context vector)을 구해낸다.
(파란색이 encoder, 빨간색이 decoder)
Teacher forcing : 잘못된 예측을 막는 것.
Seq2Seq 모델 구현 실습
Encoder, Decoder 를 구현하고 이를 이용해서 seq2seq모델을 구현한다.
-
패키지 import 및 데이터 전처리
from tqdm import tqdm
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import torch
import randomon
데이터 설정 및 전처리
vocab_size = 100
pad_id = 0
sos_id = 1 # start token
eos_id = 2 # end token
src_data = [
[3, 77, 56, 26, 3, 55, 12, 36, 31],
[58, 20, 65, 46, 26, 10, 76, 44],
[58, 17, 8],
[59],
[29, 3, 52, 74, 73, 51, 39, 75, 19],
[41, 55, 77, 21, 52, 92, 97, 69, 54, 14, 93],
[39, 47, 96, 68, 55, 16, 90, 45, 89, 84, 19, 22, 32, 99, 5],
[75, 34, 17, 3, 86, 88],
[63, 39, 5, 35, 67, 56, 68, 89, 55, 66],
[12, 40, 69, 39, 49]
]
trg_data = [
[75, 13, 22, 77, 89, 21, 13, 86, 95],
[79, 14, 91, 41, 32, 79, 88, 34, 8, 68, 32, 77, 58, 7, 9, 87],
[85, 8, 50, 30],
[47, 30],
[8, 85, 87, 77, 47, 21, 23, 98, 83, 4, 47, 97, 40, 43, 70, 8, 65, 71, 69, 88],
[32, 37, 31, 77, 38, 93, 45, 74, 47, 54, 31, 18],
[37, 14, 49, 24, 93, 37, 54, 51, 39, 84],
[16, 98, 68, 57, 55, 46, 66, 85, 18],
[20, 70, 14, 6, 58, 90, 30, 17, 91, 18, 90],
[37, 93, 98, 13, 45, 28, 89, 72, 70]
]
trg_data = [[sos_id]+seq+[eos_id] for seq in tqdm(trg_data)] # target data에 start, end token 추가
# 패딩 추가 함수
def padding(data, is_src=True):
max_len = len(max(data, key=len))
print(f"Maximum sequence length: {max_len}")
valid_lens = []
for i, seq in enumerate(tqdm(data)):
valid_lens.append(len(seq))
if len(seq) < max_len:
data[i] = seq + [pad_id] * (max_len - len(seq))
return data, valid_lens, max_len
#패딩 추가 적용.
src_data, src_lens, src_max_len = padding(src_data)
trg_data, trg_lens, trg_max_len = padding(trg_data)
# B: batch size, S_L: source maximum sequence length, T_L: target maximum sequence length
src_batch = torch.LongTensor(src_data) # (B, S_L)
src_batch_lens = torch.LongTensor(src_lens) # (B)
trg_batch = torch.LongTensor(trg_data) # (B, T_L)
trg_batch_lens = torch.LongTensor(trg_lens) # (B)
이전의 PackedSequence를 사용
# packedSequence사용을 위해 정렬
src_batch_lens, sorted_idx = src_batch_lens.sort(descending=True)
src_batch = src_batch[sorted_idx]
trg_batch = trg_batch[sorted_idx]
trg_batch_lens = trg_batch_lens[sorted_idx]
-
Encoder 구현
파라미터 설정
embedding_size = 256
hidden_size = 512
num_layers = 2
num_dirs = 2
dropout = 0.1
Encoder 클래스
nn.Embedding을 학습시키고, encoder 역할을 할 gru를 먼저 설정한다.
gru 모델에 만들어 둔, 미리 설정한 packed_input과 h_0를 넣어준다.
forward의 마지막 hidden state vector와 backward의 마지막 hidden state vector를 합쳐서 새로운 hidden state를 만든다.
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_size)
# encoder 역할을 하는 Bi-GRU.
self.gru = nn.GRU(
input_size=embedding_size,
hidden_size=hidden_size,
num_layers=num_layers,
bidirectional=True if num_dirs > 1 else False,
dropout=dropout
)
# decoder와의 크기를 맞추기 위한 작업
self.linear = nn.Linear(num_dirs * hidden_size, hidden_size)
def forward(self, batch, batch_lens): # batch: (B, S_L), batch_lens: (B)
# d_w: word embedding size
batch_emb = self.embedding(batch) # (B, S_L, d_w)
batch_emb = batch_emb.transpose(0, 1) # (S_L, B, d_w)
packed_input = pack_padded_sequence(batch_emb, batch_lens)
h_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size)) # (num_layers*num_dirs, B, d_h) = (4, B, d_h)
# gru 모델에 만들어 둔, packed_input과 h_0를 넣어준다.
packed_outputs, h_n = self.gru(packed_input, h_0) # h_n: (4, B, d_h)
outputs = pad_packed_sequence(packed_outputs)[0] # outputs: (S_L, B, 2d_h)
forward_hidden = h_n[-2, :, :]
backward_hidden = h_n[-1, :, :]
#forward 관점에서의 마지막 hidden state vector와 backward 관점에서의 마지막 hidden state vector를 합친다.
hidden = self.linear(torch.cat((forward_hidden, backward_hidden), dim=-1)).unsqueeze(0) # (1, B, d_h)
return outputs, hidden
encoder = Encoder()
-
Decoder 구현
각 time step의 hidden state Vector를 구한다.
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_size)
self.gru = nn.GRU(
input_size=embedding_size,
hidden_size=hidden_size,
)
# 각 time step의 hidden state vector
self.output_layer = nn.Linear(hidden_size, vocab_size)
def forward(self, batch, hidden): # batch: (B), hidden: (1, B, d_h)
batch_emb = self.embedding(batch) # (B, d_w)
batch_emb = batch_emb.unsqueeze(0) # (1, B, d_w)
outputs, hidden = self.gru(batch_emb, hidden) # outputs: (1, B, d_h), hidden: (1, B, d_h)
# V: vocab size
outputs = self.output_layer(outputs) # (1, B, V)
return outputs.squeeze(0), hidden
decoder = Decoder()
-
Seq2Seq 모델
encoder에 src_batch, src_batch_lens를 넣어서 마지막 hidden state vector를 만든다.
for 문을 통해 <SoS> 토큰부터 시작해서 길이 1짜리 hidden state vector를 만든다.
현재 time step에서의 예측 배치 텐서를 top_ids에 저장한다.
현재의 top_ids는 특정 teacher_forching_prob를 사용해서 사용할지 말지를 결정한다.
class Seq2seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src_batch, src_batch_lens, trg_batch, teacher_forcing_prob=0.5):
# src_batch: (B, S_L), src_batch_lens: (B), trg_batch: (B, T_L)
_, hidden = self.encoder(src_batch, src_batch_lens) # hidden: (1, B, d_h)
input_ids = trg_batch[:, 0] # (B)
batch_size = src_batch.shape[0]
outputs = torch.zeros(trg_max_len, batch_size, vocab_size) # (T_L, B, V)
for t in range(1, trg_max_len):
decoder_outputs, hidden = self.decoder(input_ids, hidden) # decoder_outputs: (B, V), hidden: (1, B, d_h)
outputs[t] = decoder_outputs
_, top_ids = torch.max(decoder_outputs, dim=-1) # top_ids: (B)
input_ids = trg_batch[:, t] if random.random() > teacher_forcing_prob else top_ids
return outputs
seq2seq = Seq2seq(encoder, decoder)
-
Seq2Seq 모델 사용
src_batch와 trg_batch를 seq2seq모델에 넣어본다.
모델에 적용 후 output의 형태는 max_length, batch_size, vocab_size 이다.
outputs = seq2seq(src_batch, src_batch_lens, trg_batch)
print(outputs.shape)
--- outputs.shape ---
# 차례대로 max_length, batch_size, vocab_size 이다.
torch.Size([22, 10, 100])
decoder를 통해 만든 결과에서 마지막 <EoS>토큰을 예측한 "?"항목은 필요가 없으므로 제거하고,
실제 trg_batch에서는 앞의 <SoS>토큰을 제거한다.
loss값을 구한다음, 역전파 연산을 해주고 업데이트를 하면서 seq2seq모델을 학습시킨다.
loss_function = nn.CrossEntropyLoss()
preds = outputs[:-1, :, :].transpose(0, 1) # (B, T_L-1, V)
loss = loss_function(preds.contiguous().view(-1, vocab_size), trg_batch[:,1:].contiguous().view(-1, 1).squeeze(1))
Seq2Seq Attention 실습
위에서 진행했던 데이터전처리(데이터, packedSequence)는 모두 동일하다.
Encoder를 구현하는 부분부터 차이점이 생긴다.
-
Encoder 구현
hidden state vector뿐만 아니라, 기존의 output도 linear를 적용한다.
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_size)
self.gru = nn.GRU(
input_size=embedding_size,
hidden_size=hidden_size,
num_layers=num_layers,
bidirectional=True if num_dirs > 1 else False,
dropout=dropout
)
self.linear = nn.Linear(num_dirs * hidden_size, hidden_size)
def forward(self, batch, batch_lens): # batch: (B, S_L), batch_lens: (B)
# d_w: word embedding size
batch_emb = self.embedding(batch) # (B, S_L, d_w)
batch_emb = batch_emb.transpose(0, 1) # (S_L, B, d_w)
packed_input = pack_padded_sequence(batch_emb, batch_lens)
h_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size)) # (num_layers*num_dirs, B, d_h) = (4, B, d_h)
packed_outputs, h_n = self.gru(packed_input, h_0) # h_n: (4, B, d_h)
outputs = pad_packed_sequence(packed_outputs)[0] # outputs: (S_L, B, 2d_h)
# 차원을 줄이고, non linear 인 tanh를 거친다.
outputs = torch.tanh(self.linear(outputs)) # (S_L, B, d_h)
forward_hidden = h_n[-2, :, :]
backward_hidden = h_n[-1, :, :]
hidden = torch.tanh(self.linear(torch.cat((forward_hidden, backward_hidden), dim=-1))).unsqueeze(0) # (1, B, d_h)
return outputs, hidden
encoder = Encoder()
-
Dot-product Attention 구현
decoder hidden state와 encoder hidden state간의 내적을 구해 유사도를 구한 다음, 해당 유사도를 소프트맥스 함수를 적용한다.
그 후, 가중치를 구해서 encoder hidden state 가중합하여 Attention value를 끌어낸다.
class DotAttention(nn.Module):
def __init__(self):
super().__init__()
def forward(self, decoder_hidden, encoder_outputs): # (1, B, d_h), (S_L, B, d_h)
query = decoder_hidden.squeeze(0) # (B, d_h)
key = encoder_outputs.transpose(0, 1) # (B, S_L, d_h)
# 내적 --> 유사도 (key와 query)
# 각 encoder hidden state의 길이만큼 반복적으로 각 차원을 곱하고, 더한다.
energy = torch.sum(torch.mul(key, query.unsqueeze(1)), dim=-1) # (B, S_L)
# 내적(유사도)를 소프트맥스 함수에 적용한다.
attn_scores = F.softmax(energy, dim=-1) # (B, S_L)
# 그 후에 가중치를 구한다.
# 각 encoder hidden state에 적용되는 가중치로 나타내기 위해 해당 연산을 한다.
attn_values = torch.sum(torch.mul(encoder_outputs.transpose(0, 1), attn_scores.unsqueeze(2)), dim=1) # (B, d_h)
return attn_values, attn_scores
dot_attn = DotAttention()
-
Decoder 구현
- 기본적인 seq2seq의 decoder에다가 Attention 을 추가.
class Decoder(nn.Module):
def __init__(self, attention):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_size)
self.attention = attention
self.rnn = nn.GRU(
embedding_size,
hidden_size
)
self.output_linear = nn.Linear(2*hidden_size, vocab_size)
def forward(self, batch, encoder_outputs, hidden): # batch: (B), encoder_outputs: (L, B, d_h), hidden: (1, B, d_h)
batch_emb = self.embedding(batch) # (B, d_w)
batch_emb = batch_emb.unsqueeze(0) # (1, B, d_w)
outputs, hidden = self.rnn(batch_emb, hidden) # (1, B, d_h), (1, B, d_h)
attn_values, attn_scores = self.attention(hidden, encoder_outputs) # (B, d_h), (B, S_L)
concat_outputs = torch.cat((outputs, attn_values.unsqueeze(0)), dim=-1) # (1, B, 2d_h)
return self.output_linear(concat_outputs).squeeze(0), hidden # (B, V), (1, B, d_h)
decoder = Decoder(dot_attn)
-
Seq2Seq 모델 구현
- decoder에 attention을 넣은 모델
class Seq2seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, src_batch, src_batch_lens, trg_batch, teacher_forcing_prob=0.5):
# src_batch: (B, S_L), src_batch_lens: (B), trg_batch: (B, T_L)
encoder_outputs, hidden = self.encoder(src_batch, src_batch_lens) # encoder_outputs: (S_L, B, d_h), hidden: (1, B, d_h)
input_ids = trg_batch[:, 0] # (B)
batch_size = src_batch.shape[0]
outputs = torch.zeros(trg_max_len, batch_size, vocab_size) # (T_L, B, V)
for t in range(1, trg_max_len):
decoder_outputs, hidden = self.decoder(input_ids, encoder_outputs, hidden) # decoder_outputs: (B, V), hidden: (1, B, d_h)
outputs[t] = decoder_outputs
_, top_ids = torch.max(decoder_outputs, dim=-1) # top_ids: (B)
input_ids = trg_batch[:, t] if random.random() > teacher_forcing_prob else top_ids
return outputs
seq2seq = Seq2seq(encoder, decoder)
\[s_{t-1}와 모든 h_1 , ... , h_{T_z}사이의 연관성을 가중치로 보고, 이 가중치의 합을 구해서 Context \, vector를 구한다.\]Bahdanau Attention
해당 시점의 decode hidden vector와 전체 encode hidden vector를 내적하는 것이 아니다.
concat을 통해서 특정 layer를 통해 score를 계산하는 방식
-
Encoder 구현
class ConcatAttention(nn.Module):
def __init__(self):
super().__init__()
self.w = nn.Linear(2*hidden_size, hidden_size, bias=False)
self.v = nn.Linear(hidden_size, 1, bias=False)
def forward(self, decoder_hidden, encoder_outputs): # (1, B, d_h), (S_L, B, d_h)
src_max_len = encoder_outputs.shape[0]
decoder_hidden = decoder_hidden.transpose(0, 1).repeat(1, src_max_len, 1) # (B, S_L, d_h)
encoder_outputs = encoder_outputs.transpose(0, 1) # (B, S_L, d_h)
concat_hiddens = torch.cat((decoder_hidden, encoder_outputs), dim=2) # (B, S_L, 2d_h)
energy = torch.tanh(self.w(concat_hiddens)) # (B, S_L, d_h)
attn_scores = F.softmax(self.v(energy), dim=1) # (B, S_L, 1)
attn_values = torch.sum(torch.mul(encoder_outputs, attn_scores), dim=1) # (B, d_h)
return attn_values, attn_scores
-
Decoder 구현
- embedding 과 attention을 수행해서 attention value를 embedding과 concat을 하여, input_size가 embedding_size + hidden_size가 된다.
class Decoder(nn.Module):
def __init__(self, attention):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_size)
self.attention = attention
self.rnn = nn.GRU(
embedding_size + hidden_size,
hidden_size
)
self.output_linear = nn.Linear(hidden_size, vocab_size)
def forward(self, batch, encoder_outputs, hidden): # batch: (B), encoder_outputs: (S_L, B, d_h), hidden: (1, B, d_h)
batch_emb = self.embedding(batch) # (B, d_w)
batch_emb = batch_emb.unsqueeze(0) # (1, B, d_w)
attn_values, attn_scores = self.attention(hidden, encoder_outputs) # (B, d_h), (B, S_L)
concat_emb = torch.cat((batch_emb, attn_values.unsqueeze(0)), dim=-1) # (1, B, d_w+d_h)
outputs, hidden = self.rnn(concat_emb, hidden) # (1, B, d_h), (1, B, d_h)
return self.output_linear(outputs).squeeze(0), hidden # (B, V), (1, B, d_h)