2023. 2. 4. 10:12ㆍ분석 Python/구현 및 자료
이 글의 목적은 병렬코드를 작성할 때 자기에게 주어진 코어가 제한되어 있는 경우에 대한 팁이 담겨 있는 글이다.
팁을 적기 전에 언제 병렬코드가 필요하고, 또 언제 코어 수를 제한해야 하는 지 생각해 보자.
작업을 하다 보면, loop를 쓰는 경우가 많고, task마다 다르겠지만, 각각의 job은 서로 관련이 없는 경우를 푸는 경우가 있다.
이럴 때 가장 쉽게 생각할 수 있는 것은 병렬로 코드를 짜는 것을 생각한다.
아래 그림처럼 100,000개를 처리할 때 각각의 record끼리 관계가 없다고 하면, 해당 작업을 끝내는 동안 계속 기다려야 한다.
바쁜 우리들은 할 게 많기 때문에 이 시간을 줄이고 싶다는 생각을 하게 되고, 결국 Multi-Processing을 사용하려고 검색을 하게 된다.
아래 그림처럼 위의 100,000개를 자기가 사용하는 코어 수에 맞게 쓰고 싶어서 해당 작업을 4 등분하여 worker에거 할당하고 다시 수집하면, 위의 코드와 동일한 결과를 얻을 수 있게 된다.
그래서 보통 아래와 같은 코드를 돌게하고 flow를 보면 다음과 같은 형태가 될 것이다.
각각의 worker(sleepy_man)의 작업을 수행하고, 두 개가 다 끝나는 시점에 join을 통해 취합 후 전체 시간을 계산하는 방식으로 보통 하게 된다.
import multiprocessing
import time
def sleepy_man():
print('Starting to sleep')
time.sleep(1)
print('Done sleeping')
tic = time.time()
p1 = multiprocessing.Process(target= sleepy_man)
p2 = multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
p1.join()
p2.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
그래서 10개로 만약 늘린다고 하면 다음과 같은 코드가 된다.
아래 코드는 결국 10개의 process를 만들어진 후에 실행을 하다 보니, loop 할 개수가 많게 되면 문제가 될 수 있다.
import multiprocessing
import time
def sleepy_man(sec):
print('Starting to sleep')
time.sleep(sec)
print('Done sleeping')
tic = time.time()
process_list = []
for i in range(10):
p = multiprocessing.Process(target= sleepy_man, args = [2])
p.start()
process_list.append(p)
for process in process_list:
process.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
그래서 보통은 Pool을 사용해서 num_workers를 제한하고 작업을 하게 된다.
아래 코드는 5개의 코어로 제한하고, 10개를 수행할 때 앞에 작업이 끝나면 돌게 하는 코드이다.
import multiprocessing
import time
from multiprocessing import Manager
SharedValue = Manager().Value("i", 0)
def sleepy_man(sec):
print("Starting to sleep for {} seconds".format(sec))
time.sleep(sec)
print("Done sleeping for {} seconds".format(sec))
SharedValue.value += 1
tic = time.time()
pool = multiprocessing.Pool(5)
pool.map(sleepy_man, range(1, 11))
pool.close()
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
SharedValue.value
## 10
다른 방식으로 구현해 봄
## Done in 15.0389 seconds
import multiprocessing
import time
from multiprocessing import Manager
SharedValue = Manager().Value("i", 0)
from functools import partial
def sleepy_man(sec,share_value):
print("Starting to sleep for {} seconds".format(sec))
time.sleep(sec)
print("Done sleeping for {} seconds".format(sec))
share_value.value += 1
return sec
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
tic = time.time()
pool = multiprocessing.Pool(5)
pool.map(sleepy_man2, range(1, 11))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pool.close()
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
SharedValue.value
근데 막상 쓰려고 하니... 내가 한 코드가 Pool 코드로 충분히 대체할 수 있었다
괜히 고생해서 짠 코드....ㅠ
Pool만 잘 알았어도 이렇게 구현해서 안 짜도 될 것 같았다...
결국 하고자 했던 것은 동일한 것 같다...
기존에 하고자 했던 것도 결국 Pool과 동일하다.
process를 정하면 그 숫자만큼만 돌다가 다음 process들이 도는 것이다.
더 잘 구현된 것 같다.
## Done in 15.0693 seconds
import multiprocessing as mp
from tqdm import tqdm
def chunk(list, size):
from itertools import islice
arr_range = iter(list)
return iter(lambda: tuple(islice(arr_range, size)), ())
def worker(arr, sharedValue=None, sharedList=None, sharedDict=None):
raise NotImplementedError
def run_parallel(chunk_list, max_iter, worker, mp_count=1, worker_kwargs={}):
count = 0
temp = []
pbar = tqdm(chunk_list)
for idx, arr in enumerate(pbar):
count += 1
temp.append(arr)
if ((idx > 0) & (count % mp_count == 0)) | (count == max_iter):
p = []
for idx2, tmp in enumerate(temp):
p.append(mp.Process(target=worker, args=(tmp,), kwargs=worker_kwargs))
p[idx2].start()
else:
for idx2 in range(len(temp)):
p[idx2].join()
else:
p = []
temp = []
else:
print("shared object를 확인하세요")
print(worker_kwargs)
import time
chunk_list = range(1, 11)
max_iter = 10
from functools import partial
from multiprocessing import Manager
SharedValue = Manager().Value("i", 0)
def sleepy_man(sec,share_value):
print("Starting to sleep for {} seconds".format(sec))
time.sleep(sec)
print("Done sleeping for {} seconds".format(sec))
share_value.value += 1
return sec
tic = time.time()
run_parallel(
chunk_list=chunk_list,
max_iter=max_iter,
worker=sleepy_man,
mp_count=5,
worker_kwargs=dict(share_value=SharedValue),
)
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
시간을 보니 0.03초가 내가 짠 코드가 더 느린 것 같다...
아마 내가 짠 코드 경우 경우 멀티코어 개수만큼 멈춰있다가 다시 데이터 쌓고 다시 start 하고 join을 하지만,
Pool 같은 경우 특정 process가 끝나면 다음 프로세스가 바로 돌게 되다 보니 시간적으로 더 빨리 도는 것 같다.
역시 구글링을 먼저 하고 했어야 했는데...
아쉽다...
다음부터는 Pool을 써서 구현하는 걸로...
아래와 같은 방법으로 백그라운드에서 수행을 하게 해주는 코드이다.
tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
outputs_async = pool.map_async(sleepy_man2, range(1, 11))
while True :
print(SharedValue)
if SharedValue.value == 10 :
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
print(outputs_async.get())
break
tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
results_async = [pool.apply_async(sleepy_man2, [i]) for i in range(1,11)]
while True :
print(SharedValue)
if SharedValue.value == 10 :
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
break
결괏값을 받아서 이후체 처리할 것은 return으로 넣어도 될 것 같긴 했지만, share 하는 것에 대한 테스트를 수행해 봤다
약간의 이슈로는 동시에 끝나는 작업이 많으면 share 하는 값에서 문제가 생기는 것을 확인하였다.
def sleepy_man(sec,share_value):
print("Starting to sleep for {} seconds".format(sec))
time.sleep(1)
print("Done sleeping for {} seconds".format(sec))
share_value.value += 1
tic = time.time()
SharedValue = Manager().Value("i", 0)
pool = multiprocessing.Pool(5)
sleepy_man2 = partial(sleepy_man ,share_value=SharedValue)
results_async = [pool.apply_async(sleepy_man2, [i]) for i in range(1,11)]
while True :
print(SharedValue)
if SharedValue.value == 10 :
toc = time.time()
print("Done in {:.4f} seconds".format(toc - tic))
break
참고
https://www.analyticsvidhya.com/blog/2021/04/a-beginners-guide-to-multi-processing-in-python/
'분석 Python > 구현 및 자료' 카테고리의 다른 글
git) submodule 다루기 (0) | 2023.02.15 |
---|---|
python) metaflow 파이썬 스크립트에서 실행해보기 (0) | 2023.02.10 |
Python) pyarrow 사용 방법 (0) | 2022.12.21 |
Python) pyarrow 다뤄보기 (0) | 2022.11.26 |
(진행중) SHAP (Shapley Additive exPlanations) 이해하기 (1) | 2022.11.21 |