[PyTorch] PyTorch를 Keras처럼 API를 호출하는 방식으로 사용하는 방법

2020. 8. 25. 23:21분석 Python/Pytorch

 

파이토치에서 케라스처럼 API를 만들어서 사용하는 예제를 발견하게 되어 공유한다.

실제로 이 규격에 맞춰서 LSTM도 만들 수 있을 것 같다(조금 복잡할 것 같기도 하지만).

기존 코드를 조금씩 바꿔서 해봤고, 아래에는 Custom Dataset에 대해서 새로 만들어서 진행해봤다.

이런 좋은 코드를 공유해주신 다른 분들께 감사드리며, 나도 조금 수정을 해서 공유한다.

다들 즐거운 코딩하시길! 

Library Load

import torch
from torch import nn
from torch import optim
from torch.autograd import Variable
from torchsummary import summary as summary_
import pkbar

import warnings
warnings.filterwarnings('ignore')

Input

def Input(shape):
    """Record the model's input shape and hand it straight back.

    The shape is stored as an attribute on the function object itself
    (``Input.shape``) so code that runs later can look up the dimensions the
    model was declared with.

    Args:
        shape: The input dimensions, e.g. ``(1, 1024)`` or ``(3, 32, 32)``.

    Returns:
        The same ``shape`` object that was passed in.
    """
    setattr(Input, "shape", shape)
    return shape

Dense Class

class Dense(nn.Module):
    """Keras-style fully connected layer builder.

    Calling an instance does not run a forward pass. It returns a new
    ``nn.Sequential`` made of the layers received so far plus a fresh
    ``nn.Linear`` followed by ``activation``.

    Args:
        outputs: Number of output features of the ``nn.Linear`` layer.
        activation: Module appended right after the linear layer.
    """

    def __init__(self, outputs, activation):
        super().__init__()
        self.outputs = outputs
        self.activation = activation

    def __call__(self, inputs):
        """Append a linear layer + activation to ``inputs``.

        Args:
            inputs: Either a shape tuple (first layer of the model; any tuple
                subclass such as ``torch.Size`` is accepted) or the
                ``nn.Sequential`` returned by a previous layer builder.

        Returns:
            An ``nn.Sequential`` ending in ``[nn.Linear, activation]``.
        """
        self.inputs_size = 1
        if isinstance(inputs, tuple):
            # First layer: the linear layer consumes the product of all
            # declared input dimensions.
            for dim in inputs:
                self.inputs_size *= dim
            previous = []
            in_features = self.inputs_size
        else:
            self.inputs = inputs
            previous = list(inputs)
            # Builders always append [layer, activation], so the previous
            # trainable layer is expected at position -2.
            if len(previous) >= 2 and isinstance(previous[-2], nn.Linear):
                in_features = previous[-2].out_features
            else:
                # Previous layer is not linear (e.g. conv + flatten): probe
                # the stack with a dummy input to learn its output width.
                in_features = get_conv_output(Input.shape, inputs)

        # Assign the finished Sequential in one step. Binding a plain list to
        # an attribute name that already holds a registered submodule raises
        # TypeError, which made a second call on the same layer fail.
        self.layers = nn.Sequential(
            *previous,
            nn.Linear(in_features, self.outputs),
            self.activation,
        )
        return self.layers

Model Class

주의 깊게 볼 것은 .summary와 pkbar이다.

class Model():
    """Keras-like training wrapper around a PyTorch module.

    Args:
        inputs: Input shape passed on to the summary printer.
        outputs: The ``nn.Module`` (typically the ``nn.Sequential`` returned
            by the last layer builder) to train and evaluate.
        device: Device identifier (e.g. ``'cpu'`` or ``'cuda'``) that the
            module and every batch are moved to.
    """

    def __init__(self, inputs, outputs, device):
        self.input_size = inputs
        self.device = device
        self.model = outputs.to(self.device)

    def parameters(self):
        """Return the wrapped module's parameters (for building an optimizer)."""
        return self.model.parameters()

    def compile(self, optimizer, loss):
        """Store the optimizer and the loss function used by the fit/evaluate methods."""
        self.opt = optimizer
        self.criterion = loss

    def summary(self, batch_size=-1):
        """Print the layer summary via ``torchsummary`` followed by the device."""
        summary_(self.model, self.input_size, device=self.device,
                 batch_size=batch_size)
        print("Device Type:", self.device)

    @staticmethod
    def _align_output(out, target):
        """Return ``out`` shaped for the loss: untouched if it already matches ``target``, else squeezed."""
        if out.shape == target.shape:
            return out
        return out.squeeze()

    @staticmethod
    def _count_correct(out, target):
        """Count correct predictions in one batch (0.5 threshold for one output unit, argmax otherwise)."""
        if out.dim() < 2:
            # A single un-batched sample: treat it as a batch of one.
            out = out.reshape(1, -1)
        # Collapse singleton middle dimensions, e.g. (B, 1, C) -> (B, C), so
        # the binary/multi-class decision looks at the real number of units.
        scores = out.reshape(out.size(0), -1)
        if scores.size(1) == 1:
            pred = (scores > 0.5).to(target.dtype).reshape(-1)
        else:
            pred = torch.max(scores, 1)[1]
        return (pred == target.reshape(-1)).sum().item()

    def fit(self, data_x, data_y, epochs):
        """Train on paired tensors, one optimizer step per element of ``data_x``.

        Args:
            data_x: Indexable inputs; each element is fed to the model as is.
            data_y: Targets aligned with ``data_x``.
            epochs: Number of passes over the data.
        """
        self.model.train()
        for epoch in range(epochs):
            print("Epoch {}/{}".format(epoch+1, epochs))
            progress = pkbar.Kbar(target=len(data_x), width=25)
            for i, (data, target) in enumerate(zip(data_x, data_y)):
                self.opt.zero_grad()
                train_out = self.model(data.to(self.device))
                loss = self.criterion(train_out, target.to(self.device))
                loss.backward()
                self.opt.step()
                progress.update(i, values=[("loss: ", loss.item())])
            progress.add(1)

    def evaluate(self, test_x, test_y):
        """Report mean loss and accuracy over paired tensors on the progress bar.

        Args:
            test_x: Indexable inputs; each element is fed to the model as is.
            test_y: Targets aligned with ``test_x``.
        """
        self.model.eval()
        n_samples = len(test_x)
        if n_samples == 0:
            return
        correct, total_loss = 0, 0.0
        progress = pkbar.Kbar(target=n_samples, width=25)
        # Evaluation needs no autograd graph.
        with torch.no_grad():
            for i, (data, target) in enumerate(zip(test_x, test_y)):
                target = target.to(self.device)
                out = self.model(data.to(self.device))
                total_loss += self.criterion(out, target).item()
                correct += self._count_correct(out, target)
        progress.update(i, values=[("loss", total_loss / n_samples),
                                   ("acc", correct / n_samples)])
        progress.add(1)

    def fit_generator(self, generator, epochs):
        """Train from an iterable of ``(data, target)`` batches.

        Args:
            generator: Sized iterable of batches, e.g. a ``DataLoader``.
            epochs: Number of passes over ``generator``.
        """
        self.model.train()
        for epoch in range(epochs):
            print("Epoch {}/{}".format(epoch+1, epochs))
            progress = pkbar.Kbar(target=len(generator), width=25)
            for i, (data, target) in enumerate(generator):
                target = target.to(self.device)
                self.opt.zero_grad()
                train_out = self.model(data.to(self.device))
                loss = self.criterion(self._align_output(train_out, target), target)
                loss.backward()
                self.opt.step()
                # Update per batch, as fit() does, so the bar advances during
                # the epoch instead of once at its end.
                progress.update(i, values=[("loss: ", loss.item())])
            progress.add(1)

    def evaluate_generator(self, generator):
        """Report accuracy (percent) and mean batch loss over all batches.

        Args:
            generator: Sized iterable of ``(data, target)`` batches.
        """
        self.model.eval()
        progress = pkbar.Kbar(target=len(generator), width=25)
        # Accumulators live outside the loop so every batch contributes.
        correct, total_loss, n_samples, n_batches = 0, 0.0, 0, 0
        with torch.no_grad():
            for i, (data, target) in enumerate(generator):
                target = target.to(self.device)
                out = self.model(data.to(self.device))
                total_loss += self.criterion(self._align_output(out, target), target).item()
                correct += self._count_correct(out, target)
                n_samples += out.size(0)
                n_batches += 1
        if n_batches == 0:
            return
        progress.update(i, values=[("test_acc", round(correct / n_samples, 4) * 100),
                                   ("test_loss", total_loss / n_batches)])
        progress.add(1)

    def predict_generator(self, generator):
        """Return the model output for every batch of ``generator``.

        Runs in eval mode without gradient tracking, so layers such as
        dropout are inactive and no autograd graph is kept alive.

        Args:
            generator: Iterable of ``(data, labels)`` batches; labels are ignored.

        Returns:
            A list with one output tensor per batch.
        """
        self.model.eval()
        with torch.no_grad():
            return [self.model(data.to(self.device)) for data, _ in generator]

Fully Connected Neural Network

 

# Prefer the GPU when one is available; later examples pass this to Model.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Stack five Dense layers: 1024 inputs -> 54 -> 34 -> 43 -> 10 -> 1 sigmoid unit.
inputs = Input((1, 1024))
x = Dense(54, nn.ReLU())(inputs)
y = Dense(34, nn.ReLU())(x)
z = Dense(43, activation=nn.ReLU())(y)
a = Dense(10, activation=nn.ReLU())(z)
b = Dense(1, activation=nn.Sigmoid())(a)

# NOTE(review): this example is pinned to 'cpu' rather than `device` — confirm
# whether that is intentional.
model = Model(inputs, b, 'cpu')
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.BCELoss())

model.summary()

torch.manual_seed(42)
#input should be of the dimension - (batch_size, 1, n_rows*n_columns)
x = torch.rand((10, 1, 1024), dtype=torch.float)
# Random 0/1 labels. Each per-sample target is (1, 1) so it matches the
# model's (1, 1) output, as BCELoss requires identical input/target shapes.
# `.float()` replaces wrapping an existing tensor in torch.tensor(...).
y = (torch.rand(10, 1, 1) < 0.5).float()
#y = torch.tensor([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]], dtype=torch.int64).reshape(-1, 1)
model.fit(x, y, 4)

pkbar를 사용하면 이처럼 나온다!

# Score the model on the same tensors that were passed to model.fit above
# (there is no held-out split in this toy example).
model.evaluate(x, y)

Convolutional Neural Network

def get_conv_output(shape, inputs):
    """Probe a layer stack with a dummy input and return its output size along dim 1.

    Args:
        shape: Per-sample input shape (without the batch dimension).
        inputs: Callable layer stack, e.g. an ``nn.Sequential``.

    Returns:
        ``size(1)`` of the stack's output for a batch of one random sample.
    """
    probe = torch.rand(1, *shape)

    # Run the probe in eval mode: a batch of one would otherwise make
    # BatchNorm1d raise, and training-mode layers would update their running
    # statistics from random data. The previous mode is restored afterwards
    # (applied to the whole stack).
    is_module = isinstance(inputs, torch.nn.Module)
    was_training = inputs.training if is_module else False
    if is_module:
        inputs.eval()
    try:
        # Only the output shape is needed, so skip building an autograd graph.
        with torch.no_grad():
            output_feat = inputs(probe)
    finally:
        if is_module:
            inputs.train(was_training)

    return output_feat.size(1)

def same_pad(h_in, kernal, stride, dilation):
    """Return the per-side padding that keeps a convolution's output size equal to ``h_in``.

    Args:
        h_in: Input size along the convolved dimension.
        kernal: Kernel size.
        stride: Convolution stride.
        dilation: Kernel dilation.

    Returns:
        The padding as a float; it may be fractional, so callers decide how
        to round it.
    """
    dilated_reach = dilation * (kernal - 1)
    total_padding = stride * (h_in - 1) - h_in + dilated_reach + 1
    return total_padding / 2.0
  
class FlattenedLayer(nn.Module):
    """Module that reshapes a batch to two dimensions: ``(batch, everything else)``."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Flatten every dimension after the first.

        The flattened tensor is also kept on the instance as ``self.inputs``.

        Args:
            input: Tensor whose first dimension is the batch dimension.

        Returns:
            A view of ``input`` with shape ``(input.size(0), -1)``.
        """
        batch = input.size(0)
        flat = input.view(batch, -1)
        self.inputs = flat
        return flat


class Flatten():
    """Keras-style builder that appends a ``FlattenedLayer`` to a layer stack."""

    def __init__(self):
        pass

    def __call__(self, inputs):
        """Return a new ``nn.Sequential``: the layers of ``inputs`` plus a flatten step.

        Args:
            inputs: Iterable of modules built so far (e.g. an ``nn.Sequential``).

        Returns:
            The extended ``nn.Sequential``, also stored as ``self.layers``.
        """
        self.inputs = inputs
        stacked = [*inputs, FlattenedLayer()]
        self.layers = nn.Sequential(*stacked)
        return self.layers
class Conv2d(nn.Module):
    """Keras-style 2-D convolution layer builder.

    Calling an instance does not run a forward pass. It returns a new
    ``nn.Sequential`` made of the layers received so far plus a fresh
    ``nn.Conv2d`` followed by ``activation``.

    Args:
        filters: Number of output channels.
        kernel_size: Convolution kernel size.
        strides: Convolution stride.
        padding: Explicit padding value, or ``'same'`` to derive it from the
            input height via ``same_pad`` (the result is truncated to int and
            applied to both spatial dimensions).
        dilation: Kernel dilation.
        activation: Module appended right after the convolution.
    """

    def __init__(self, filters, kernel_size, strides, padding, dilation, activation):
        super().__init__()
        self.filters = filters
        self.kernel = kernel_size
        self.strides = strides
        self.padding = padding
        self.dilation = dilation
        self.activation = activation

    def __call__(self, inputs):
        """Append a convolution + activation to ``inputs``.

        Args:
            inputs: Either a ``(..., channels, height, width)`` shape tuple
                (first layer; tuple subclasses such as ``torch.Size`` are
                accepted) or the ``nn.Sequential`` returned by a previous
                layer builder.

        Returns:
            An ``nn.Sequential`` ending in ``[nn.Conv2d, activation]``.
        """
        if isinstance(inputs, tuple):
            self.inputs_size = inputs
            previous = []
            in_channels = inputs[-3]
            height = inputs[-2]
        else:
            self.inputs = inputs
            previous = list(inputs)
            # Builders always append [layer, activation], so the previous
            # convolution is expected at position -2.
            in_channels = previous[-2].out_channels
            height = None  # only computed if 'same' padding needs it

        # Resolve padding into a local so the configured value (e.g. 'same')
        # is still available if this builder is called again.
        padding = self.padding
        if padding == 'same':
            if height is None:
                # The height of the incoming feature map is needed; probe the
                # stack built so far with a dummy input and read the
                # second-to-last dimension (size(1) would be the channels).
                with torch.no_grad():
                    height = inputs(torch.rand(1, *Input.shape)).size(-2)
            padding = int(same_pad(height, self.kernel, self.strides, self.dilation))

        # Assign the finished Sequential in one step. Binding a plain list to
        # an attribute name that already holds a registered submodule raises
        # TypeError, which made a second call on the same layer fail.
        self.layers = nn.Sequential(
            *previous,
            nn.Conv2d(in_channels,
                      self.filters,
                      self.kernel,
                      self.strides,
                      padding,
                      self.dilation),
            self.activation,
        )
        return self.layers
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import numpy as np
import matplotlib.pyplot as plt
# Mini-batch size used by both DataLoaders below.
bs = 128

# Convert every image to a single grayscale channel and resize it to a
# 1 x 784 strip so that its last dimension matches Input((1, 784)) below.
transform = transforms.Compose([transforms.Grayscale(num_output_channels=1),
                                transforms.Resize((1, 28*28)),
                                transforms.ToTensor()
])

# CIFAR-100 train/test splits; download=True asks torchvision to fetch the
# data into `root` when it is not already there.
trainset = torchvision.datasets.CIFAR100(root='./../../../Data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR100(root='./../../../Data', train=False, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=bs, shuffle=True)
testloader = DataLoader(testset, batch_size=bs)
# Five stacked Dense layers ending in 100 units (CIFAR-100 has 100 classes).
# NOTE(review): `input` shadows the Python builtin of the same name.
input = Input((1, 784))
x = Dense(1000, nn.ReLU())(input)
y = Dense(500, nn.ReLU())(x)
z = Dense(700, activation=nn.ReLU())(y)
a = Dense(200, activation=nn.ReLU())(z)
# NOTE(review): the final layer applies ReLU before CrossEntropyLoss, which
# normally expects raw logits — confirm this is intended.
b = Dense(100, activation=nn.ReLU())(a)

model = Model(input, b, device)
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.CrossEntropyLoss())
model.summary()
# Train for 3 epochs over the training loader.
model.fit_generator(trainloader, 3)

 

# Report accuracy/loss on the test split.
model.evaluate_generator(testloader)

# Collect the raw model outputs, one list entry per test batch.
out = model.predict_generator(testloader)

배치 사이즈 DataLoader 사용하기

# Mini-batch size used by both DataLoaders below.
bs = 128

# Keep the images as 3 x 32 x 32 tensors (matches Input((3, 32, 32)) below).
transform = transforms.ToTensor()
trainset = torchvision.datasets.CIFAR100(root='./../../../Data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR100(root='./../../../Data', train=False, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=bs, shuffle=True)
testloader = DataLoader(testset, batch_size=bs)
# Three convolutions -> flatten -> 100-unit Dense layer.
# Conv2d arguments: filters, kernel_size, strides, padding, dilation, activation.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = Input((3, 32, 32))
x = Conv2d(3, 3, 1, 'same', 1, nn.ReLU())(input)
y = Conv2d(5, 3, 1, 0, 1, nn.ReLU())(x)
z = Conv2d(6, 3, 1, 'same', 1, nn.ReLU())(y)
a = Flatten()(z)
# NOTE(review): the final layer applies ReLU before CrossEntropyLoss, which
# normally expects raw logits — confirm this is intended.
b = Dense(100, activation=nn.ReLU())(a)

model = Model(input, b, device)
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.CrossEntropyLoss())
model.summary()

# Train for 3 epochs over the training loader.
model.fit_generator(trainloader, 3)

model.evaluate_generator(testloader)

# One output tensor per test batch.
out = model.predict_generator(testloader)
# Index of the largest output per sample in the first test batch; as a bare
# expression its value is presumably only shown in a notebook/REPL.
torch.max(out[0], 1)[1]
tensor([99, 99, 99, 99, 60, 75, 30, 75, 60, 99, 99, 75, 99, 99, 29, 99, 53, 29,
        99, 53, 61, 99, 99, 30, 75, 99, 99, 75, 99, 53, 91, 61, 41, 30, 29, 30,
        91, 41, 91, 99, 99, 99, 75, 30, 29, 61, 75, 99, 99, 99, 30, 75, 99, 99,
        99, 99, 60, 30, 30, 30, 99, 60, 75, 30, 53, 53, 99, 41, 99, 99, 99, 99,
        61, 30, 99, 99, 99, 61, 99, 61, 30, 61, 30, 75, 99, 30, 99, 99, 53, 30,
        30, 99, 75, 99, 75, 75, 99, 99, 75, 53, 30, 53, 99, 99, 75, 75, 99, 99,
        53, 30, 61, 41, 30, 61, 53, 60, 41, 75, 99, 99, 75, 99, 30, 53, 99, 61,
        75, 75])

 

Custom DataLoader를 만들어서 학습시켜보기(Binary classification)

 

from sklearn.datasets import load_breast_cancer
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
# scikit-learn's breast-cancer dataset; its `.data` and `.target` arrays are
# wrapped in a Dataset further down.
dataset = load_breast_cancer()
# Mini-batch size for the DataLoader built below.
bs = 32
class CustomDataset(Dataset):
    """Dataset over a pair of 2-D arrays that are indexed row by row.

    Args:
        x: Feature array; row ``i`` holds the features of sample ``i``.
        y: Target array with one row per sample.
    """

    def __init__(self, x, y):
        self.x_data, self.y_data = x, y

    def __len__(self):
        """Return the number of rows in the feature array."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return row ``idx`` of both arrays as ``torch.FloatTensor`` objects."""
        feature_row = self.x_data[idx, :]
        target_row = self.y_data[idx, :]
        return torch.FloatTensor(feature_row), torch.FloatTensor(target_row)
        
# Targets are reshaped to a column so every sample's label is a length-1 row.
custom_dataset = CustomDataset(dataset.data ,dataset.target.reshape(-1,1))
trainloader = DataLoader(custom_dataset, batch_size=bs, shuffle=True)
# Number of features in one sample; as a bare expression its value is
# presumably only shown in a notebook/REPL.
custom_dataset[0][0].size()[0]

device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Binary classifier: features -> 15 -> 5 -> 1 sigmoid unit, trained with BCELoss.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = Input((1, custom_dataset[0][0].size()[0]))
layer = Dense(15, nn.ReLU())(input)
layer = Dense(5, activation=nn.ReLU())(layer)
layer = Dense(1, activation=nn.Sigmoid())(layer)
model = Model(input, layer, device)
model.compile(optim.Adam(model.parameters(), lr=0.001),  nn.BCELoss())
model.summary(batch_size=bs)
# Train for 100 epochs.
model.fit_generator(trainloader, 100)

# NOTE: evaluation and prediction below reuse the training loader; there is
# no held-out split in this example.
model.evaluate_generator(trainloader)

out = model.predict_generator(trainloader)

 

 

 

 

https://towardsdatascience.com/recreating-keras-functional-api-with-pytorch-cc2974f7143c

 

Recreating keras functional api with PyTorch

Let’s bring the keras experience to PyTorch

towardsdatascience.com

http://zetcode.com/python/fstring/

 

Python f-string tutorial - formatting strings in Python with f-string

Python f-string tutorial last modified July 6, 2020 Python f-string tutorial shows how to format strings in Python with f-string. Python f-string Python f-string is the newest Python syntax to do string formatting. It is available since Python 3.6. Python

zetcode.com

 

728x90