[ Ray ] 10x Faster Parallel Python Without Python Multiprocessing -리뷰

2020. 2. 20. 19:50분석 Python/Ray

728x90

광고 한 번씩 눌러주세요 블로그 운영에 큰 힘이 됩니다. : )

파이썬 실력은 부족하지만, 빠르게 돌리기 위해서 병렬 처리에 대해서 관심이 있다.
기존에는 주로 multiprocessing을 많이 해결했는데, 웬만한 거는 쉽게 할 수 있어서 좋았지만,
모델링을 하였을 때, 일단 데이터를 각 프로세스마다 복제해서 커지는 것도 문제고, 데이터의 크기가 크면 안 된다라는 단점을 확인했다. 그러던 중에 ray라는 존재에 대해서는 알게 되었지만, 이러한 문제를 해결해줄 수 있을 것 같아서(추측) 읽어보기로 한다. 개인적으로 쭉 읽고 나서 기존 multiprocessing 보다 많은 장점이 있는 것을 확인하였고, 익숙해지기 위해 노력해야겠다


파이썬 라이브러리 중에서 multiprocessing 가 많은 영역에서 사용하고 있다.

글쓴이는 그것이 수치 데이터 처리, 상태 저장 연산, 값비싼 초기화를 통한 연산 등 몇 가지 중요한 애플리케이션 클래스에 비해 부족하다는 것을 보여준다. 두 가지 주된 이유가 있다고 한다.

  • Inefficient handling of numerical data.
  • Missing abstractions for stateful computation (i.e., an inability to share variables between separate “tasks”).

이번 글에서 Ray에 대해 이야기 할 것이고,
Ray는 빠르며 위의 문제를 해결하는데 분산형으로 실행하고 만들 수 있는 간단한 프레임워크다.
기본 개념을 알고 싶다면,  this blog post 을 읽어보라고 한다.
Ray는 Apache Arrow를 활용하여 데이터를 효율적으로 처리하고 분산 컴퓨팅을 위한 taskactor 추상화를 제공한다.
저 글에서는 Python Multiprocessing으로 쉽게 표현되지 않는 세 가지 워크로드를 벤치마킹하고 Ray, Python Multiprocessing, serial Python 코드를 비교한다.

it’s important to always compare to optimized single-threaded code

실험을 하였을 때 Ray가 기본 serial python 보다 10-30배 빠르고, multiprocessing 보다 5-25배 정도 빠르다고 한다.
그리고 serial, multiprocessing을 좋은 머신에서 돌린 것보다 5-15배 빠르다고 한다.

The benchmarks were run on EC2 using the m5 instance types (m5.large for 1 physical core and m5.24xlarge for 48 physical cores). Code for running all of the benchmarks is available here. Abbreviated snippets are included in this post. The main differences are that the full benchmarks include 1) timing and printing code, 2) code for warming up the Ray object store, and 3) code for adapting the benchmark to smaller machines.


Benchmark 1: Numerical Data

많은 머신 러닝과 계산 과학 그리고 데이터 분석 작업량은 데이터의 큰 배열을 사용한다.
예를 들어, 배열은 큰 이미지 또는 데이터 집합을 나타낼 수 있으며, 애플리케이션은 이미지를 분석하는 여러 작업을 원할 수 있다. 수치 데이터를 효율적으로 처리하는 것은 중요하다.

아래의 For 루프를 통과하는 각 패스는 Ray를 사용하는 경우 0.84초, Python 다중 처리를 사용하는 경우 7.5초, 직렬 Python을 사용하는 경우 24초를 소비한다.(on 48 physical cores).

이 성능 차이는 왜 Modin과 같은 라이브러리를 Ray 위에 구축 할구축할 수 있지만 다른 라이브러리 위에 구축할 수 없는지 설명합니다.

Ray의 코드는 다음과 같다.

import numpy as np
import psutil
import ray
import scipy.signal

num_cpus = psutil.cpu_count(logical=False)

ray.init(num_cpus=num_cpus)

@ray.remote
def f(image, random_filter):
    # Do some image processing.
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

# Time the code below.

for _ in range(10):
    image = np.zeros((3000, 3000))
    image_id = ray.put(image)
    ray.get([f.remote(image_id, filters[i]) for i in range(num_cpus)])

ray.put(image) 을 호출함으로써, 큰 배열은 공유 메모리에 저장되며 복사본을 만들지 않고 모든 작업자 프로세스에서 액세스 할 수 있습니다. ( 결국 이 말은 복사를 안 하니 메모리를 아낄 수 있다는 소리?)

이것은 배열 뿐만 아니라 포함하고 있는 배열들(lists of arrays)와 같은 객체 또한 처리할 수 있다.

workers가 f task를 실행할 때, 그 결과는 다시 공유된 메모리에 저장된다.
그때 script는 ray.get()을 호출할 때, 값을 역 직렬 화하거나 복사하지 않고 공유 메모리가 지원하는 numpy 배열을 만듭니다.

이 최적화들은 Plasma shared-memory object store. 뿐만 아니라 data layour과 serialization 형태를 기반으로 한 Apache Arrow의 Ray's를 사용함으로써, 가능하게 되었다. 

아래 코드는 멀티 프로세싱으로 작성한 코드다.(The code looks as follows with Python multiprocessing.)

from multiprocessing import Pool
import numpy as np
import psutil
import scipy.signal

num_cpus = psutil.cpu_count(logical=False)

def f(args):
    image, random_filter = args
    # Do some image processing.
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

pool = Pool(num_cpus)

filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

# Time the code below.

for _ in range(10):
    image = np.zeros((3000, 3000))
    pool.map(f, zip(num_cpus * [image], filters))
view rawparallel_python_multiprocessing_numerical_computation.py hosted with ❤ by GitHub

여기의 차이점은 파이썬 멀티프로세싱은 processes 사이에 그들을 거칠 때 큰 객체들을 직렬 화하기 위해 pickle을 사용한다는 것이다. 

이 방법을 사용하려면 각 프로세스마다 고유한 데이터 복사본을 만들어야 하므로 상당한 메모리 사용량과 비싼 직렬화 해제를 위한 오버 헤드가 추가됩니다.

Ray는 Plasma store. 와 함께  zero-copy serialization를 위해 Apache Arrow  data layout을 사용하여 피합니다.


Benchmark 2: Stateful Computation

많은 소규모 작업 단위 간에 공유해야 할 실질적인 "상태"가 필요한 워크로드는 Python 멀티 프로세싱에 문제가 되는 또 다른 범주의 워크로드입니다. 이러한 패턴은 매우 일반적이고, 그리고 toy stream processing application으로 설명할 것이다.

On a machine with 48 physical cores, Ray is  6x  faster than Python multiprocessing and  17x  faster than single-threaded Python. Python multiprocessing doesn’t outperform single-threaded Python on fewer than 24 cores. The workload is scaled to the number of cores, so more work is done on more cores (which is why serial Python takes longer on more cores).

상태는 종종 파이썬 클래스에서 요약된다.  Ray는 an actor abstraction를 제공하여 병렬 및 분산 설정에서 클래스를 사용할 수 있습니다. 반대로 파이썬 multiprocessing은 파이썬 classes를 병렬로 하기 위한 자연스러운 처리 방법을 제공하지 않는다. 따라서 사용자는 종종 맵 호출 간에 관련 상태를 전달해야 합니다.

이 전략은 실제로 구현하기 까다로울 수 있다. (많은 파이썬 변수들은 실제로 serializable 하지 않다) 그리고 그것은 작동할 때 느리게 작동할 거라고 한다.

아래는 병렬 작업을 사용하여 한 번에 하나의 문서를 처리하고 각 단어의 접두사를 추출하고 끝에 가장 일반적인 접두사를 반환하는 토이 예제이다. 이 접두사 개수는 actor state에 저장되고 다른 작업에 의해 변경된다.

This example takes 3.2s with Ray, 21s with Python multiprocessing, and 54s with serial Python (on 48 physical cores).

Ray Version

from collections import defaultdict
import numpy as np
import psutil
import ray

num_cpus = psutil.cpu_count(logical=False)

ray.init(num_cpus=num_cpus)

@ray.remote
class StreamingPrefixCount(object):
    def __init__(self):
        self.prefix_count = defaultdict(int)
        self.popular_prefixes = set()

    def add_document(self, document):
        for word in document:
            for i in range(1, len(word)):
                prefix = word[:i]
                self.prefix_count[prefix] += 1
                if self.prefix_count[prefix] > 3:
                    self.popular_prefixes.add(prefix)

    def get_popular(self):
        return self.popular_prefixes

streaming_actors = [StreamingPrefixCount.remote() for _ in range(num_cpus)]

# Time the code below.

for i in range(num_cpus * 10):
    document = [np.random.bytes(20) for _ in range(10000)]
    streaming_actors[i % num_cpus].add_document.remote(document)

# Aggregate all of the results.
results = ray.get([actor.get_popular.remote() for actor in streaming_actors])
popular_prefixes = set()
for prefixes in results:
    popular_prefixes |= prefixes

Ray는 Ray's abstraction를 당면한 문제에 fitting 하기 때문에 여기서 잘 수행됩니다.
이 응용 프로그램은 분산 설정에서 상태를 캡슐화하고 변경하는 방법이 필요하며 액터가 계산서에 맞습니다.

multiprocessing version

from collections import defaultdict
from multiprocessing import Pool
import numpy as np
import psutil

num_cpus = psutil.cpu_count(logical=False)

def accumulate_prefixes(args):
    running_prefix_count, running_popular_prefixes, document = args
    for word in document:
        for i in range(1, len(word)):
            prefix = word[:i]
            running_prefix_count[prefix] += 1
            if running_prefix_count[prefix] > 3:
                running_popular_prefixes.add(prefix)
    return running_prefix_count, running_popular_prefixes

pool = Pool(num_cpus)

running_prefix_counts = [defaultdict(int) for _ in range(4)]
running_popular_prefixes = [set() for _ in range(4)]

for i in range(10):
    documents = [[np.random.bytes(20) for _ in range(10000)]
                 for _ in range(num_cpus)]
    results = pool.map(
        accumulate_prefixes,
        zip(running_prefix_counts, running_popular_prefixes, documents))
    running_prefix_counts = [result[0] for result in results]
    running_popular_prefixes = [result[1] for result in results]

popular_prefixes = set()
for prefixes in running_popular_prefixes:
    popular_prefixes |= prefixes

문제는 pool.map이 stateless 함수를 실행한다는 것입니다. 즉, 다른 pool.map 호출에서 사용하려는 하나의 pool.map 호출에서 생성된 모든 변수는 첫 번째 호출에서 반환되어 두 번째 호출로 전달되어야 합니다.
작은 객체에선 이러한 접근이 괜찮지만, 그러나 큰 중간 결과를 공유한다면, 그 결과를 교화하는 비용은 상당히 많이 들 것이다. (변수가 스레드 간에 공유되는 경우 위의 말은 맞지 않지만, 프로세스 경계 간의 공유되므로 변수는 pickle과 같은 라이브러리를 사용하여 바이트 문자열로 직렬화되어야 합니다.)

멀티 프로세싱 버전은 너무 많은 상태를 통과해야 하기 때문에 매우 어색해 보이며 결국 직렬 파이썬보다 약간의 속도 향상을 달성합니다. 실제로 당신은 이와 같은 코드를 쓰지 않을 것이다. 왜냐하면 당신은 stream processing에서 파이선 멀티프로세싱을 사용하지 않기 때문이다. 대신에 아마도 당신은 dedicated stream-processing framework을 사용할 것이다. 이 예제에서 ray가 이러한 프레임워크나 애플리케이션을 빌드하기 위해 적합하다는 것을 보여준다.

 


Benchmark 3: Expensive Initialization

이전 예제와 달리 많은 병렬 계산에서는 작업 간에 중간 계산을 공유할 필요는 없지만 어쨌든 이점을 얻을 수 있다.
심지어 stateless 계산은 state를 초기화하는 것이 비쌀 때 sharing state로부터 이점을 얻을 수 있다.

아래 예제에서는 병렬로 디스크로부터 저장된 neural net을 불러오고 이미지 뭉텅이를 분류하는 데 사용할 수 있다.

On a machine with 48 physical cores, Ray is  25x  faster than Python multiprocessing and  13x  faster than single-threaded Python. Python multiprocessing doesn’t outperform single-threaded Python in this example. Error bars are depicted, but in some cases are too small to see. The workload is scaled to the number of cores, so more work is done on more cores. In this benchmark, the “serial” Python code actually uses multiple threads through TensorFlow. The variability of the Python multiprocessing code comes from the variability of repeatedly loading the model from disk, which the other approaches don’t need to do.

Ray Multiprocessing Serial Python
5s 126s 64s

이 경우 serial python에서는 tensorflow를 쓰고 있어서 게산을 병렬로 처리하기 위해 많은 코어를 쓴다 그래서 이것은 single thread가 아니다.

이런 모델을 학습했다고 하자.

import tensorflow as tf

mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
x_train = x_train / 255.0
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation=tf.nn.relu),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'])
# Train the model.
model.fit(x_train, y_train, epochs=1)
# Save the model to disk.
filename = '/tmp/model'
model.save(filename)

이제 우리는 저 모델을 load 해서 이미지 뭉텅이를 분류하고 싶다. 애플리케이션에서 이미지가 모두 동시에 사용 가능하지 않을 수 있고 이미지 로드는 데이터 로드와 동시에 수행되어야 하기 때문에 일괄 처리로 작업을 수행합니다.

Ray

import psutil
import ray
import sys
import tensorflow as tf

num_cpus = psutil.cpu_count(logical=False)

ray.init(num_cpus=num_cpus)

filename = '/tmp/model'

@ray.remote
class Model(object):
    def __init__(self, i):
        # Pin the actor to a specific core if we are on Linux to prevent
        # contention between the different actors since TensorFlow uses
        # multiple threads.
        if sys.platform == 'linux':
            psutil.Process().cpu_affinity([i])
        # Load the model and some data.
        self.model = tf.keras.models.load_model(filename)
        mnist = tf.keras.datasets.mnist.load_data()
        self.x_test = mnist[1][0] / 255.0

    def evaluate_next_batch(self):
        # Note that we reuse the same data over and over, but in a
        # real application, the data would be different each time.
        return self.model.predict(self.x_test)

actors = [Model.remote(i) for i in range(num_cpus)]

# Time the code below.

# Parallelize the evaluation of some test data.
for j in range(10):
    results = ray.get([actor.evaluate_next_batch.remote() for actor in actors])
view rawparallel_python_ray_expensive_initialization.py hosted with ❤ by GitHub

모델을 로딩하는 것은 한 번만 실행하는 것으로 충분하다.
Ray 버전은 액터의 생성자에서 모델을 한 번 로드하여서 이 비용을 절감합니다.
만약 모델을 GPU에 배치하고 싶다면, 이 초기화는 더 비싼 비용을 들 것이다.

multiprocessing version은 모델을 다시 reload 할 필요가 있기 때문에 느리다.
왜냐하면 mapped 함수들은 stateless를 가정하기 때문이다.

multiprocessing 은 다음과 같다. 
경우에 따라서는 initializer 인수를 사용하여 multiprocessing.Pool에이를 수행할 수 있습니다.
그러나 이는 각 프로세스마다 초기화가 동일하고 다른 프로세스가 다른 설정 기능을 수행할 수 없는 설정으로 제한됩니다. 다른 작업이 다른 작업자를 대상으로 하는 것을 허용하지 않습니다.

from multiprocessing import Pool
import psutil
import sys
import tensorflow as tf

num_cpus = psutil.cpu_count(logical=False)

filename = '/tmp/model'

def evaluate_next_batch(i):
    # Pin the process to a specific core if we are on Linux to prevent
    # contention between the different processes since TensorFlow uses
    # multiple threads.
    if sys.platform == 'linux':
        psutil.Process().cpu_affinity([i])
    model = tf.keras.models.load_model(filename)
    mnist = tf.keras.datasets.mnist.load_data()
    x_test = mnist[1][0] / 255.0
    return model.predict(x_test)

pool = Pool(num_cpus)

for _ in range(10):
    pool.map(evaluate_next_batch, range(num_cpus))

이 모든 예에서 보았던 것은 Ray의 성능은 성능 최적화뿐만 아니라 현재 작업에 적합한 추상화를 통해 얻은 것입니다. 상태 저장 계산은 많은 응용 프로그램에서 중요하며 상태 저장 계산을 상태 비 저장 추상화로 강제하는 것은 비용이 많이 든다.

Ray와 multiprocessing은 사고와 사과를 비교하는 것과 같아서 비교가 어렵다.
왜냐하면 이 두 개의 라이브러리는 유사하지 않기 때문이다. 차이는 아래와 같다.

  • Ray is designed for scalability and can run the same code on a laptop as well as a cluster (multiprocessing only runs on a single machine).
  • Ray workloads automatically recover from machine and process failures.
  • Ray is designed in a language-agnostic manner and has preliminary support for Java.

More relevant links are below.


개인적으로 tensorflow를 주로 쓰기 때문에 병렬로 돌리는 것을 기존 ray tutorial를 참고해서 low level로 만들어봤다.

from collections import defaultdict
import numpy as np
import psutil
import ray
import tensorflow as tf

num_cpus = psutil.cpu_count(logical=False)
print(num_cpus)
ray.shutdown()
ray.init(num_cpus=num_cpus)

mnist = tf.keras.datasets.mnist.load_data()
x_train, y_train = mnist[0]
x_train = x_train.reshape(-1,28*28)

@ray.remote
class SimpleTensorflow(object):
    def __init__(self, model_id):
        import tensorflow
        self.model_id = model_id
        self.X = tf.placeholder(tf.float32, shape=[None, x_train.shape[1]] , name="X")
        self.y = tf.placeholder(tf.int32, shape=[None], name="y")
        self.fit()
        config=tf.ConfigProto(log_device_placement=True)
        config.gpu_options.allow_growth = True
        self.sess = tf.Session(config = config)
        self.sess.run(tf.global_variables_initializer())
        
    def fit(self,) :
        W = tf.get_variable(name="w",shape=[x_train.shape[1],
                                            len(np.unique(y_train))],
                initializer= tf.truncated_normal_initializer())
        logit = tf.matmul(self.X , W)
        Loss = tf.losses.sparse_softmax_cross_entropy(self.y, logit)
        self.Loss = tf.reduce_mean(Loss)
        self.vars = tf.trainable_variables()
        self.optim = tf.train.AdamOptimizer().minimize(self.Loss, var_list = self.vars)
        
    def get_weight(self,) :
        return self.sess.run(self.vars)
    def set_weights(self, weights) : 
        if isinstance(weights , list) :pass
        else : weights = list(weights)
        for idx , var in enumerate(self.vars) :
            self.sess.run(var.assign(weights[idx]))
    def train(self,) :
        for i in range(1000) :
            _ , loss = self.sess.run([self.optim , self.Loss], 
                                feed_dict={self.X : x_train , self.y :y_train}) 
            print(f"[ID : {self.model_id}] Epoch : {i}, lOSS : {loss:.3f}")
        return f"Terminate {self.model_id}"

이제  class를 만들고 이제 아까 2번째 장점이었던 Stateful Computation에서 class를 병렬 처리가 가능하다는 것을 시도해봤다.

프로세스별로 actor를 만들기

tensorflow_actors = [SimpleTensorflow.remote(i) for i in range(num_cpus)]

각 actor별 train 함수를 호출해서 훈련하기

results = ray.get([actor.train.remote() for actor in tensorflow_actors])

그리고 actor 중에 2개를 뽑아서 학습된 가중치 평균을 구하기

model1 , model2 = ray.get([tensorflow_actors[0].get_weight.remote(),
                           tensorflow_actors[1].get_weight.remote()
                          ])
averaged_weight = [(layer1 + layer2) / 2
                   for layer1, layer2 in zip(model1, model2)]

새로운 actor를 만들어서 weight 넣어주기

newmodel = SimpleTensorflow.remote("new") 
weight_id = ray.put(averaged_weight)
newmodel.set_weights.remote(weight_id)

그리고 다시 훈련
첫 번째 epoch부터 작은 것을 보니 학습된 가중치를 사용한다는 것을 알 수 있다.

newmodel.train.remote()

https://ray.readthedocs.io/en/latest/using-ray-with-tensorflow.html

 

Best Practices: Ray with Tensorflow — Ray 0.9.0.dev0 documentation

Best Practices: Ray with Tensorflow This document describes best practices for using the Ray core APIs with TensorFlow. Ray also provides higher-level utilities for working with Tensorflow, such as distributed training APIs (training tensorflow example), T

ray.readthedocs.io

 

728x90