Python) pyarrow 사용 방법

2022. 12. 21. 22:21분석 Python/구현 및 자료

 

2022.11.26 - [분석 Python/구현 및 자료] - Python) pyarrow 다뤄보기

2022.12.21 - [분석 Python/구현 및 자료] - Python) pyarrow 사용 방법

 

 

라이브러리

import numpy as np
import pyarrow.parquet as pq
import pandas as pd 
from sklearn.datasets import make_classification

데이터 생성

X_pd= pd.DataFrame(X,columns =[ f"feature_{i}" for i in range(X.shape[1])])
X_pd['class'] =y

 

파티션별로 저장 

X_pd.to_parquet(
    path="./test.parquet", 
    engine='pyarrow',
    compression='snappy',
    partition_cols=['class']
    )

 

pyarrow 로 parquet 파일 읽기

 

read_table

read_table을 하는 경우 Table로 나옴

df = pq.read_table(source="./test.parquet", use_threads=True)

파일 크기 비교

판다스와 parquet 파일 크기 비교

import sys  
print(sys.getsizeof(df))                 # 89348
print(sys.getsizeof(df.to_pandas()))     # 82268

스키마를 정해서 읽기

import pyarrow as pa
schema = pa.schema([
    ('feature_1', pa.float32()),
    ('feature_2', pa.float32()),
    ('feature_3', pa.float32()),
    ('feature_4', pa.float32()),
    ('feature_0', pa.float32()),
    ('class', pa.int32()),
])

df = pq.read_table(source="./test.parquet", use_threads=True,schema=schema)

타입을 정해서 읽을 경우, 용량이 절반 정도 줄어든 것을 알 수 있음.

import sys  
print(sys.getsizeof(df))                # 48686
print(sys.getsizeof(df.to_pandas()))    # 48144

Filtering

데이터 필터를 하고, 특정 컬럼만 뽑기

df = pq.read_table(source="./test.parquet", 
use_threads=True,
filters=[('class','=',0)],
columns=['feature_0','feature_1','class'])

특정 인덱스만 뽑기

df.take([0,1,10]).to_pandas()

 

행 개수 세기

df.num_rows
# 503

저장

parquet으로 저장하기

X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
    path="./test2", 
    engine='pyarrow',
    compression='snappy'
    # partition_cols=['class'],
    # row_group_size=100,
    )

파티션별로 저장하기

class 별로 저장이 됨

기본적으로 partition 되는 개수는 제한이 되어 있는 것으로 확인됨.

 

/test2/class==1

/test2/class==0

라는 폴더가 생기고 그 안에 parquet 파일들로 이루어짐

X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
    path="./test2", 
    engine='pyarrow',
    compression='snappy'
    partition_cols=['class'],
    # row_group_size=100,
    )

row group size별로 저장하기

한 parquet 안에 row group size 별로 나눠서 저장할 수 있어서 특정 row group size만 빠르게 불러올 수 있음.

X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
    path="./test2", 
    engine='pyarrow',
    compression='snappy'
    # partition_cols=['class'],
    row_group_size=100,
    )

 

 

파티션 및 row group size 정하기

- 실제로 테스트는 안해봤지만, 파티션과 row group size 가 동시에 될 것으로 기대함

X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
    path="./test2", 
    engine='pyarrow',
    compression='snappy',
    partition_cols=['class'],
    row_group_size=100,
    )

 

Table

Tabel) 정의

import pyarrow as pa
t = pa.table([
    pa.array(['a','a','b','b','c']),
    pa.array([1,2,3,4,5])
],names=["keys","values"])

Table) 컬럼 제거

dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False)
pq_table = dataset.read() # pyarrow.Table
pq_table.remove_column(0)

Table) GROUP BY 를 통한 집계(aggregate) -1

result= t.group_by("keys").aggregate([
    ("values","sum"),
    ("values","max")
])
result

Table) GROUP BY 를 통한 집계(aggregate) -2

pq_table.group_by("class").aggregate([("class","count")]).to_pandas()
pq_table.group_by("class").aggregate([("class","count"),("feature_10","mean")]).to_pandas()

Table) GROUP BY 를 통한 집계(aggregate) null 값 처리 -3

import pyarrow.compute as pc 
table_with_nulls = pa.table([
    pa.array(['a','a','a']),
    pa.array([1,None ,None])
], names = ['keys','values'])
result= table_with_nulls.group_by(["keys"]).aggregate(
    [
        ("values","count",pc.CountOptions(mode="all")),
        ("values","count",pc.CountOptions(mode='only_valid'))
    ]
)

Table) pandas로 변환하기

result.to_pandas()

Table) Join

import pyarrow as pa

table1 = pa.table({'id': [1, 2, 3],
                   'year': [2020, 2022, 2019]})

table2 = pa.table({'id': [3, 4],
                   'n_legs': [5, 100],
                   'animal': ["Brittle stars", "Centipede"]})

joined_table = table1.join(table2, keys="id")
table1.join(table2, keys='id', join_type="full outer")

Table) Append Column

table2_withyear = table2.append_column("year", pa.array([2019, 2022]))

Table) Join two keys

result = table1.join(table2_withyear, keys=["id", "year"])

Table) Filter

import pyarrow.compute as pc
even_filter = (
    pc.bit_wise_and(
        pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                  'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
table.filter(even_filter)
table.filter(~even_filter)

 

dataset

dataset 정의

dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False)

dataset 파일 확인하기

dataset.files

['./test2/class=0/2c56a8feebb849e0be8c767622239ff5-0.parquet',
 './test2/class=1/2c56a8feebb849e0be8c767622239ff5-0.parquet']

dataset  row 개수 세기

result.count_rows()

dataset  metadata

pd.DataFrame(dataset.schema.pandas_metadata['columns'])

import pandas as pd 
meta_info = eval(dataset.schema.metadata[b'pandas'].decode('utf-8').replace("null",'None'))
pd.DataFrame(meta_info['columns'])

 

앞에 10개 행만 추출하기

pq_file = pq.ParquetFile(pq.ParquetDataset('./test2',use_legacy_dataset=False).files[0])
first_ten_rows = next(pq_file.iter_batches(batch_size=10))
first_ten_rows.to_pandas()

dataset.read().slice(0,10).to_pandas()

 

dataset에서 필터하고 데이터 가져오기

dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False,
                            filters=[("feature_10",">",0)])
dataset.files
dataset.read().to_pandas()
dataset.read_pandas().to_pandas()

 

dataset join

import pyarrow.dataset as ds

ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)

# joined_ds = ds1.join(ds2, key="id",join_type="left outer")
result= ds1.join(right_dataset=ds2,keys=['id'],join_type="left outer")

 

dataset 에서 scanner - 1 (컬럼 선택)

result.scanner(columns=["id",'year']).to_table().to_pandas()

 

dataset 에서 scanner - 2 (행필터)

result.scanner(filter=ds.field("year") > 2020).to_table().to_pandas()

dataset 에서 scanner - 3 (변환)

result.scanner(columns={"n_legs_unit" : ds.field("n_legs")}).to_table().to_pandas()

 

dataset 에서 to_table (필터)

expression = ds.field("is_fraud") == 1
dataset.to_table(filter=expression).to_pandas().head()

dataset 에서 to_table (컬럼 변환)

projection = dict(
    a= ds.field("is_fraud"),
    a_as_str = ds.field("is_fraud").cast("string")
)
dataset.to_table(columns=projection).to_pandas().head()

 

 

dataset에서 pandas 로 변환하기

dataset.to_table().to_pandas()

 

 

 

728x90