Python) pyarrow 사용 방법
2022. 12. 21. 22:21ㆍ분석 Python/구현 및 자료
2022.11.26 - [분석 Python/구현 및 자료] - Python) pyarrow 다뤄보기
2022.12.21 - [분석 Python/구현 및 자료] - Python) pyarrow 사용 방법
라이브러리
import numpy as np
import pyarrow.parquet as pq
import pandas as pd
from sklearn.datasets import make_classification
데이터 생성
X_pd= pd.DataFrame(X,columns =[ f"feature_{i}" for i in range(X.shape[1])])
X_pd['class'] =y
파티션별로 저장
X_pd.to_parquet(
path="./test.parquet",
engine='pyarrow',
compression='snappy',
partition_cols=['class']
)
pyarrow 로 parquet 파일 읽기
read_table
read_table을 하는 경우 Table로 나옴
df = pq.read_table(source="./test.parquet", use_threads=True)
파일 크기 비교
판다스와 parquet 파일 크기 비교
import sys
print(sys.getsizeof(df)) # 89348
print(sys.getsizeof(df.to_pandas())) # 82268
스키마를 정해서 읽기
import pyarrow as pa
schema = pa.schema([
('feature_1', pa.float32()),
('feature_2', pa.float32()),
('feature_3', pa.float32()),
('feature_4', pa.float32()),
('feature_0', pa.float32()),
('class', pa.int32()),
])
df = pq.read_table(source="./test.parquet", use_threads=True,schema=schema)
타입을 정해서 읽을 경우, 용량이 절반 정도 줄어든 것을 알 수 있음.
import sys
print(sys.getsizeof(df)) # 48686
print(sys.getsizeof(df.to_pandas())) # 48144
Filtering
데이터 필터를 하고, 특정 컬럼만 뽑기
df = pq.read_table(source="./test.parquet",
use_threads=True,
filters=[('class','=',0)],
columns=['feature_0','feature_1','class'])
특정 인덱스만 뽑기
df.take([0,1,10]).to_pandas()
행 개수 세기
df.num_rows
# 503
저장
parquet으로 저장하기
X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
path="./test2",
engine='pyarrow',
compression='snappy'
# partition_cols=['class'],
# row_group_size=100,
)
파티션별로 저장하기
class 별로 저장이 됨
기본적으로 partition 되는 개수는 제한이 되어 있는 것으로 확인됨.
/test2/class==1
/test2/class==0
라는 폴더가 생기고 그 안에 parquet 파일들로 이루어짐
X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
path="./test2",
engine='pyarrow',
compression='snappy'
partition_cols=['class'],
# row_group_size=100,
)
row group size별로 저장하기
한 parquet 안에 row group size 별로 나눠서 저장할 수 있어서 특정 row group size만 빠르게 불러올 수 있음.
X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
path="./test2",
engine='pyarrow',
compression='snappy'
# partition_cols=['class'],
row_group_size=100,
)
파티션 및 row group size 정하기
- 실제로 테스트는 안해봤지만, 파티션과 row group size 가 동시에 될 것으로 기대함
X , y = make_classification(n_samples=1000, n_features=5,n_classes=2)
X_pd= pd.DataFrame(X,columns =[ f"feature_1{i}" for i in range(X.shape[1])])
X_pd['class'] =y
X_pd.to_parquet(
path="./test2",
engine='pyarrow',
compression='snappy',
partition_cols=['class'],
row_group_size=100,
)
Table
Tabel) 정의
import pyarrow as pa
t = pa.table([
pa.array(['a','a','b','b','c']),
pa.array([1,2,3,4,5])
],names=["keys","values"])
Table) 컬럼 제거
dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False)
pq_table = dataset.read() # pyarrow.Table
pq_table.remove_column(0)
Table) GROUP BY 를 통한 집계(aggregate) -1
result= t.group_by("keys").aggregate([
("values","sum"),
("values","max")
])
result
Table) GROUP BY 를 통한 집계(aggregate) -2
pq_table.group_by("class").aggregate([("class","count")]).to_pandas()
pq_table.group_by("class").aggregate([("class","count"),("feature_10","mean")]).to_pandas()
Table) GROUP BY 를 통한 집계(aggregate) null 값 처리 -3
import pyarrow.compute as pc
table_with_nulls = pa.table([
pa.array(['a','a','a']),
pa.array([1,None ,None])
], names = ['keys','values'])
result= table_with_nulls.group_by(["keys"]).aggregate(
[
("values","count",pc.CountOptions(mode="all")),
("values","count",pc.CountOptions(mode='only_valid'))
]
)
Table) pandas로 변환하기
result.to_pandas()
Table) Join
import pyarrow as pa
table1 = pa.table({'id': [1, 2, 3],
'year': [2020, 2022, 2019]})
table2 = pa.table({'id': [3, 4],
'n_legs': [5, 100],
'animal': ["Brittle stars", "Centipede"]})
joined_table = table1.join(table2, keys="id")
table1.join(table2, keys='id', join_type="full outer")
Table) Append Column
table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
Table) Join two keys
result = table1.join(table2_withyear, keys=["id", "year"])
Table) Filter
import pyarrow.compute as pc
even_filter = (
pc.bit_wise_and(
pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
table.filter(even_filter)
table.filter(~even_filter)
dataset
dataset 정의
dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False)
dataset 파일 확인하기
dataset.files
['./test2/class=0/2c56a8feebb849e0be8c767622239ff5-0.parquet',
'./test2/class=1/2c56a8feebb849e0be8c767622239ff5-0.parquet']
dataset row 개수 세기
result.count_rows()
dataset metadata
pd.DataFrame(dataset.schema.pandas_metadata['columns'])
import pandas as pd
meta_info = eval(dataset.schema.metadata[b'pandas'].decode('utf-8').replace("null",'None'))
pd.DataFrame(meta_info['columns'])
앞에 10개 행만 추출하기
pq_file = pq.ParquetFile(pq.ParquetDataset('./test2',use_legacy_dataset=False).files[0])
first_ten_rows = next(pq_file.iter_batches(batch_size=10))
first_ten_rows.to_pandas()
dataset.read().slice(0,10).to_pandas()
dataset에서 필터하고 데이터 가져오기
dataset = pq.ParquetDataset("./test2",use_legacy_dataset=False,
filters=[("feature_10",">",0)])
dataset.files
dataset.read().to_pandas()
dataset.read_pandas().to_pandas()
dataset join
import pyarrow.dataset as ds
ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)
# joined_ds = ds1.join(ds2, key="id",join_type="left outer")
result= ds1.join(right_dataset=ds2,keys=['id'],join_type="left outer")
dataset 에서 scanner - 1 (컬럼 선택)
result.scanner(columns=["id",'year']).to_table().to_pandas()
dataset 에서 scanner - 2 (행필터)
result.scanner(filter=ds.field("year") > 2020).to_table().to_pandas()
dataset 에서 scanner - 3 (변환)
result.scanner(columns={"n_legs_unit" : ds.field("n_legs")}).to_table().to_pandas()
dataset 에서 to_table (필터)
expression = ds.field("is_fraud") == 1
dataset.to_table(filter=expression).to_pandas().head()
dataset 에서 to_table (컬럼 변환)
projection = dict(
a= ds.field("is_fraud"),
a_as_str = ds.field("is_fraud").cast("string")
)
dataset.to_table(columns=projection).to_pandas().head()
dataset에서 pandas 로 변환하기
dataset.to_table().to_pandas()
728x90
'분석 Python > 구현 및 자료' 카테고리의 다른 글
python) metaflow 파이썬 스크립트에서 실행해보기 (0) | 2023.02.10 |
---|---|
Python) multiprocessing 코어 수 제한해서 돌리기 (0) | 2023.02.04 |
Python) pyarrow 다뤄보기 (0) | 2022.11.26 |
(진행중) SHAP (Shapley Additive exPlanations) 이해하기 (1) | 2022.11.21 |
[Pandas][꿀팁] string 데이터를 pandas data frame으로 바꾸기 (1) | 2022.09.09 |