Ridge, Lasso, ElasticNet / train, test, coef 값 내뱉는 multiprocessing 함수 만들기

2019. 5. 6. 20:43분석 Python/Scikit Learn (싸이킷런)

728x90

피드백은 항상 좋아합니다.

https://github.com/sungreong/TIL/blob/master/Machine_Learning/Ridge%20%26%20Lasso%20%26%20Elastic%20Multiprocessing.ipynb

숙제로 제출해야 하는 게 있어서 만들어 봤습니다. 

기본적으로 loop로 해야 하는 구조여서 적어도 모델 만들어지는 것은 multiprocessing으로 하려고 했는데 오히려 느리게 됐다는... 3가지 버전을 보여주려고 합니다.

1. Loop ,  2, 잘못된 버전 3. 잘된 버전

import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.metrics import mean_squared_error
import pandas as pd
from multiprocessing import Pool
import re

그냥 Loop로 하는 코드

def Loop_Ridge_Lasso_Elastic(lambda_list) :
    ridge_train_mse  , ridge_test_mse = [] , []
    lasso_train_mse , lasso_test_mse = [] , [] 
    elastic_train_mse  , elastic_test_mse = [] , []
    lasso_coef , ridge_coef , elastic_coef = pd.DataFrame() , pd.DataFrame() , pd.DataFrame()
    for i in lambda_list :
        ridge = Ridge(alpha=i)
        ridge.fit(train_x , train_y )
        r_coef = pd.DataFrame(ridge.coef_)
        ridge_coef = pd.concat([ridge_coef , r_coef], axis = 0)
        train_mse = mean_squared_error(train_y , ridge.predict(train_x) )
        test_mse = mean_squared_error(test_y , ridge.predict(test_x) )
        ridge_train_mse.append(train_mse)
        ridge_test_mse.append(test_mse)
        lasso = Lasso(alpha= i)
        lasso.fit(train_x , train_y )
        l_coef = pd.DataFrame(lasso.coef_).T
        lasso_coef = pd.concat([lasso_coef , l_coef], axis = 0)
        train_mse = mean_squared_error(train_y , lasso.predict(train_x) )
        test_mse  = mean_squared_error(test_y , lasso.predict(test_x) )
        lasso_train_mse.append(train_mse)
        lasso_test_mse.append(test_mse)
        ela = ElasticNet(alpha=i)
        ela.fit(train_x , train_y )
        ela_coef = pd.DataFrame(ela.coef_).T
        elastic_coef = pd.concat([elastic_coef , ela_coef], axis = 0)
        train_mse = mean_squared_error(train_y , ela.predict(train_x) )
        test_mse  = mean_squared_error(test_y , ela.predict(test_x) )
        elastic_train_mse.append(train_mse)
        elastic_test_mse.append(test_mse)
    ridge_coef.reset_index(drop= True , inplace= True)
    lasso_coef.reset_index(drop= True , inplace= True)
    elastic_coef.reset_index(drop= True , inplace= True)
    return ridge_train_mse  , ridge_test_mse , lasso_train_mse , lasso_test_mse , elastic_train_mse  , elastic_test_mse , lasso_coef , ridge_coef , elastic_coef    

잘못된 Multiprocessing 버전

def multiprocessing_Ridge_Lasso_Elastic(lambda_list) :
    global find_mse
    def find_mse(reg) :
        reg.fit(train_x, train_y)
        coef = reg.coef_
        train_mse = mean_squared_error(train_y , reg.predict(train_x) )
        test_mse = mean_squared_error(test_y , reg.predict(test_x) )
        return train_mse , test_mse , coef
    
    ridge_train_mse  , ridge_test_mse = [] , []
    lasso_train_mse , lasso_test_mse = [] , [] 
    elastic_train_mse  , elastic_test_mse = [] , []
    lasso_coef , ridge_coef , elastic_coef = pd.DataFrame() , pd.DataFrame() , pd.DataFrame()
    for i in lambda_list : 
        regressors_list = [Ridge(alpha=i) ,Lasso(alpha= i) ,ElasticNet(alpha=i) ]
        if __name__ == '__main__':
            with Pool(3) as pool :
                output = pool.map(find_mse,regressors_list)
                pool.close()
                pool.join()
        ridge_train_mse.append(output[0][0])
        ridge_test_mse.append(output[0][1])
        lasso_train_mse.append(output[1][0])
        lasso_test_mse.append(output[1][1])
        elastic_train_mse.append(output[2][0])
        elastic_test_mse.append(output[2][1])
        ridge_coef = pd.concat([ridge_coef , pd.DataFrame(output[0][2])], axis = 0)
        lasso_coef = pd.concat([lasso_coef , pd.DataFrame(output[1][2]).T], axis = 0)
        elastic_coef = pd.concat([elastic_coef , pd.DataFrame(output[2][2]).T], axis = 0)
    return ridge_train_mse  , ridge_test_mse , lasso_train_mse , lasso_test_mse , elastic_train_mse  , elastic_test_mse , lasso_coef , ridge_coef , elastic_coef    

 

잘 된 Multiprocessing 

def multiprocessing_Ridge_Lasso_Elastic_v2(lambda_list) :
    global concat_data
    global find_mse
    def find_mse(reg) :
        reg.fit(train_x, train_y)
        coef = reg.coef_
        train_mse = mean_squared_error(train_y , reg.predict(train_x) )
        test_mse = mean_squared_error(test_y , reg.predict(test_x) )
        return train_mse , test_mse , coef
    def concat_data(i) :
        Ridge = pd.DataFrame(np.squeeze(np.array(Ridge_list)[:,2][i])).T
        Lasso = pd.DataFrame(np.squeeze(np.array(Lasso_list)[:,2][i])).T
        Elastic = pd.DataFrame(np.squeeze(np.array(Ela_list)[:,2][i])).T
        return Ridge , Lasso , Elastic

    regressors_list = []
    for i in lambda_list :
        for j in [Ridge(alpha=i) ,Lasso(alpha= i) ,ElasticNet(alpha=i) ] :
            regressors_list.append(j)

    if __name__ == '__main__':
        with Pool(10) as pool :
            output = pool.map(find_mse,regressors_list)
            pool.close()
            pool.join()
    Ridge_list = output[0::3]
    Lasso_list = output[1::3]
    Ela_list   = output[2::3]
    ridge_train_mse  , ridge_test_mse = np.array(Ridge_list)[:,0] , np.array(Ridge_list)[:,1]
    lasso_train_mse , lasso_test_mse = np.array(Lasso_list)[:,0] , np.array(Lasso_list)[:,1]
    elastic_train_mse  , elastic_test_mse = np.array(Ela_list)[:,0] , np.array(Ela_list)[:,1]
    
    if __name__ == '__main__':
        with Pool(10) as pool :
            output = pool.map(concat_data,range(len(lambda_list)))
            pool.close()
            pool.join()
            
    ridge_coef = pd.DataFrame(np.array(output)[:,0,:])
    lasso_coef = pd.DataFrame(np.array(output)[:,1,:])
    elastic_coef = pd.DataFrame(np.array(output)[:,2,:])
    return ridge_train_mse  , ridge_test_mse , lasso_train_mse , lasso_test_mse , elastic_train_mse  , elastic_test_mse , ridge_coef , lasso_coef , elastic_coef    

multiprocessing_Ridge_Lasso_Elastic_v2(lambda_list)

 

lambda_list = np.linspace(0.02 , 50.0 , 100)

저 함수들에 이것만 넣어주면 값을 출력해준다.

잘 된 multiprocessing 버전에는 몇가지 알아둬야 하는 게 있는데, 우리가 지금 구해야 하는 것은

100(lambda 경우의 수) x 3(모델 3개) 이걸 구해야 한다. 잘 못된 버전에서는 100개를 구하는데 모델 3개만 Multiprocessing 한 것이라서 느린 것 같다.

1. 그래서 다른 방법 저것을 길게 쭉 나열해서 300 x 1 개로 만들어서 구하는 방식으로 바꾸니 빨라졌다.

> 엄밀하게 속도 비교를 해봐야하지만, 원래 빨리 돌아가는 코드이기도 하고 믿어주시기를...

2. 그리고 한가지 더 주의해야 하는 것은 multiprocessing을 함수로 만들고 싶고 또 그것의 함수를 만들고 싶을 때

nestd 구조로 짜야하는데, 그때 그냥하면 안 되고 global을 사용해줘야 한다!

 

비교

실제 코드에서는 lambda list 가 100개 였을 때는 그냥 loop가 빠르지만, 숫자를 1000으로 늘리니 

multiprocessing 한 것이 더 빠르게 돌아간 것을 확인했다.

잘 못 짠 코드는 1분이 지나도 안돌아가서 그냥... 확인을 포기했다

비교 사진

728x90