[Pytorch] Pytorch를 Keras처럼 API 호출 하는 방식으로 사용하는 방법
2020. 8. 25. 23:21ㆍ분석 Python/Pytorch
파이토치에서도 케라스처럼 API를 호출하는 방식으로 모델을 만드는 방법을 발견해서 공유한다.
실제로 이 규격에 맞춰서 LSTM도 만들 수 있을 것 같다(조금 복잡할 것 같기도 하지만).
기존 코드를 조금씩 바꿔서 해봤고, 아래에는 Custom Dataset에 대해서 새로 만들어서 진행해봤다.
이런 좋은 코드를 공유해주신 다른 분들이 고맙고, 나도 조금 수정을 해서 공유를 한다.
다들 즐거운 코딩하시길!
Library Load
import torch
from torch import nn
from torch import optim
from torch.autograd import Variable
from torchsummary import summary as summary_
import pkbar
import warnings
warnings.filterwarnings('ignore')
Input
def Input(shape):
    """Register the model's input shape and return it unchanged.

    The shape is kept as an attribute on this function object
    (``Input.shape``), which is where the layer builders in this file look
    it up when they need to push a dummy sample through a partial stack.

    Args:
        shape: Shape of one sample, without the batch dimension.

    Returns:
        The same ``shape`` object that was passed in.
    """
    setattr(Input, "shape", shape)
    return shape
Dense Class
class Dense(nn.Module):
    """Keras-style builder for a fully connected layer plus activation.

    Calling an instance does not run a forward pass. It returns a new
    ``nn.Sequential`` made of the layers received so far followed by an
    ``nn.Linear`` and the activation module.

    Args:
        outputs: Number of output features of the linear layer.
        activation: Module appended right after the linear layer.
    """

    def __init__(self, outputs, activation):
        super().__init__()
        self.outputs = outputs
        self.activation = activation

    def __call__(self, inputs):
        """Append this layer to ``inputs`` and return the grown stack.

        Args:
            inputs: Either a shape tuple (including ``torch.Size``), in
                which case this is the first layer and its input width is
                the product of all dimensions, or an iterable stack of
                modules such as the ``nn.Sequential`` returned by a
                previous layer builder.

        Returns:
            An ``nn.Sequential`` ending with the new linear layer and the
            activation.
        """
        self.inputs_size = 1
        # isinstance also accepts torch.Size, which is a tuple subclass.
        if isinstance(inputs, tuple):
            for dim in inputs:
                self.inputs_size *= dim
            in_features = self.inputs_size
            stack = []
        else:
            self.inputs = inputs
            stack = list(inputs)
            if len(stack) >= 2 and isinstance(stack[-2], nn.Linear):
                # Previous block was Linear + activation: reuse its width.
                in_features = stack[-2].out_features
            else:
                # Otherwise measure the width by probing the stack with a
                # dummy sample of the registered input shape.
                in_features = get_conv_output(Input.shape, inputs)

        # Build in a local list: once ``self.layers`` holds a module,
        # nn.Module refuses to have a plain list assigned to that name,
        # which made a second call on the same instance fail.
        stack.extend([nn.Linear(in_features, self.outputs), self.activation])
        self.layers = nn.Sequential(*stack)
        return self.layers
Model Class
주의 깊게 볼 것은 .summary와 pkbar이다.
class Model():
    """Keras-like wrapper that trains and evaluates a stacked network.

    Args:
        inputs: Input shape of one sample; forwarded to the summary helper.
        outputs: The network to run (the stack returned by the last layer
            builder). It is moved to ``device``.
        device: Device identifier used for the network and for every batch.
    """

    def __init__(self, inputs, outputs, device):
        self.input_size = inputs
        self.device = device
        self.model = outputs.to(self.device)

    def parameters(self):
        """Return the wrapped network's parameters (for the optimizer)."""
        return self.model.parameters()

    def compile(self, optimizer, loss):
        """Store the optimizer and the loss function used by fit/evaluate."""
        self.opt = optimizer
        self.criterion = loss

    def summary(self, batch_size=-1):
        """Print the layer summary of the network and the device in use."""
        summary_(self.model, self.input_size, device=self.device,
                 batch_size=batch_size)
        print("Device Type:", self.device)

    @staticmethod
    def _align_target(out, target):
        """Give a floating-point target the shape of ``out`` if only the layout differs."""
        # Element-wise losses such as BCELoss reject (N,) vs (N, 1) even
        # though both hold the same values. Integer class indices are never
        # touched.
        if (target.is_floating_point() and target.shape != out.shape
                and target.numel() == out.numel()):
            return target.reshape(out.shape)
        return target

    @staticmethod
    def _count_correct(out, target):
        """Count correct predictions in one batch as a plain int."""
        # Collapse everything after the batch dimension so (N, 1, C) outputs
        # are treated as C-way scores rather than as a binary output.
        scores = out.reshape(out.size(0), -1)
        labels = target.reshape(-1)
        if scores.size(1) == 1:
            # Single output unit: threshold at 0.5.
            pred = (scores.reshape(-1) > 0.5).to(labels.dtype)
        else:
            pred = scores.argmax(dim=1)
        return (pred == labels).sum().item()

    def fit(self, data_x, data_y, epochs):
        """Train for ``epochs`` passes, one optimizer step per (x, y) pair.

        Args:
            data_x: Indexable collection of inputs; iterated in lockstep
                with ``data_y``.
            data_y: Matching targets.
            epochs: Number of passes over the data.
        """
        self.model.train()
        for epoch in range(epochs):
            print("Epoch {}/{}".format(epoch+1, epochs))
            progress = pkbar.Kbar(target=len(data_x), width=25)
            for i, (data, target) in enumerate(zip(data_x, data_y)):
                self.opt.zero_grad()
                train_out = self.model(data.to(self.device))
                target = self._align_target(train_out, target.to(self.device))
                loss = self.criterion(train_out, target)
                loss.backward()
                self.opt.step()
                progress.update(i, values=[("loss: ", loss.item())])
                progress.add(1)

    def evaluate(self, test_x, test_y):
        """Report running loss and accuracy over (x, y) pairs.

        Both figures are running totals divided by ``len(test_x)``.
        Nothing is returned; results go to the progress bar.
        """
        self.model.eval()
        correct, loss = 0, 0.0
        total = len(test_x)
        progress = pkbar.Kbar(target=total, width=25)
        # No gradients are needed here; this also keeps the accumulated
        # loss from holding on to one autograd graph per sample.
        with torch.no_grad():
            for i, (data, target) in enumerate(zip(test_x, test_y)):
                target = target.to(self.device)
                out = self.model(data.to(self.device))
                loss += self.criterion(out, self._align_target(out, target)).item()
                correct += self._count_correct(out, target)
                progress.update(i, values=[("loss", loss / total),
                                           ("acc", correct / total)])
                progress.add(1)

    def fit_generator(self, generator, epochs):
        """Train for ``epochs`` passes over a sized iterable of (data, target) batches.

        The network output is squeezed before it is handed to the loss.
        """
        self.model.train()
        for epoch in range(epochs):
            print("Epoch {}/{}".format(epoch+1, epochs))
            progress = pkbar.Kbar(target=len(generator), width=25)
            for i, (data, target) in enumerate(generator):
                self.opt.zero_grad()
                train_out = self.model(data.to(self.device)).squeeze()
                target = self._align_target(train_out, target.to(self.device))
                loss = self.criterion(train_out, target)
                loss.backward()
                self.opt.step()
                progress.update(i, values=[("loss: ", loss.item())])
                progress.add(1)

    def evaluate_generator(self, generator):
        """Report per-batch accuracy (in percent) and loss for each batch.

        Metrics are computed from scratch for every batch and handed to the
        progress bar. Nothing is returned.
        """
        self.model.eval()
        progress = pkbar.Kbar(target=len(generator), width=25)
        with torch.no_grad():
            for i, (data, target) in enumerate(generator):
                target = target.to(self.device)
                out = self.model(data.to(self.device))
                squeezed = out.squeeze()
                loss = self.criterion(squeezed, self._align_target(squeezed, target)).item()
                correct = self._count_correct(out, target)
                # Built-in round keeps this method independent of numpy.
                progress.update(i, values=[("test_acc", round(correct / out.size()[0], 4) * 100),
                                           ("test_loss", loss)])
                progress.add(1)

    def predict_generator(self, generator):
        """Return the network output for every batch of ``generator``.

        Runs in evaluation mode and without gradient tracking, so repeated
        calls on the same data give the same result for networks with
        train-only randomness such as dropout.

        Returns:
            A list with one output tensor per batch.
        """
        self.model.eval()
        with torch.no_grad():
            return [self.model(data.to(self.device)) for data, _ in generator]
Fully Connected Neural Network
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Stack five Dense blocks on a (1, 1024) input; the last one is a single
# sigmoid unit, matching the binary cross-entropy loss used below.
inputs = Input((1, 1024))
x = Dense(54, nn.ReLU())(inputs)
y = Dense(34, nn.ReLU())(x)
z = Dense(43, activation=nn.ReLU())(y)
a = Dense(10, activation=nn.ReLU())(z)
b = Dense(1, activation=nn.Sigmoid())(a)
# NOTE(review): this demo pins the model to the CPU even though `device`
# is computed above - confirm whether that is intentional.
model = Model(inputs, b, 'cpu')
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.BCELoss())
model.summary()
torch.manual_seed(42)
#input should be of the dimension - (batch_size, 1, n_rows*n_columns)
x = torch.rand((10, 1, 1024), dtype=torch.float)
# Random 0/1 labels. Each per-sample target has shape (1, 1) so it matches
# the (1, 1) output the network produces for one (1, 1024) sample, which
# BCELoss requires. `.float()` converts the boolean mask directly instead of
# copy-constructing a tensor from a tensor.
y = (torch.rand(10, 1, 1) < 0.5).float()
#y = torch.tensor([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]], dtype=torch.int64).reshape(-1, 1)
model.fit(x, y, 4)
pkbar를 사용하면 이처럼 나온다!
# Score the fitted model on the same x / y tensors that were passed to fit.
model.evaluate(x, y)
Convolutional Neural Network
def get_conv_output(shape, inputs):
    """Return the size of dimension 1 of what ``inputs`` produces for one sample.

    A single random sample of the given shape is pushed through ``inputs``
    purely to read the output size.

    Args:
        shape: Shape of one sample, without the batch dimension.
        inputs: Callable stack of layers to probe.

    Returns:
        ``output.size(1)`` of the probed output, as an int.
    """
    batch_size = 1
    # A plain tensor is enough (the autograd Variable wrapper is deprecated),
    # and the probe only reads a size, so gradient tracking is switched off.
    with torch.no_grad():
        probe = torch.rand(batch_size, *shape)
        output_feat = inputs(probe)
    return output_feat.size(1)
def same_pad(h_in, kernal, stride, dilation):
    """Compute the (possibly fractional) padding for a 'same'-style convolution.

    Args:
        h_in: Input size along the padded dimension.
        kernal: Kernel size.
        stride: Convolution stride.
        dilation: Kernel dilation.

    Returns:
        Half of ``stride*(h_in-1) - h_in + dilation*(kernal-1) + 1`` as a
        float; callers truncate it to an int themselves.
    """
    total_padding = stride*(h_in-1) - h_in + (dilation*(kernal-1)) + 1
    return total_padding / 2.0
class FlattenedLayer(nn.Module):
    """Module that reshapes each sample of a batch into a flat vector."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Return ``input`` viewed as (batch, -1).

        The flattened view is also kept on the instance as ``self.inputs``.
        """
        batch = input.size(0)
        flat = input.view(batch, -1)
        self.inputs = flat
        return flat
class Flatten():
    """Builder that appends a FlattenedLayer to an existing stack of layers."""

    def __init__(self):
        pass

    def __call__(self, inputs):
        """Return a new nn.Sequential: the layers of ``inputs`` plus a flatten step.

        The received stack and the resulting one are kept on the instance as
        ``self.inputs`` and ``self.layers``.
        """
        self.inputs = inputs
        self.layers = nn.Sequential(*inputs, FlattenedLayer())
        return self.layers
class Conv2d(nn.Module):
    """Keras-style builder for a 2-D convolution plus activation.

    Calling an instance does not run a forward pass. It returns a new
    ``nn.Sequential`` made of the layers received so far followed by an
    ``nn.Conv2d`` and the activation module.

    Args:
        filters: Number of output channels.
        kernel_size: Convolution kernel size.
        strides: Convolution stride.
        padding: Explicit padding, or the string ``'same'`` to derive it
            with ``same_pad``.
        dilation: Kernel dilation.
        activation: Module appended right after the convolution.
    """

    def __init__(self, filters, kernel_size, strides, padding, dilation, activation):
        super().__init__()
        self.filters = filters
        self.kernel = kernel_size
        self.strides = strides
        self.padding = padding
        self.dilation = dilation
        self.activation = activation

    def __call__(self, inputs):
        """Append this layer to ``inputs`` and return the grown stack.

        Args:
            inputs: Either a shape tuple ending in (channels, height,
                width), including ``torch.Size``, for the first layer, or an
                iterable stack of modules whose second-to-last entry
                exposes ``out_channels``.

        Returns:
            An ``nn.Sequential`` ending with the new convolution and the
            activation.
        """
        # isinstance also accepts torch.Size, which is a tuple subclass.
        from_shape = isinstance(inputs, tuple)
        if from_shape:
            self.inputs_size = inputs
            stack = []
            in_channels = inputs[-3]
        else:
            self.inputs = inputs
            stack = list(inputs)
            in_channels = stack[-2].out_channels

        # Resolve 'same' into a number without overwriting self.padding, so
        # the request survives if this builder is called again.
        padding = self.padding
        if padding == 'same':
            if from_shape:
                h_in = inputs[-2]
            else:
                # The padding formula needs the feature-map height (dim -2),
                # not the channel count (dim 1), so probe the stack with one
                # dummy sample of the registered input shape.
                with torch.no_grad():
                    h_in = inputs(torch.rand(1, *Input.shape)).size(-2)
            padding = int(same_pad(h_in, self.kernel, self.strides, self.dilation))

        # Build in a local list: once ``self.layers`` holds a module,
        # nn.Module refuses to have a plain list assigned to that name.
        stack.extend([nn.Conv2d(in_channels,
                                self.filters,
                                self.kernel,
                                self.strides,
                                padding,
                                self.dilation), self.activation])
        self.layers = nn.Sequential(*stack)
        return self.layers
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
bs = 128
# Turn every CIFAR-100 image into a 28x28 grayscale picture and then flatten
# it to a (1, 784) row. Resizing straight to (1, 784) would interpolate the
# image into a one-pixel-high strip instead of flattening it.
transform = transforms.Compose([transforms.Grayscale(num_output_channels=1),
                                transforms.Resize((28, 28)),
                                transforms.ToTensor(),
                                transforms.Lambda(lambda img: img.view(1, -1))
                                ])
trainset = torchvision.datasets.CIFAR100(root='./../../../Data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR100(root='./../../../Data', train=False, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=bs, shuffle=True)
testloader = DataLoader(testset, batch_size=bs)
# `inputs` rather than `input`, so the builtin is not shadowed.
inputs = Input((1, 784))
x = Dense(1000, nn.ReLU())(inputs)
y = Dense(500, nn.ReLU())(x)
z = Dense(700, activation=nn.ReLU())(y)
a = Dense(200, activation=nn.ReLU())(z)
# CrossEntropyLoss expects raw, unbounded class scores, so the 100-way output
# layer passes its values through untouched instead of clamping them with ReLU.
b = Dense(100, activation=nn.Identity())(a)
model = Model(inputs, b, device)
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.CrossEntropyLoss())
model.summary()
model.fit_generator(trainloader, 3)
model.evaluate_generator(testloader)
out = model.predict_generator(testloader)
배치 사이즈 DataLoader 사용하기
bs = 128
transform = transforms.ToTensor()
trainset = torchvision.datasets.CIFAR100(root='./../../../Data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR100(root='./../../../Data', train=False, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=bs, shuffle=True)
testloader = DataLoader(testset, batch_size=bs)
# `inputs` rather than `input`, so the builtin is not shadowed.
inputs = Input((3, 32, 32))
# Three convolution blocks, then flatten, then a 100-way output layer.
x = Conv2d(3, 3, 1, 'same', 1, nn.ReLU())(inputs)
y = Conv2d(5, 3, 1, 0, 1, nn.ReLU())(x)
z = Conv2d(6, 3, 1, 'same', 1, nn.ReLU())(y)
a = Flatten()(z)
# CrossEntropyLoss expects raw, unbounded class scores, so the output layer
# passes its values through untouched instead of clamping them with ReLU.
b = Dense(100, activation=nn.Identity())(a)
model = Model(inputs, b, device)
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.CrossEntropyLoss())
model.summary()
model.fit_generator(trainloader, 3)
model.evaluate_generator(testloader)
out = model.predict_generator(testloader)
# Predicted class index for every sample of the first test batch.
torch.max(out[0], 1)[1]
tensor([99, 99, 99, 99, 60, 75, 30, 75, 60, 99, 99, 75, 99, 99, 29, 99, 53, 29,
99, 53, 61, 99, 99, 30, 75, 99, 99, 75, 99, 53, 91, 61, 41, 30, 29, 30,
91, 41, 91, 99, 99, 99, 75, 30, 29, 61, 75, 99, 99, 99, 30, 75, 99, 99,
99, 99, 60, 30, 30, 30, 99, 60, 75, 30, 53, 53, 99, 41, 99, 99, 99, 99,
61, 30, 99, 99, 99, 61, 99, 61, 30, 61, 30, 75, 99, 30, 99, 99, 53, 30,
30, 99, 75, 99, 75, 75, 99, 99, 75, 53, 30, 53, 99, 99, 75, 75, 99, 99,
53, 30, 61, 41, 30, 61, 53, 60, 41, 75, 99, 99, 75, 99, 30, 53, 99, 61,
75, 75])
Custom DataLoader를 만들어서 학습시켜보기(Binary classification)
from sklearn.datasets import load_breast_cancer
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
# Load scikit-learn's breast cancer dataset; its data / target arrays feed
# the custom Dataset defined next.
dataset = load_breast_cancer()
# Batch size used for the DataLoader and for the model summary.
bs = 32
class CustomDataset(Dataset):
    """Dataset that serves rows of two parallel 2-D arrays as float tensors.

    Args:
        x: Feature array, indexed as ``x[idx, :]``.
        y: Target array, indexed as ``y[idx, :]``.
    """

    def __init__(self,x,y):
        self.x_data, self.y_data = x, y

    def __len__(self):
        """Return the number of rows in the feature array."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return row ``idx`` of the features and targets as FloatTensors."""
        features, label = (
            torch.FloatTensor(source[idx, :])
            for source in (self.x_data, self.y_data)
        )
        return features, label
# Targets are reshaped into a column so every sample's label is a length-1
# row, which is what CustomDataset's `y[idx, :]` indexing expects.
custom_dataset = CustomDataset(dataset.data ,dataset.target.reshape(-1,1))
trainloader = DataLoader(custom_dataset, batch_size=bs, shuffle=True)
# Number of features in one sample (notebook-style expression; the value is
# only displayed, not stored).
custom_dataset[0][0].size()[0]
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Input width is taken from the first sample of the dataset.
input = Input((1, custom_dataset[0][0].size()[0]))
# Three Dense blocks ending in a single sigmoid unit for the binary target.
layer = Dense(15, nn.ReLU())(input)
layer = Dense(5, activation=nn.ReLU())(layer)
layer = Dense(1, activation=nn.Sigmoid())(layer)
model = Model(input, layer, device)
model.compile(optim.Adam(model.parameters(), lr=0.001), nn.BCELoss())
model.summary(batch_size=bs)
model.fit_generator(trainloader, 100)
# NOTE(review): evaluation and prediction reuse the training loader; no
# held-out split is created in this example.
model.evaluate_generator(trainloader)
out = model.predict_generator(trainloader)
https://towardsdatascience.com/recreating-keras-functional-api-with-pytorch-cc2974f7143c
http://zetcode.com/python/fstring/
728x90
'분석 Python > Pytorch' 카테고리의 다른 글
[skorch] VAE 적용 구현해보기 (0) | 2020.09.22 |
---|---|
[Pytorch] torch에서 모델 summary 확인하는 방법 (0) | 2020.08.25 |
[Pytorch] LSTM AutoEncoder for Anomaly Detection (3) | 2020.08.23 |
Pytorch 1.6 Release Note Information (0) | 2020.08.21 |
[Pytorch] MixtureSameFamily 을 사용해서 bimodal distribution 만들기 (0) | 2020.05.05 |