[ Python ] 동시성과 Future / concurrent.futures 알아보기

2019. 8. 11. 15:58분석 Python/구현 및 자료

https://hamait.tistory.com/748

 

초보자를 위한 동시성과 Future

1. 그림으로 보는 동시성 2. 동시성과 Future 이야기 3. 자바로 밑바닥부터 Future 구현 4. 언어별 Future 살펴보기 1. 그림으로 보는 동시성 은행에 창구가 하나입니다. 사람들은 줄을 서서 일을 처리합니다...

hamait.tistory.com

 위의 글에서 흐름을 보자면, CPU를 활용하는 THread들이 많아 질수록 처리능력이 좋아진다고 함.

쓰레드를 내부로 감춰놓고 외부에서 편하게 쓸 수 있는 도구가 필요함 Future , Promise , Async, Observerble )

 

이상적

메인 쓰레드 (A)는 자기가 하는 일만 하고 B 에게는 알아서 B -> C -> Result 

예를 들어 내가 지금 하고자 하는 Kafka에서 데이터는 계속 싸주기만 하고(A) 뒤에서 요청을 처리하는 녀석들을 동시성 효율을 보이도록 잘 구성해서 성능을 높여야한다.(B->C->Result)

(B->C->Result) 내가 해야하는 부분 

 

동시성과 Future!

 

 암달의 법칙

멀티코어를 사용하는 프로그램의 속도는 프로그램 내부에 존재하는 순차적sequential 부분이 사용하는 시간에 의해서 제한된다.

 

결국 암달의 법칙에서는 병렬화가 되는 부분과 안되는 부분이 있는데, 여기서 병렬화가 되는 부분에 대해서 시간을 재는 법칙? 그런 것 같다.

 

Future, Promise, Async,Observerble

쓰레드를 사용 안하는게 아니라 깊숙히 감춰 놓은 기술

쓰레드를 직접 사용하는 것 보다 쉽게 효율적으로 동시성 프로그램을 할 수 있게 적용!

" 쓰레드로 하던 것을 저거로 하자" 

 

https://soooprmx.com/archives/5669

 

파이썬의 새로운 병렬처리 API - Concurrent.futures · Wireframe

어떤 처리량이 많은 작업을 작은 단위로 쪼개거나, 현재 진행되는 흐름과 독립적으로 병렬적인 처리를 하기 위해서 멀티스레드나 멀티프로세스를 사용하는 경우가 (지금까지는 드물지만) 종종 있다. 이전에는 Threading.Thread 와 Multiprocessing.Process 를 이용해서 각각의 스레드나 별도 프로세를 제어하는 방식을 사용했다. 파이썬 3.2에서 이러한 비동기 실행을 위한 API를 보다 고수준으로 만들고 사용하기 쉽도록 개선한 concurr

soooprmx.com

 

그래서 이러한 동시성을 처리하기 위해서

패캐지인 threading 과 multiprocessing을 이용해서 제어했지만, 

3.2부터는 비동기 실행을 위한 API를 보다 고수준으로 만들고 사용하기 쉽도록 개선한 concurrent.futures 모듈이 도입

 

이 모듈은 멀티프로세싱 및 멀티스레딩을 위한 API 제공

이전의 스레드, 프레스스는 API 들이 C기반의 코드를 래핑하는 수준으로 개발되어 있다고 함. 그래서 사용하기 까다로웠다고 한다.

 

concurrent.futures 모듈은 이러한 점들을 개선하면서 사용하기 쉽고 스레드와 프로세스를 사용하는 API를 통일하고, 특히 비동기 코루틴과 거의 유사한 형태의 API를 제공하여, 현대적인 자바스크립트의 비동기 Task 프로토콜인 Promise와 유사한 Future라는 클래스를 도입하여 보다 깔끔하게 병렬처리 코드를 작성할 수 있게 해준다.

 

concurrent.futures의 특징

  1. 멀티스레딩/멀티프로세싱처리의 API가 통일되었다. 둘은 모두 똑같은 방식으로 작동하며, 특히 이 인터페이스는 비동기 코루틴에 의한 비동기 호출의  API와도 동일하다. (단 concurrent와 asyncio의 Future는 호환되는 클래스는 아니다.) 따라서 모든 비동기 루틴은 사용하는 기술의 종류에 독립적으로 모두 비슷한 형태로 디자인될 수 있다.
  2. Promise 개념이 도입된다. Executor에 의해서 디스패치된 병렬작업은 Future라는 클래스로 래핑되어 다음의 작업들을 간단하게 처리할 수 있다.
    1.  실행중인 병렬 작업을 취소
    2. 실행중여부, 완료 여부의 체크
    3. 특정 타임아웃 시간 후의 결과값 확인
    4. 완료 콜백을 추가
    5. 동기화 코드를 매우 쉽게 작성할 수 있음

병렬 처리 API은 크게 세 가지 요소로 구성된다. 다음 장에서는 각 요소들에 대해서 살펴볼 것이다.

  • Executor : 병렬 작업을 디스패치하며, 새로운 프로세스나 스레드를 자동으로 관리한다.
  • Future : 디스패치된 병렬 처리 작업. API를 통해서 완료여부를 확인하거나, 취소, 결과값 fetch 등을 수행할 수 있다.
  • 모듈함수 : .wait(), .as_completed() 등의 함수를 통해서 쉽게 병렬처리 결과를 동기화할 수 있다.

Executor

concurrent.futures.Executor 는 함수 호출을 비동기로 디스패치해주는 역할을 담당하는 실행기의 베이스가 되는 추상 클래스이다. concurrent.future 모듈에서는 비동기 병렬호출을 모두 스레드풀/프로세스풀 방식으로 처리하며, 이 때 풀은 모두 실행기에 의해서 관리된다.

 

처리 방식이 스레드인지 프로세스인지에 따라 ThreadPoolExecutor  ProcessPoolExecutor 중 하나를 선택하여 사용하면된다. 두 클래스가 제공하는 API는 동일하며 주로 다음 세 가지 메소드를 사용한다.

 

submit

submit(fn, *args, **kwargs) : 비동기로 실행하고자 하는 단일 함수 fn 을 주 인자로 넘겨주고, 그외 인자로는해당 함수가 받게되는 인자들을 모두 같이 넘겨줄 수 있다.

with ThreadPoolExcutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

submit 함수의 리턴값은 병렬로 실행되는 작업을 래핑한 Future 클래스 객체가 된다.

 

이 future 객체와 관련된 내용은 뒤에서 후술하겠다.

 

Executor 객체는 컨텍스트 매니저 프로토콜을 지원하기 때문에 with 구문 내에서 사용할 수 있다.

위 코드는 1개의 스레드를 관리하는 스레드 풀을 생성하고 323의 1235승을 계산하는 작업을 별도 스레드를 통해서 호출한다.

사실 이 계산은 순식간에 일어나기 때문에, future.result()를 통해서 계산 결과를 얻어와서 출력하면 거의 곧바로 출력된다.

 

이 코드의 실제 구동은 별도 스레드가 생성되고, 해당 스레드에서 pow(323, 1235)가 실행되었다는 것 빼고는 특별히 병렬로 무언가가 처리되었다는 효과를 느끼기는 힘들지만, 대략의 API 모양이 어떻게 생겼다 하는 정도만 소개했다는데 의의를 둔다.

 

map

Executor.map 함수는 timeout 인자를 별도로 받는다는 점 외에는 연속열에 함수를 맵핑하는 map() 내장함수와 동일하게 생겼고, 동작도 비슷하다.

대신 호출은 비동기적으로 발생하며, 결과값이 생성되는 순서가 반드시 호출이 시작된 순서와 동일하지는 않다.

 

map(func, *iterables, timeout=None) : 결과값은 특이하게 [Futures] 타입이 아닌 결과에 대한 제너레이터이다.

또한 넘겨질 수 있는 함수가 단일 인자만을 받게 되어 있는데, 만약 복수 인자를 받되, 나머지 인자들이 공통으로 들어가도 된다면 functools.partial 함수를 이용해서 단일 인자 함수로 래핑해서 사용하는 방법이 있다.

 

타임 아웃 값을 별도로 주지 않으면 디스패치된 모든 작업들이 종료될 때까지 기다린 후 리턴한다. 타임아웃 값이 주어진 경우에는 해당 타임아웃 내에 완료되지 못한 작업이 있을 때, 예외를 일으키게 된다.

 

shutdown

shutdown 메소드는 executor에게 종료 시그널을 보내어 실행중이거나 대기중인 모든 future 객체에 대해서 중지 명령을 전달하고 리소스를 정리한다. shutdown이 호출된 executor에게는 map, submit을 호출할 수 없다.

선택적으로 wait= 파라미터값을 전달할 수 있다. 이 값을 True 로 준 경우에는 현재 돌아가고 있는 작업은 강제로 중단하지 않고, 대기 중인 작업만 취소된다.

 

Executor의 구분 – 스레드 vs 프로세스

Executor는 멀티스레드를 쓸 것이냐, 멀티 프로세스를 쓸 것이냐에 따라 ThreadPoolExecutor와 ProcessPoolExecutor로 나뉜다. 둘의 사용방법은 거의 동일하나 다음과 같은 차이가 있다.

  • IO기반의 작업에 대해서 대기 시간을 줄이고 리소스 사용 효율을 늘리고 싶다면 ThreadPoolExecutor를 사용한다. 예를 들어 HTTP 통신을 이용하여 여러 곳을 순차적으로 접근하는 것보다, 멀티 스레드를 이용하면 전반적인 성능향상을 볼 수 있다.
  • CPU 부하가 많은 작업을 분산처리 하는 목적이라면 ProcessPoolExecutor를 사용한다. CPU 로드가 크게 걸리는 작업인 경우에는 파이썬 내에서는 GIL(Grand Interal Lock)이라는 제약이 존재하기 때문에,  멀티 스레드로는 CPU 분산 처리의 효과를 누릴 수 없다. 멀티 프로세스는 서브 프로세스와 유사하게 __main__ 스스로를 반입하는 별도의 프로세스를 가지므로, 이를 호출하는 코드는 반드시 __main__ 모듈 내에서 호출되어야 한다. 또한 __main__ 자체가 반입 가능해야 하기 때문에, REPL 내에서는 사용해 볼 수 없다.

 

ThreadPoolExecutor의 사용 예제

url의 리스트로부터 해당 페이지를 읽어서 바이트 수를 리턴하는 함수를 하나 정의하고 이를 여러 URL에 대해서 비동기로 호출하는 예이다. as_completed() 모듈 함수를 이용해서 제출한 작업이 하나씩 완료될 때마다 결과를 출력하도록 한다.

 

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    conn = urllib.request.urlopen(url, timeout=timeout)
    return conn.readall()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exe:
    future_to_url = {exe.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exp:
            print("%r generated an exception: %s" % (url, exp))
        else:
            print("{} page is {} bytes".format(url, len(data)))

ProcessPoolExecutor의 사용예

 

이번에는 앞에서 언급한 프로세스 풀을 통한 CPU 분산처리 방법이다.  map 메소드를 이용했기 때문에 모든 프로세스가 실행을 완료할 때까지 기다리게 된다. 같은 방식을 스레드 풀로 방식을 바꿔보면 확실히 성능이 차이가 날 것이다. 또한 이 API는 스레드 풀 실행과 완전히 일치하기 때문에,  submit 메소드를 이용하여 Future 를 이용하는 방식으로도 충분히 응용가능하다.

 

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
#[Finished in 4.9s]

 

Future

concurrent.futures.Future 비동기로 호출된 함수 콜이 객체로 캡슐화된 형태이다.

Executor 클래스 인스턴스의 .submit() 호출에 의해 인스턴스가 만들어진다.

특히 이 객체는 asyncio의 Future 클래스와 유사한 API를 가지고 있다. (둘이 호환되는 객체는 아니다.)

따라서 단일 스레드 비동기 코루틴을 사용하는 방식과 concurrent.futures를 이용한 병렬처리 방식은 매우 비슷한 형태로 사용 가능하다.

 Future 클래스는 자바스크립트의 Promise API와 매우 비슷하다.

아직 완료되지 않은 (혹은 완료되었는지 당장은 모르는) 작업을 외부에서 객체로 다룰 수 있게 된다.

다음의 메소드들이 지원된다.

특히 하나의 작업에 대해서 하나 이상의 완료 콜백을 추가할 수 있다는 점이 흥미롭다.

  • cancel() : 작업 취소를 시도한다. 만약 현재 실행중이고 취소가 불가능할 경우 False를 리턴한다. 작업이 취소되었다면 True가 리턴된다.
  • canceled() : 취소가 완료된 작업이면 True를 리턴한다.
  • running(): 실행 중인 경우 True를 리턴한다.
  • done(): 작업이 완료되어고 정상적으로 종료되었다면 True를 리턴한다.
  • result(): 해당 호출의 결과를 리턴한다. 만약 작업이 아직 완료되지 않았다면 최대 타임아웃시간까지 기다린다음, None을 리턴한다.
  • exception(): 해당 호출이 던진 예외를 반환한다. 역시 작업이 완료되지 않았다면 타임아웃 시간까지 기다린다.
  • add_done_callback(): 콜백함수를 Future 객체에 추가한다. 이 함수는 future 객체하나를 인자로 받는 함수이다. 콜백은 취소되거나 종료된 경우에 모두 호출된다.

모듈 함수

concurrent.futures 함수는 병렬처리 작업을 동기화하는 두 가지 함수를 제공한다. 

wait() 함수는 특정한 타임아웃 시간 동안 대기한 다음, 그 시간동안 완료된 작업과 그렇지 않은 작업을 구분하는 두 개의 세트로 된 튜플을 리턴한다.

참고로 별도의 옵션 파라미터를 이용하여 최초 1 개의 작업이 완료될 때 리턴하거나, 첫 예외가 발생할 때 리턴하는 등의 조건을 만들 수 있다.

as_completed()함수  future 의 집합을 받아서 기다리면서 하나씩 완료되는 것 순서대로 순회하면서 반복하는 반복자를 생성하는 함수이다.

728x90