TimeSeries) [MultiHead Self Attention] multi target 예측

2023. 9. 23. 10:47분석 Python/Pytorch

새로운 모델 구조 만들어보기
해당 시도는 Multi Head Self Attention을 이용하면, 각 Head 마다 고유한 특징을 잡는 것을 이용하여 예측하는 코드와 분석을 통해서, gradient 실제로 다르게 가는 지를 확인하고자 함.

 

Load Library

import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.optim as optim
from IPython.display import clear_output

데이터 정의

이번 블로그에서는 타깃 3개를 예측하는 것을 정의해서 예측하고자 함.
 

# 예제 데이터 생성
t = np.linspace(0, 100, 1000)  # 0에서 100까지 1000개의 점
y1 = np.sin(t)
y2 = np.cos(t)
y3 = np.sin(t) * np.cos(t)
# Adding Gaussian noise
y1 += 0.5 * np.random.randn(y1.shape[0])
y2 += 0.5 * np.random.randn(y2.shape[0])
# y3 += 0.5 * np.random.randn(y3.shape[0])

# 그래프 그리기
plt.figure(figsize=(15,5))
plt.subplot(1, 3, 1)
plt.plot(t, y1)
plt.title("y1 = sin(t)")

plt.subplot(1, 3, 2)
plt.plot(t, y2)
plt.title("y2 = cos(t)")

plt.subplot(1, 3, 3)
plt.plot(t, y3)
plt.title("y3 = sin(t) * cos(t)")

plt.show()

Window 데이터 형태로 구성하기

seq_length = 10

X, Y = [], []

for i in range(len(y1) - seq_length):
    X.append(np.vstack([y1[i:i+seq_length],y2[i:i+seq_length],y3[i:i+seq_length]]))
    Y.append([y1[i+seq_length], y2[i+seq_length], y3[i+seq_length]])

X = torch.tensor(X, dtype=torch.float32).transpose(1, 2)
Y = torch.tensor(Y, dtype=torch.float32)

train_size = int(0.67 * len(Y))
test_size = len(Y) - train_size

X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]
X_train.size() , Y_train.size()
# (torch.Size([663, 10, 3]), torch.Size([663, 3]))

멀티 타겟 모델 만들기

Attention 모델 정의

가장 큰 특징은 output_dim 마다 분리하는 작업을 진행
그리고 gradient 를 미리 저장하여, 실제로 타깃마다 다르게 가는지 확인하고자 함. 
 

- 주의사항
처음부터 바로 reshape을 하지 말고, 아래처럼 부분적으로 reshape을 해야 gradient 가 원하는 방향으로 흘러가서 update가 됨.
저런 식으로 작업하지 않으면, 다른 head에 영향을 주는 것을 확인함. 
(아닐수도 있긴 합니다...)

 

class Attention(nn.Module) :
    def __init__(self, input_dim , seq_len, output_dim) :
        super(Attention, self).__init__()
        num_head = 6
        d_k = 12
        self.q = nn.Linear(input_dim, output_dim * num_head * d_k, bias=True)
        self.k = nn.Linear(input_dim, output_dim * num_head * d_k, bias=True)
        self.v = nn.Linear(input_dim, output_dim * num_head * d_k, bias=True)
        self.last_list = nn.ModuleList()
        self.output_dim = output_dim
        self.seq_len = seq_len
        self.num_head = num_head
        self.d_k = d_k
        
        for i in range(output_dim) :
            last = nn.Linear(num_head * d_k * seq_len,1, bias=True)
            self.last_list.append(last)
        
    def forward(self, x) :
        
        bs,seq_len,input_dim = x.size()
        qq = self.q(x)
        kk = self.k(x)
        vv = self.v(x)
        qq = qq.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        kk = kk.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        vv = vv.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        qq = qq.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        kk = kk.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        vv = vv.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        self.qq = qq.permute(0,2,1,3,4)
        self.kk = kk.permute(0,2,1,3,4)
        vv = vv.permute(0,2,3,1,4)
        self.qq.retain_grad()
        self.kk.retain_grad()
        score = torch.einsum("boqnd,boknd->bonqk", [self.qq, self.kk])
        score = score / (self.d_k ** 0.5)
        self.score = nn.Softmax(dim=-1)(score)   
        self.out = torch.einsum("bonqk,bonkd->boqnd", [self.score, vv])
        # out = torch.einsum("bonqk,bonqd->boqnd",[self.score,vv]) # .permute(0,1,3,2,4)
        out = self.out.reshape(bs,self.output_dim,-1)
        outs = torch.zeros(bs,self.output_dim,device=x.device)
        for i in range(self.output_dim) :
            outs[:,[i]] = self.last_list[i](out[:,i,:])
        return outs

 

모델 정의 및 옵티마이저 정의

model = Attention(seq_len=seq_length,output_dim=3,input_dim=3)
criterion = nn.L1Loss()

optimizer = optim.Adam(model.parameters(), lr=0.01)

모델 학습

특징적인 것은 중간에 gradient를 저장하는 작업이 필요

num_epochs = 1000
q_grad_list = []  
outputs_list = []
targets_list = []
for epoch in range(num_epochs):
    model.train()
    outputs = model(X_train)  # hidden state 값을 반환 받음
    loss = criterion(outputs, Y_train)
    outputs_list.append(outputs[0,:].detach().numpy())
    targets_list.append(Y_train[0,:].detach().numpy())
    optimizer.zero_grad()
    loss.backward()
    q_grad_list.append(model.qq.grad.mean(axis=[0,2,3,4]).detach().numpy())
    optimizer.step()

    if (epoch + 1) % 50 == 0:
        clear_output(wait=True)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
        for i in range(3) :
            plt.plot(outputs[:,i].detach().numpy(),label=f"output_{i}")
            plt.plot(Y_train[:,i].detach().numpy(),label=f"target_{i}")
            plt.legend()
            plt.show()

특정한 행에 대해서 학습 추적하기

 

타겟에 대한 Gradient 확인하기

각 타겟마다 gradient의 패턴이 다른 것을 확인함.

grad_stack = np.vstack(q_grad_list)
for i in range(grad_stack.shape[1]) :
    plt.plot(grad_stack[:,i],label=f"q_grad_{i}")
plt.legend()
plt.show()

Attention 패턴 확인

각 타겟마다 다양한 패턴을 학습한 것을 확인할 수 있음.

bs , output_dim , num_head, _ ,_ = model.score.size()
fig , ax = plt.subplots(figsize=(25,15),nrows=output_dim,ncols=num_head)
axes = ax.flatten()
idx = 0

for output_idx in range(output_dim) :
    for n_head in range(num_head) :
        axes[idx].imshow(model.score[:,output_idx,n_head,:,:].mean(0).detach().numpy(),label=f"output_{output_idx}_head_{n_head}")
        axes[idx].set_title(f"attention score output_{output_idx}_head_{n_head}")
        idx+= 1
plt.show()

 
 

전체 아키텍처 기반으로 특정한 타겟에 대한 모델 만들기

multihead 방식을 도입한 이유가, gradient가 각각 head마다 다르게 흘러간다는 것 때문에 한 것이기 때문에
실제로 그렇게 동작하는 지 확인하고자 한다.
아래 코드에서는 위에 3개중에 한 개의 타깃에 대해서만 예측하게 해 봄.

class Attention(nn.Module) :
    def __init__(self, input_dim , seq_len, output_dim) :
        super(Attention, self).__init__()
        num_head = 3
        d_k = 3
        # self.lstm = nn.LSTM(input_dim,input_dim,batch_first=True)
        self.q = nn.Linear(input_dim, output_dim * num_head * d_k, bias=False)
        self.k = nn.Linear(input_dim, output_dim * num_head * d_k, bias=False)
        self.v = nn.Linear(input_dim, output_dim * num_head * d_k, bias=False)
        self.last_list = nn.ModuleList()
        self.output_dim = output_dim
        self.seq_len = seq_len
        self.num_head = num_head
        self.d_k = d_k
        self.real_output_dim = 1
        for i in range(self.real_output_dim) :
            last = nn.Linear(num_head * d_k * seq_len,1, bias=True)
            self.last_list.append(last)
        
    def forward(self, x) :
        
        bs,seq_len,input_dim = x.size()
        # x , _ = self.lstm(x)
        qq = self.q(x)
        kk = self.k(x)
        vv = self.v(x)
        qq = qq.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        kk = kk.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        vv = vv.reshape(bs, seq_len , self.output_dim, self.num_head * self.d_k)
        qq = qq.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        kk = kk.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        vv = vv.reshape(bs, seq_len , self.output_dim, self.num_head , self.d_k)
        self.qq = qq.permute(0,2,1,3,4)
        self.kk = kk.permute(0,2,1,3,4)
        vv = vv.permute(0,2,3,1,4)
        self.qq.retain_grad()
        self.kk.retain_grad()
        score = torch.einsum("boqnd,boknd->bonqk", [self.qq, self.kk])
        score = score / (self.d_k ** 0.5)
        self.score2 = nn.Softmax(dim=-1)(score)
        
        qq = self.qq[:,[0],:,:,:]
        kk = self.kk[:,[0],:,:,:]
        vv = vv[:,[0],:,:,:]
        score = torch.einsum("boqnd,boknd->bonqk", [qq, kk])
        self.score = nn.Softmax(dim=-1)(score)
        self.out = torch.einsum("bonqk,bonkd->boqnd", [self.score, vv])
        # out = torch.einsum("bonqk,bonqd->boqnd",[self.score,vv]) # .permute(0,1,3,2,4)
        out = self.out.reshape(bs,self.real_output_dim,-1)
        outs = torch.zeros(bs,self.real_output_dim,device=x.device)
        for i in range(self.real_output_dim) :
            outs[:,[i]] = self.last_list[i](out[:,i,:])
        return outs
model = Attention(seq_len=seq_length,output_dim=3,input_dim=3)
criterion = nn.L1Loss()
import torch.optim as optim

optimizer = optim.Adam(model.parameters(), lr=0.01)

 
나중에 변화량을 비교하기 위해 q weight 일단 저장한다.

ori_q = deepcopy(model.q.weight.cpu().detach().numpy().T)
plt.imshow(model.q.weight.cpu().detach().numpy().T)

outputs = model(X_train)  # hidden state 값을 반환 받음
bs , output_dim , num_head, _ ,_ = model.score.size()
fig , ax = plt.subplots(figsize=(25,15),nrows=output_dim,ncols=num_head)
axes = ax.flatten()
idx = 0

for output_idx in range(output_dim) :
    for n_head in range(num_head) :
        axes[idx].imshow(model.score[:,output_idx,n_head,:,:].mean(0).detach().numpy(),label=f"output_{output_idx}_head_{n_head}")
        axes[idx].set_title(f"attention score output_{output_idx}_head_{n_head}")
        idx+= 1
plt.show()

학습할 score 시각화

전체 score 시각화

from copy import deepcopy
model.eval()
outputs = model(X_train)  # hidden state 값을 반환 받음
ori_score = deepcopy(model.score2.detach().cpu().numpy().mean(axis=0))
output_dim , num_head, _ ,_ = ori_score.shape
fig , ax = plt.subplots(figsize=(25,15),nrows=output_dim,ncols=num_head)
axes = ax.flatten()
idx = 0
for output_idx in range(output_dim) :
    for n_head in range(num_head) :
        axes[idx].imshow(ori_score[output_idx,n_head,:,:],label=f"output_{output_idx}_head_{n_head}")
        axes[idx].set_title(f"attention score output_{output_idx}_head_{n_head}")
        idx+= 1
plt.show()

 

학습하기

from IPython.display import clear_output
num_epochs = 1000
q_grad_list = [] 
k_grad_list = [] 
outputs_list = []
targets_list = []
model.train()
for epoch in range(num_epochs):
    model.train()
    outputs = model(X_train)  # hidden state 값을 반환 받음
    loss = criterion(outputs, Y_train[:,[0]])
    outputs_list.append(outputs[0,:].detach().numpy())
    targets_list.append(Y_train[0,[0]].detach().numpy())
    optimizer.zero_grad()
    loss.backward()
    q_grad_list.append(model.qq.grad.mean(axis=[0,2,3,4]).detach().numpy())
    k_grad_list.append(model.kk.grad.mean(axis=[0,2,3,4]).detach().numpy())
    optimizer.step()

    if (epoch + 1) % 50 == 0:
        clear_output(wait=True)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
        for i in range(1) :
            plt.plot(outputs[:,i].detach().numpy(),label=f"output_{i}")
            plt.plot(Y_train[:,i].detach().numpy(),label=f"target_{i}")
            plt.legend()
            plt.show()

Q Layer에 대한 가중치 변화 확인하기

원하는 결과처럼 특정한 가중치만 변화한 것을 확인할 수 있다.

after_q = model.q.weight.cpu().detach().numpy().T
plt.imshow(np.abs(ori_q - after_q))

 

grad_stack = np.vstack(q_grad_list)
for i in range(grad_stack.shape[1]) :
    plt.plot(grad_stack[:,i],label=f"q_grad_{i}")
    plt.legend()
    plt.show()

 

학습한 타겟에 대한 attention 확인하기

전체 attention score 변화량을 통해 변화환 부분 시각화하기

내가 원하는 것처럼 특정 타겟에 대한 가중치만 학습을 하니, 기존 target에 대한 부분의 score는 유지하다 보니, 변화량이 0이 나오고, 변화된 부분에 대한 score만 얻을 수 있는 것을 확인했다.

model.eval()
outputs = model(X_train)  # hidden state 값을 반환 받음
after_score = deepcopy(model.score2.detach().cpu().numpy().mean(axis=0))
diff_score = ori_score - after_score
print(diff_score.max())
output_dim , num_head, _ ,_ = diff_score.shape
fig , ax = plt.subplots(figsize=(25,15),nrows=output_dim,ncols=num_head)
axes = ax.flatten()
idx = 0
for output_idx in range(output_dim) :
    for n_head in range(num_head) :
        axes[idx].imshow(diff_score[output_idx,n_head,:,:],label=f"output_{output_idx}_head_{n_head}")
        axes[idx].set_title(f"attention score output_{output_idx}_head_{n_head}")
        idx+= 1
plt.show()

 
 

결론

이런 식으로 multihead attention 구조를 사용하여, 한꺼번에 여러 개의 타깃을 학습할 수 있는 구조를 만들 수 있다.
multi head attention이 다른 피처를 뽑아낸다는 아이디어에서 시작을 해봤고,
각각의 특징을 잘 뽑아낼 수 있는 구조라고 생각한다..
이렇게 하는 것을 본 적은 없지만, 새로운 시도...!
 
 
이 구조에 대한 장점은 무엇일까...
모델이 커지면서 메모리 사용량에 이슈가 있다(소 잡는 칼로 닭 잡는 느낌..)
- 전체적으로 큰 모델을 만들고, 부분적으로 고도화할 때 특정 head만 따로 뽑아서 쓸 수 있는 것 같다
즉 추후에 특정 타겟에 대해서만 별도의 작업을 한다고 했을 때 lookup table처럼 활용할 수 있는 구조
 
단점은, 타겟 개수에 따라 모델이 커지게 되는데, 이 때 부분적으로 뽑아낼 수 없다는 것이 단점으로 보인다.
차라리 저렇게 쓸꺼면, layer를 별도로 추가하는 방향이 좋을 수도 있어 보인다...

728x90