Pytorch) multioutput Regression 구현해보기

2022. 3. 26. 19:31분석 Python/Pytorch

 

Pytorch에서 multioutput regression을 구현해보고자 한다.

구현하게 된 이유는 sckit-learn에서 RegressorChain이라는 것을 보고 도움이 될 것 같아서 해보려고 한다.

 

 

실제 구현된 코드를 보니 다음과 같이 구현이 되어 있었다.

구현된 방식은 Chain인 경우에 X에다가 y 예측값을 붙이고, 또 그것을 사용해서 다시 y를 예측하는 방식이었다.

이 부분을 참고해서 딥러닝이다 보니 약간은 다르게 구현을 해봤다.

class _BaseChain(BaseEstimator, metaclass=ABCMeta):
    def __init__(self, base_estimator, *, order=None, cv=None, random_state=None):
        self.base_estimator = base_estimator
        self.order = order
        self.cv = cv
        self.random_state = random_state

    @abstractmethod
    def fit(self, X, Y, **fit_params):
        """Fit the model to data matrix X and targets Y.
        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The input data.
        Y : array-like of shape (n_samples, n_classes)
            The target values.
        **fit_params : dict of string -> object
            Parameters passed to the `fit` method of each step.
            .. versionadded:: 0.23
        Returns
        -------
        self : object
            Returns a fitted instance.
        """
        X, Y = self._validate_data(X, Y, multi_output=True, accept_sparse=True)

        random_state = check_random_state(self.random_state)
        self.order_ = self.order
        if isinstance(self.order_, tuple):
            self.order_ = np.array(self.order_)

        if self.order_ is None:
            self.order_ = np.array(range(Y.shape[1]))
        elif isinstance(self.order_, str):
            if self.order_ == "random":
                self.order_ = random_state.permutation(Y.shape[1])
        elif sorted(self.order_) != list(range(Y.shape[1])):
            raise ValueError("invalid order")

        self.estimators_ = [clone(self.base_estimator) for _ in range(Y.shape[1])]

        if self.cv is None:
            Y_pred_chain = Y[:, self.order_]
            if sp.issparse(X):
                X_aug = sp.hstack((X, Y_pred_chain), format="lil")
                X_aug = X_aug.tocsr()
            else:
                X_aug = np.hstack((X, Y_pred_chain))

        elif sp.issparse(X):
            Y_pred_chain = sp.lil_matrix((X.shape[0], Y.shape[1]))
            X_aug = sp.hstack((X, Y_pred_chain), format="lil")

        else:
            Y_pred_chain = np.zeros((X.shape[0], Y.shape[1]))
            X_aug = np.hstack((X, Y_pred_chain))

        del Y_pred_chain

        for chain_idx, estimator in enumerate(self.estimators_):
            y = Y[:, self.order_[chain_idx]]
            estimator.fit(X_aug[:, : (X.shape[1] + chain_idx)], y, **fit_params)
            if self.cv is not None and chain_idx < len(self.estimators_) - 1:
                col_idx = X.shape[1] + chain_idx
                cv_result = cross_val_predict(
                    self.base_estimator, X_aug[:, :col_idx], y=y, cv=self.cv
                )
                if sp.issparse(X_aug):
                    X_aug[:, col_idx] = np.expand_dims(cv_result, 1)
                else:
                    X_aug[:, col_idx] = cv_result

        return self

    def predict(self, X):
        """Predict on the data matrix X using the ClassifierChain model.
        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The input data.
        Returns
        -------
        Y_pred : array-like of shape (n_samples, n_classes)
            The predicted values.
        """
        check_is_fitted(self)
        X = self._validate_data(X, accept_sparse=True, reset=False)
        Y_pred_chain = np.zeros((X.shape[0], len(self.estimators_)))
        for chain_idx, estimator in enumerate(self.estimators_):
            previous_predictions = Y_pred_chain[:, :chain_idx]
            if sp.issparse(X):
                if chain_idx == 0:
                    X_aug = X
                else:
                    X_aug = sp.hstack((X, previous_predictions))
            else:
                X_aug = np.hstack((X, previous_predictions))
            Y_pred_chain[:, chain_idx] = estimator.predict(X_aug)

        inv_order = np.empty_like(self.order_)
        inv_order[self.order_] = np.arange(len(self.order_))
        Y_pred = Y_pred_chain[:, inv_order]

        return Y_pred

 

Implementation

Library Load

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import numpy as np
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from sklearn.datasets import make_regression
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from IPython import display

Data 준비

여기서 딥러닝이다 보니 스케일이 중요하기 때문에, 스케일은 0 주변으로 있게 표준화를 해준다.

 


X, y = make_regression(n_samples=1000, n_features=10, n_informative=5, n_targets=3, random_state=1, noise=0.5)
X.shape , y.shape

# ((1000, 10), (1000, 3))

scaler=  StandardScaler()
y = scaler.fit_transform(y)

Torch Custom Dataset 만들기

class TabularDataSet(Dataset) :
    def __init__(self, X , Y) :
        self._X = np.float32(X)
        self._Y = Y

    def __len__(self,) :
        return len(self._Y)

    def __getitem__(self,idx) :
        return self._X[idx], self._Y[idx]

Design Model (Multi-Output)

이 부분은 일반적으로 하는 X를 넣으면 여러 개의 Input을 내보는 방식으로 진행하는 버전

class MultOutRegressor(nn.Module):
    def __init__(self, input_dim, target_dim, hidden_dim=32,seed=1234):
        super().__init__()
        torch.manual_seed(seed)
        self.hidden_dim = hidden_dim
        self.target_dim = target_dim

        self.fc1 = nn.Linear(input_dim, self.hidden_dim)
        self.fc2 = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.fc3 = nn.Linear(self.hidden_dim, self.target_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)
        return x

 

Design Model (Chained Multi-Output)

이 부분이 Chain 형태를 반영한 코드이다.

사실 이 부분은 뭔가 실제로 데이터가 이런 chain 형태로 이루어져서 생성이 될 때는 한 번에 생성하는 것보다 이런 식으로 생성하는 것이 학습에 도움이 될 것 같아서 구성해봤다.

기존의 코드랑은 차이점은 기존 scikit-learn에서는 데이터에다가 Y를 붙여서 새롭게 모델링하는 반면에, 나는 여기서 

X에 대한 latent feature를 공유하면서, 그 공유된 값에 예측된 y를 붙여서 사용했다.

사실 이 부분은 정답이 없는 부분이지만, 네트워크를 굳이 크게 만들 필요는 없을 것 같아서 이런 식으로 진행했다.

 

class MultOutChainedRegressor(nn.Module):
    def __init__(self, input_dim, target_dim,order, hidden_dim=32,seed=1234 ):
        super().__init__()
        torch.manual_seed(seed)
        self.hidden_dim = hidden_dim
        self.target_dim = target_dim
        self._order = order 
        assert len(self._order) == self.target_dim
        assert min(self._order) == 0 

        self.fc1 = nn.Linear(input_dim, self.hidden_dim)
        self.fc2 = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.output_seq = []
        self.nested_dim = self.hidden_dim
        self.output_rank = {}
        for idx , order in enumerate(self._order) : 
            self.output_seq.append(nn.Linear(self.nested_dim, 1))
            self.nested_dim += 1 
            self.output_rank[idx] = order 
        else :
            self.linears = nn.ModuleList(self.output_seq)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        _x = x
        last_vector = torch.zeros(size=(x.size()[0],self.target_dim))
        for idx , rank in self.output_rank.items() :
            y_hat = self.output_seq[idx](_x)
            last_vector[:,[rank]] += y_hat
            _x = torch.cat([_x,y_hat],axis=1)
        return last_vector

Pytorch 학습 관련 함수들

def update(input , target , model, criterion , optimizer,max_norm=5) :
    optimizer.zero_grad()
    output = model(input)
    loss = criterion(output , target.float())
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
    optimizer.step()
    return loss 

def one_epoch(dataloader , model, criterion , optimizer ) :
    result = torch.FloatTensor([0])
    for idx , (input , target) in enumerate(dataloader) :
        loss = update(input , target , model, criterion , optimizer)
        result = torch.add(result , loss)
    else :
        result /= idx+1
        return result.detach().cpu().numpy()

def visualize(result) :
    display.clear_output(wait=True)
    plt.plot(result)
    plt.show()

def train(n_epochs , dataloader , model, criterion , optimizer , log_interval=10) :
    epoch_loss = []
    for epoch in range(n_epochs) :
        loss = one_epoch(dataloader , model, criterion , optimizer )
        if epoch > 0 :
            epoch_loss.append(loss)
        if epoch % log_interval == 0 :
            visualize(epoch_loss)
    else :
        return np.min(epoch_loss)

Train ( Multi-Output)

tabulardataset = TabularDataSet(X,y) # 간단 예시
train_dl = DataLoader(tabulardataset) # 간단 예시
model = MultOutRegressor(10 , 3)
optimizer = optim.AdamW(model.parameters(), lr=0.01)
criterion = nn.MSELoss()
train(500, train_dl ,model, criterion , optimizer,log_interval=50)

 

Train ( Multi-Output Chain)

model = MultOutChainedRegressor(10 , 3 , order=[2,1,0])
optimizer = optim.AdamW(model.parameters(), lr=0.01)
criterion = nn.MSELoss()
train(500, train_dl ,model, criterion , optimizer,log_interval=50)

 

결론

Multi-Output에 대해서 알아봤다.

Output 끼리의 종속성이 없다고 하면, 그냥 한 번에 예측하는 방법이 맞는 것 같고, 만약 예측 값끼리의 어떤 의존성이 있고, 이를 모델링하게 위해서 가장 간단한 방법을 구현해 본 것 같다.

Reference

https://machinelearningmastery.com/multi-output-regression-models-with-python/

 

How to Develop Multi-Output Regression Models with Python

Multioutput regression are regression problems that involve predicting two or more numerical values given an input example. An example might […]

machinelearningmastery.com

https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/multioutput.py#L841

 

GitHub - scikit-learn/scikit-learn: scikit-learn: machine learning in Python

scikit-learn: machine learning in Python. Contribute to scikit-learn/scikit-learn development by creating an account on GitHub.

github.com

 

728x90