Python) multiprocessing 코어 수 제한해서 돌리기

2023. 2. 4. 10:12분석 Python/구현 및 자료

728x90

이 글의 목적은 병렬코드를 작성할 때 자기에게 주어진 코어가 제한되어 있는 경우에 대한 팁이 담겨 있는 글이다.

 

팁을 적기 전에 언제 병렬코드가 필요하고, 또 언제 코어 수를 제한해야 하는 지 생각해 보자.

작업을 하다 보면, loop를 쓰는 경우가 많고, task마다 다르겠지만, 각각의 job은 서로 관련이 없는 경우를 푸는 경우가 있다. 

이럴 때 가장 쉽게 생각할 수 있는 것은 병렬로 코드를 짜는 것을 생각한다.

 

아래 그림처럼 100,000개를 처리할 때 각각의 record끼리 관계가 없다고 하면, 해당 작업을 끝내는 동안 계속 기다려야 한다.

바쁜 우리들은 할 게 많기 때문에 이 시간을 줄이고 싶다는 생각을 하게 되고, 결국 Multi-Processing을 사용하려고 검색을 하게 된다.

아래 그림처럼 위의 100,000개를 자기가 사용하는 코어 수에 맞게 쓰고 싶어서 해당 작업을 4 등분하여 worker에거 할당하고 다시 수집하면, 위의 코드와 동일한 결과를 얻을 수 있게 된다.

 

그래서 보통 아래와 같은 코드를 돌게하고 flow를 보면 다음과 같은 형태가 될 것이다.

각각의 worker(sleepy_man)의 작업을 수행하고, 두 개가 다 끝나는 시점에 join을 통해 취합 후 전체 시간을 계산하는 방식으로 보통 하게 된다.

import multiprocessing
import time

def sleepy_man():
    print('Starting to sleep')
    time.sleep(1)
    print('Done sleeping')

tic = time.time()
p1 =  multiprocessing.Process(target= sleepy_man)
p2 =  multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
p1.join()
p2.join()
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

그래서 10개로 만약 늘린다고 하면 다음과 같은 코드가 된다.

아래 코드는 결국 10개의 process를 만들어진 후에 실행을 하다 보니, loop 할 개수가 많게 되면 문제가 될 수 있다.

import multiprocessing
import time

def sleepy_man(sec):
    print('Starting to sleep')
    time.sleep(sec)
    print('Done sleeping')

tic = time.time()

process_list = []
for i in range(10):
    p =  multiprocessing.Process(target= sleepy_man, args = [2])
    p.start()
    process_list.append(p)

for process in process_list:
    process.join()

toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

 

그래서 보통은 Pool을 사용해서 num_workers를 제한하고 작업을 하게 된다.

아래 코드는 5개의 코어로 제한하고, 10개를 수행할 때 앞에 작업이 끝나면 돌게 하는 코드이다.

import multiprocessing
import time
from multiprocessing import Manager

SharedValue = Manager().Value("i", 0)


def sleepy_man(sec):
    print("Starting to sleep for {} seconds".format(sec))
    time.sleep(sec)
    print("Done sleeping for {} seconds".format(sec))
    SharedValue.value += 1 

tic = time.time()

pool = multiprocessing.Pool(5)
pool.map(sleepy_man, range(1, 11))
pool.close()

toc = time.time()

print("Done in {:.4f} seconds".format(toc - tic))

SharedValue.value
## 10

다른 방식으로 구현해 봄

## Done in 15.0389 seconds

import multiprocessing
import time
from multiprocessing import Manager

SharedValue = Manager().Value("i", 0)

from functools import partial

def sleepy_man(sec,share_value):
    print("Starting to sleep for {} seconds".format(sec))
    time.sleep(sec)
    print("Done sleeping for {} seconds".format(sec))
    share_value.value += 1 
    return sec 
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
tic = time.time()

pool = multiprocessing.Pool(5)
pool.map(sleepy_man2, range(1, 11))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pool.close()

toc = time.time()

print("Done in {:.4f} seconds".format(toc - tic))

SharedValue.value

 

근데 막상 쓰려고 하니... 내가 한 코드가 Pool 코드로 충분히 대체할 수 있었다

 

괜히 고생해서 짠 코드....ㅠ

Pool만 잘 알았어도 이렇게 구현해서 안 짜도 될 것 같았다...

결국 하고자 했던 것은 동일한 것 같다...

 

기존에 하고자 했던 것도 결국 Pool과 동일하다.

process를 정하면 그 숫자만큼만 돌다가 다음 process들이 도는 것이다. 

더 잘 구현된 것 같다.

 

## Done in 15.0693 seconds

import multiprocessing as mp
from tqdm import tqdm


def chunk(list, size):
    from itertools import islice

    arr_range = iter(list)
    return iter(lambda: tuple(islice(arr_range, size)), ())


def worker(arr, sharedValue=None, sharedList=None, sharedDict=None):
    raise NotImplementedError


def run_parallel(chunk_list, max_iter, worker, mp_count=1, worker_kwargs={}):
    count = 0
    temp = []
    pbar = tqdm(chunk_list)
    for idx, arr in enumerate(pbar):
        count += 1
        temp.append(arr)
        if ((idx > 0) & (count % mp_count == 0)) | (count == max_iter):
            p = []
            for idx2, tmp in enumerate(temp):
                p.append(mp.Process(target=worker, args=(tmp,), kwargs=worker_kwargs))
                p[idx2].start()
            else:
                for idx2 in range(len(temp)):
                    p[idx2].join()
                else:
                    p = []
                    temp = []
    else:
        print("shared object를 확인하세요")
        print(worker_kwargs)


import time

chunk_list = range(1, 11)
max_iter = 10

from functools import partial
from multiprocessing import Manager
SharedValue = Manager().Value("i", 0)
def sleepy_man(sec,share_value):
    print("Starting to sleep for {} seconds".format(sec))
    time.sleep(sec)
    print("Done sleeping for {} seconds".format(sec))
    share_value.value += 1 
    return sec 
tic = time.time()
run_parallel(
    chunk_list=chunk_list,
    max_iter=max_iter,
    worker=sleepy_man,
    mp_count=5,
    worker_kwargs=dict(share_value=SharedValue),
)
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))

 

시간을 보니 0.03초가 내가 짠 코드가 더 느린 것 같다...

아마 내가 짠 코드 경우 경우 멀티코어 개수만큼 멈춰있다가 다시 데이터 쌓고 다시 start 하고 join을 하지만,

Pool 같은 경우 특정 process가 끝나면 다음 프로세스가 바로 돌게 되다 보니 시간적으로 더 빨리 도는 것  같다.

 

역시 구글링을 먼저 하고 했어야 했는데...

아쉽다...

다음부터는 Pool을 써서 구현하는 걸로...

 

아래와 같은 방법으로 백그라운드에서 수행을 하게 해주는 코드이다.


tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
outputs_async = pool.map_async(sleepy_man2, range(1, 11))


while True :
    print(SharedValue)
    if SharedValue.value == 10 :
        toc = time.time()
        print("Done in {:.4f} seconds".format(toc - tic))
        print(outputs_async.get())
        break 



tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
results_async = [pool.apply_async(sleepy_man2, [i]) for i in range(1,11)]

while True :
    print(SharedValue)
    if SharedValue.value == 10 :
        toc = time.time()
        print("Done in {:.4f} seconds".format(toc - tic))
        break

결괏값을 받아서 이후체 처리할 것은 return으로 넣어도 될 것 같긴 했지만, share 하는 것에 대한 테스트를 수행해 봤다

약간의 이슈로는 동시에 끝나는 작업이 많으면 share 하는 값에서 문제가 생기는 것을 확인하였다.


def sleepy_man(sec,share_value):
    print("Starting to sleep for {} seconds".format(sec))
    time.sleep(1)
    print("Done sleeping for {} seconds".format(sec))
    share_value.value += 1 
    
tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
results_async = [pool.apply_async(sleepy_man2, [i]) for i in range(1,11)]



while True :
    print(SharedValue)
    if SharedValue.value == 10 :
        toc = time.time()
        print("Done in {:.4f} seconds".format(toc - tic))
        break

 

 

참고

https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments

https://www.analyticsvidhya.com/blog/2021/04/a-beginners-guide-to-multi-processing-in-python/

728x90