Python) pyarrow 다뤄보기

2022. 11. 26. 09:36분석 Python/구현 및 자료

728x90

 

2022.11.26 - [분석 Python/구현 및 자료] - Python) pyarrow 다뤄보기

2022.12.21 - [분석 Python/구현 및 자료] - Python) pyarrow 사용 방법

Apache Arrow란?

(https://realsalmon.tistory.com/21)

직렬화(Serialization)란?

(https://realsalmon.tistory.com/21)

Zero-Copy 직렬화

Pyarrow란?

아파치 애로우(Apache Arrow)라는 메모리 내 분석을 위한 개발 플랫폼인데, 빅데이터를 빠르게 처리하고 이동할 수 있도록 하는 일련의 기술을 제공하는 라이브러리를 파이썬 PyArrow를 통해 구현할 수 있다.

기존의 pandas로 용량이 큰 csv파일을 로드하면 시간이 오래 걸리는데,  pyarrow를 활용하면 시간 절약에 도움이 된다.

 

 

CSV 데이터 읽기

# https://yahwang.github.io/posts/83
import pyarrow as pa
from pyarrow import csv

# 데이터 타입에 ()을 항상 명시
convert_opts = csv.ConvertOptions(column_types={'st_cradle': pa.uint8(), 'st_id': pa.uint16()})

# header가 없는 경우, 컬럼명 지정 가능
read_opts = csv.ReadOptions(
    column_names=['register_at', 'st_cradle','st_id'])

df_typed = csv.read_csv('bike_data.csv', convert_options=convert_opts, 
                        read_options=read_opts).to_pandas()

데이터 생성

from sklearn.datasets import make_classification
import numpy as np
import pyarrow.parquet as pq
import pandas as pd 
X , y = make_classification(n_samples=100000, n_features=5,n_classes=2)

X_pd= pd.DataFrame(X,columns =[ f"feature_{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd['class1'] =np.random.randint(0,5,len(y))

 

저장 방법 

전체 (파티션 별로)

X_pd.to_parquet(
    path="./test_whole", 
    engine='pyarrow',
    compression='snappy',
    partition_cols=['class','class1']
    )

저장 방법

배치 (파티션 별로)

  • 배치로 저장하면 배치별로 파티션별로 파일이 생성됨.
  • 읽는 속도에 영향을 줌
import pyarrow as pa
import pyarrow.parquet as pq
for i in pa.Table.from_pandas(X_pd).to_batches(10000) : 
    i.to_pandas().to_parquet(
    path="./test_batch", 
    engine='pyarrow',
    compression='snappy',
    partition_cols=['class','class1']
    )

속도 비교

전체

%%time

_temp = pq.read_table("./test_whole")

# CPU times: user 24 ms, sys: 29.9 ms, total: 54 ms
# Wall time: 12 ms

배치

%%time

_temp = pq.read_table("./test_batch")

# CPU times: user 175 ms, sys: 12.1 ms, total: 187 ms
# Wall time: 35.7 ms

 

 

행을 기준으로 필터 방법

  •  ==
  • !=
  • <
  • >
  • <=
  • >=
  • in
  • not in
pq.read_table("./test_whole", filters=[("class","=",0),("class1","in",{1})])
pq.read_table("./test_whole", filters=[("class","=",0),("class1",">",1)])
pq.read_table("./test_whole", filters=[("class","=",0),("class1",">=",1)])
pq.read_table("./test_whole", filters=[("class","=",0),("class1","<",3)])
pq.read_table("./test_whole", filters=[("class","=",0),("class1","<=",3)])
pq.read_table("./test_whole", filters=[("class","=",0),("class1","!=",0)])
pq.read_table("./test_whole", filters=[("class","=",0),("class1","not in",{0,1})])

 

Join

join_type

- default “left outer”

(“left semi”, “right semi”, “left anti”, “right anti”, “inner”, “left outer”, “right outer”, “full outer”)

 

join type

pydict = dict(
A = np.random.choice(["A","B","C"],size=1000),
B = np.random.choice(["D","E","F"],size=1000),
C = np.random.randint(1,100,1000),
)
left_table = pa.Table.from_pydict(pydict)
pydict2 = dict(
    A = ['A','B','C'],
    V = ['0','1','2']
)
right_table = pa.Table.from_pydict(pydict2)
pydict2 = dict(
    A = ['A','B','C','A','B','C','A','B','C'],
    B = ['D','D','D','E','E','E','F','F','F'],
    V = ['0','1','2','3','4','5','6','7','8']
)
right_table = pa.Table.from_pydict(pydict2)
pydict2 = dict(
    A = ['A','B','C','A','B','C','A','B'],
    B = ['D','D','D','E','E','E','F','F'],
    V = ['0','1','2','3','4','5','6','7']
)
right_table2 = pa.Table.from_pydict(pydict2)

 

left_table.join(right_table,keys=['A','B']).to_pandas()

join이 되지 않은 컬럼들은 밑으로 빠지는 것으로 확인

left_table.join(right_table2,keys=['A','B']).to_pandas()

 

 

batch = left_table.to_batches(100)[0]
pa.Table.from_batches([batch]).join(right_table,keys=['A','B']).to_pandas()

batch = left_table.to_batches(100)[0]
pa.Table.from_batches([batch]).join(right_table2,keys=['A','B']).to_pandas()

 

 

Reference

https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html

 

728x90