Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Duplicate
- Affects Version: 0.14.1
Description
My Spark workloads produce small-to-moderately-sized Parquet files, typically 100-300MB on disk, and I use PyArrow to process these files further.
Surprisingly, I find that similarly-sized Parquet files sometimes take vastly different amounts of memory and time to load using pyarrow.parquet.read_table. For illustration, I've uploaded two such Parquet files to s3://public-parquet-test-data/fast.snappy.parquet and s3://public-parquet-test-data/slow.snappy.parquet.
Both files have about 1.2 million rows and 450 columns and occupy 100-120MB on disk. But when they are loaded by read_table:
- fast.snappy.parquet takes 10-15GB of memory and 5-8s to load
- slow.snappy.parquet takes up to 300GB (!!) of memory and 45-60s to load
Since I have been using the default Snappy compression in all my Spark jobs, it is unlikely that the files differ in their compression levels. That the on-disk sizes are similar also suggests that they are similarly compressed. So the fact that slow.snappy.parquet takes roughly 10-20x the resources to read is very surprising.
My benchmarking code snippet is below. I'd appreciate your help to troubleshoot this matter.
```
from pyarrow.parquet import read_metadata, read_table
from time import time
from tqdm import tqdm

FAST_PARQUET_TMP_PATH = '/tmp/fast.snappy.parquet'
SLOW_PARQUET_TMP_PATH = '/tmp/slow.snappy.parquet'

# Print the footer metadata of the fast file, then time three full reads.
fast_parquet_metadata = read_metadata(FAST_PARQUET_TMP_PATH)
print('Fast Parquet Metadata: {}\n'.format(fast_parquet_metadata))
durations = []
for _ in tqdm(range(3)):
    tic = time()
    tbl = read_table(
        source=FAST_PARQUET_TMP_PATH,
        columns=None,
        use_threads=True,
        metadata=None,
        use_pandas_metadata=False,
        memory_map=False,
        filesystem=None,
        filters=None)
    toc = time()
    durations.append(toc - tic)
print('Fast Parquet READ_TABLE(...) Durations: {}\n'
      .format(', '.join('{:.0f}s'.format(duration) for duration in durations)))

# Repeat the same measurement for the slow file.
slow_parquet_metadata = read_metadata(SLOW_PARQUET_TMP_PATH)
print('Slow Parquet Metadata: {}\n'.format(slow_parquet_metadata))
durations = []
for _ in tqdm(range(3)):
    tic = time()
    tbl = read_table(
        source=SLOW_PARQUET_TMP_PATH,
        columns=None,
        use_threads=True,
        metadata=None,
        use_pandas_metadata=False,
        memory_map=False,
        filesystem=None,
        filters=None)
    toc = time()
    durations.append(toc - tic)
print('Slow Parquet READ_TABLE(...) Durations: {}\n'
      .format(', '.join('{:.0f}s'.format(duration) for duration in durations)))
```