Metadata-Version: 2.1
Name: repartipy
Version: 0.1.8
Summary: Helper for handling PySpark DataFrame partition size 📑🎛️
Home-page: https://github.com/sakjung/repartipy
License: Apache-2.0
Keywords: apachespark,spark,pyspark
Author: sakjung
Author-email: ssangyu123@gmail.com
Requires-Python: >=3.7
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: packaging (>=23,<24)
Requires-Dist: typing-extensions (>=4.5,<5.0)
Description-Content-Type: text/markdown

# RepartiPy

RepartiPy helps you handle PySpark DataFrame partition size with precision.

## Possible Use Cases
- Repartition your DataFrame precisely, **without knowing the whole DataFrame size** (i.e. `Dynamic Repartition`)
- Estimate your DataFrame size **with more accuracy**

## Why RepartiPy
 
Although Spark's [SizeEstimator](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/util/SizeEstimator.html) can be used to estimate the size of a DataFrame, it is not always accurate.
RepartiPy instead leverages **Spark's execution plan statistics** to provide a more reliable estimate.
It offers two approaches:

- `repartipy.SizeEstimator`
- `repartipy.SamplingSizeEstimator`

### repartipy.SizeEstimator
Recommended when your executor memory is sufficient to cache the whole DataFrame.
`SizeEstimator` simply caches the whole DataFrame in memory and extracts the execution plan statistics.
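
The idea can be sketched in a few lines. Note that this is a minimal illustration, not RepartiPy's actual code; it touches PySpark internals (`df._jdf`), which are not a public API and may vary across Spark versions.

```python
# Minimal sketch of size estimation via execution plan statistics.
# NOTE: relies on PySpark internals (not a public API); this illustrates
# the idea only and is NOT RepartiPy's actual implementation.
df.cache()
df.count()  # materialize the cache so the plan statistics reflect real data

size_in_bytes = (
    df._jdf.queryExecution()  # Java QueryExecution behind the DataFrame
    .optimizedPlan()          # optimized logical plan
    .stats()                  # plan statistics
    .sizeInBytes()            # estimated size as a Scala BigInt
    .longValue()
)
df.unpersist()
```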

### repartipy.SamplingSizeEstimator
Recommended when your executor memory is ***NOT*** sufficient to cache the whole DataFrame.
`SamplingSizeEstimator` uses a 'disk write and re-read (HDFS)' approach behind the scenes, for two reasons:

1. It prevents a double read from the source (e.g. S3), which might be inefficient -> better performance
2. It reduces partition skew by deliberately reading the data again (leveraging [spark.sql.files.maxPartitionBytes](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options)) -> better sampling results

Therefore, **your cluster must have HDFS configured and enough disk space.**

Due to sampling, results may be less accurate than those of `SizeEstimator`.
If you need more accurate results, tune the `sample_count` option accordingly.
This approach is also slower than `SizeEstimator`, as `SamplingSizeEstimator` requires disk I/O and additional logic.
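
Conceptually, the approach boils down to writing the DataFrame out once, re-reading it so partitions come back evenly sized, estimating a sample, and extrapolating. The sketch below illustrates that idea only; the staging path and `estimate_size` helper are hypothetical, and RepartiPy's actual implementation (which samples `sample_count` partitions) differs.

```python
# Conceptual sketch of the sampling approach -- NOT RepartiPy's actual
# implementation. The staging path below is hypothetical.
staging_path = "hdfs:///tmp/repartipy_staging"
df.write.mode("overwrite").parquet(staging_path)

# Re-reading lets spark.sql.files.maxPartitionBytes produce evenly sized
# partitions, which improves the quality of the sample.
reread_df = spark.read.parquet(staging_path)

def estimate_size(sdf):
    """Estimate size via execution plan statistics (see sketch above)."""
    sdf.cache()
    sdf.count()  # materialize the cache so statistics are populated
    size = sdf._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
    sdf.unpersist()
    return size.longValue()

# Estimate a fraction of the data, then extrapolate to the full size.
fraction = 0.1
sample_size = estimate_size(reread_df.sample(fraction=fraction))
estimated_total_size = int(sample_size / fraction)
```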

# How To Use
## Setup
```shell
pip install repartipy
```
### Prerequisite
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
input_data = [
        (1, "Seoul"),
        (2, "Busan"),
    ]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])
```

### get_desired_partition_count()
***Calculate ideal number of partitions for a DataFrame*** 

SizeEstimator suggests a `desired_partition_count` such that each partition holds roughly `desired_partition_size_in_bytes` (default: 1 GiB) after repartitioning.
`reproduce()` returns exactly the same `df`, but reproduced internally by the SizeEstimator for better performance.
`SizeEstimator` reproduces `df` from **Memory** (Cache). 
`SamplingSizeEstimator` reproduces `df` from **Disk** (HDFS).
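
Under the hood, the suggested count is essentially the estimated total size divided by the target partition size, rounded up. A quick worked example of that arithmetic (the size figure is hypothetical, and RepartiPy's exact rounding may differ):

```python
import math

one_gib_in_bytes = 1073741824

# Suppose estimate() reports roughly 2.5 GiB (hypothetical figure):
df_size_in_bytes = 2684354560

# 2.5 GiB / 1 GiB per partition, rounded up -> 3 partitions
desired_partition_count = math.ceil(df_size_in_bytes / one_gib_in_bytes)
```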

#### with SizeEstimator
```python
import repartipy

one_gib_in_bytes = 1073741824

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
#### with SamplingSizeEstimator
```python
import repartipy
    
one_gib_in_bytes = 1073741824

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```

### estimate()
***Estimate size of a DataFrame***
#### with SizeEstimator
```python
import repartipy

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
```
#### with SamplingSizeEstimator
```python
import repartipy

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
```
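
The returned value is a plain byte count, so converting it for logging is straightforward, for example:

```python
# Convert the raw byte count into a human-readable figure, e.g. MiB:
one_mib_in_bytes = 1048576
print(f"DataFrame size: {df_size_in_bytes / one_mib_in_bytes:.1f} MiB")
```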

# Benchmark

Overall, RepartiPy introduces a slight performance overhead.
This benchmark compares the **running time of Spark jobs** in the following two cases to give a rough estimate:
- **Static** Repartition (repartition without RepartiPy)
```python
# e.g.
df.repartition(123).write.save("your/write/path")
```
- **Dynamic** Repartition (repartition with RepartiPy)
```python
# e.g.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
```
All other conditions remain the same **except for the use of RepartiPy**.

> **Note**
> 
> Benchmark results provided are for brief reference only, not absolute.
> Actual performance metrics can vary depending on your own circumstances (e.g. your data, your spark code, your cluster resources, ...).

## SizeEstimator
- DataFrame Size ~= 256 MiB (decompressed size)

|              | Static  | Dynamic |
|:------------:|:-------:|:-------:|
| Running Time | 8.5 min | 8.6 min |
 

## SamplingSizeEstimator
- DataFrame Size ~= 241 GiB (decompressed size)

|              | Static | Dynamic |
|:------------:|:------:|:-------:|
| Running Time | 14 min | 16 min  |

