Multithreaded Producer-Consumer File Writing Patterns in PySpark
In large-scale PySpark applications, file write operations block the main processing thread, wasting valuable computation time as the CPU sits idle waiting for I/O to complete. The most obvious fix would be AsyncIO, but it doesn't work with PySpark's distributed JVM architecture. One solution is to implement the producer-consumer design pattern with multithreading, which gave us a 20-30% performance improvement by running I/O and computation concurrently.
The Problem: Blocking File Writes
In traditional PySpark workflows, file write operations block the main processing thread, creating a bottleneck that wastes valuable computational resources:
# Traditional synchronous approach
def run_simulation(self):
    # Process data
    simulation_table = self.create_simulation_data()

    # This blocks everything else
    simulation_table.data.write.parquet(output_path)  # Blocks for minutes

    # More processing (delayed until write completes)
    transformed_table = self.transform_data(simulation_table)
    transformed_table.data.write.parquet(another_path)  # Blocks again
This sequential approach is particularly problematic when running large-scale simulations, where multiple large output tables are generated. Each write operation can take several minutes, during which the main thread sits idle instead of performing the next computation.
Why AsyncIO Doesn't Work with PySpark
Before diving into the solution, it's important to understand why the obvious choice, asyncio, isn't viable for PySpark:
JVM Execution: PySpark operations execute on the JVM through Py4J, which doesn't integrate with Python's event loop. The driver communicates with executors through blocking RPC calls that can't be made asynchronous.
Distributed Nature: Spark operations span multiple executors across a cluster, making async coordination complex. Each DataFrame operation involves coordinating hundreds of tasks across dozens of worker nodes.
Blocking Operations: Core Spark operations like df.write.parquet() are inherently blocking at the driver level, even though the actual work happens on executors. The driver must wait for all tasks to complete before proceeding.
Threading Model: Spark's own internal threading model conflicts with AsyncIO's single-threaded event loop approach. Spark uses its own thread pools for task scheduling and doesn't expose async interfaces.
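To make the first point concrete, here is a minimal sketch (paths and function names are illustrative, not taken from the original code) of what happens if you wrap a Spark write in a coroutine: the Py4J call blocks the event loop's single thread, so awaiting it buys no concurrency at all.

import asyncio

async def write_output(df, path):
    # df.write.parquet() is a blocking Py4J call into the JVM driver.
    # The event loop is stuck here until every Spark task finishes,
    # so the "await" above it never yields to other coroutines.
    df.write.parquet(path, mode="overwrite")

async def main(df):
    # Both writes still run one after the other, because each one
    # blocks the only event-loop thread while it executes.
    await asyncio.gather(
        write_output(df, "/tmp/out_a.parquet"),
        write_output(df, "/tmp/out_b.parquet"),
    )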
The Solution: Producer-Consumer Threading Pattern
Instead of AsyncIO, we implement a producer-consumer pattern using Python's built-in threading module to handle file writes in the background while the main thread continues processing.
Architecture Overview
from queue import Queue, Empty
from threading import Thread, Event, current_thread
from time import perf_counter as perf
import logging

logger = logging.getLogger(__name__)

class WriteThreadManager:
    """Manage multiple background write operations using producer-consumer pattern"""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.write_queue = Queue()
        self.workers = []
        self.active_writes = {}
        self.completed_writes = []
        self.failed_writes = []
        self._shutdown = False

        # Start worker threads
        for i in range(num_workers):
            worker = Thread(target=self._write_file, name=f"write_worker_{i}")
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
The Worker Thread Implementation
The heart of the system is the worker thread that consumes write tasks from a shared queue:
def _write_file(self):
    """Worker thread that consumes write tasks from the queue"""
    while not self._shutdown:
        try:
            # Get task with timeout to check shutdown condition
            task = self.write_queue.get(timeout=1.0)
            if task is None:  # Poison pill for shutdown
                self.write_queue.task_done()  # keep queue.join() accounting balanced
                break

            table, start_time = task
            event = self.active_writes.get(table.name)
            try:
                logger.info(f"Worker {current_thread().name} starting write: {table.name}")

                # Perform the actual write operation
                spark_df = table.data
                if table.partition_column:
                    spark_df.write.partitionBy(table.partition_column).parquet(
                        f"{table.path}.parquet", mode="overwrite"
                    )
                else:
                    spark_df.write.parquet(f"{table.path}.parquet", mode="overwrite")

                self.completed_writes.append(table.name)
            except Exception as e:
                logger.error(f"Write failed for {table.name}: {str(e)}")
                self.failed_writes.append((table.name, e))
            finally:
                if event:
                    event.set()
                self.write_queue.task_done()
        except Empty:
            continue
Non-Blocking Write Submission
The main thread submits writes without blocking, allowing computation to continue immediately:
def submit_write(self, table: OutputTable) -> None:
    """Submit OutputTable for background writing to queue"""
    start_time = perf()

    # Create an event to track this specific write
    event = Event()
    self.active_writes[table.name] = event

    # Add task to queue (non-blocking)
    self.write_queue.put((table, start_time))
    logger.info(f"Submitted {table.name} to write queue (queue size: {self.write_queue.qsize()})")
The Critical Challenge: DataFrame Lazy Evaluation
Here's where most implementations fail: PySpark DataFrames are lazy by default. Even when submitted to background threads, they retain their computation graph, causing background workers to compete with the main thread for Spark executors.
The Problem with Unmaterialised DataFrames
# This looks like it should work, but doesn't
df = spark.read.parquet("input.parquet")
transformed_df = df.withColumn("new_col", F.col("old_col") * 2)
# Background thread still needs to recompute the entire graph
background_writer.submit_write(transformed_df) # Still blocks executors
When background threads attempt to write unmaterialised DataFrames, Spark must:
- Recompute the entire computation graph
- Apply all transformations from scratch
- Perform joins, aggregations, and column operations
- Compete with the main thread for executor resources
This defeats the purpose of background threading, often making performance worse due to resource contention.
The Solution: Force Materialisation
Before submitting to background threads, we must force complete materialisation:
def write_Table(self, table: OutputTable) -> None:
    """Write a table using the write manager with proper materialisation"""
    # Force complete materialisation
    table.data = table.data.cache()
    table.data.take(1)  # Fast trigger (faster than count())

    # Now submit to background writer (truly non-blocking)
    self.write_manager.submit_write(table)
Why take(1) instead of count()?
In our production system, count() operations were taking 20+ minutes on large DataFrames because they scan every partition and every row. Alternative triggering methods offer much better performance:
- count(): Scans every partition and row (20+ minutes)
- take(1): Only processes enough partitions to get 1 row (seconds)
- first(): Stops immediately after finding the first row (fastest)
- isEmpty(): Minimal evaluation for emptiness check
All of these trigger caching, but take(1) provides the best balance of speed and reliability.
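As a quick illustration (a sketch, not the article's benchmark; "input.parquet" is a placeholder), the trigger is simply whichever action you run after cache(), and the relative costs below follow the list above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("input.parquet").cache()

df.count()    # Scans every partition and row -- most expensive trigger
df.take(1)    # Reads just enough partitions to return one row
df.first()    # Roughly take(1)[0]; stops after the first row
df.isEmpty()  # Cheapest check; only answers whether any row exists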
What Happens After Materialisation
Once properly materialised, background writes become truly non-blocking:
# Before materialisation: Background thread recomputes everything
df.groupBy("simulation_id").agg(F.sum("generation")).write.parquet(path)
# After materialisation: Background thread only does I/O
cached_df.write.parquet(path)
Complete Implementation Example
Here's how the pieces fit together in a complete example:
from pyspark.sql import functions as F

class SimulationProcessor:
    def __init__(self, spark_session, write_manager: WriteThreadManager):
        self.spark = spark_session
        self.write_manager = write_manager

    def run_simulation(self):
        # Create first dataset
        simulation_table = self.create_simulation_data()

        # Submit to background (non-blocking after materialisation)
        self.background_write(simulation_table)

        # Continue processing immediately while write happens in background
        transformed_table = OutputTable(
            name='TransformedData',
            data=simulation_table.data.withColumn(
                'adjusted_generation', F.col('generation') * 2
            ),
            path=f"{self.output_path}/transformed"  # worker appends ".parquet"
        )

        # Submit second write to background
        self.background_write(transformed_table)

        # More processing while both writes happen in parallel
        another_table = self.create_another_dataset(transformed_table)
        self.background_write(another_table)

        # Only synchronize when all processing is complete
        self.write_manager.wait_for_completion()

    def background_write(self, table: OutputTable) -> None:
        """Submit write to background thread pool (non-blocking)"""
        # Materialise first so the worker thread only performs I/O
        table.data = table.data.cache()
        table.data.take(1)
        self.write_manager.submit_write(table)
Synchronization and Error Handling
The system provides flexible synchronization options:
def wait_for_completion(self, timeout: float = None):
    """Wait for all background writes to complete"""
    logger.info(f"Waiting for queue to empty (current size: {self.write_queue.qsize()})...")

    # Wait for all tasks in queue to be processed
    self.write_queue.join()

    # Wait for all active writes to complete
    for table_name, event in list(self.active_writes.items()):
        if event.wait(timeout=timeout):
            logger.info(f"Write completed: {table_name}")
        else:
            logger.warning(f"Write timeout for {table_name}")

    if self.failed_writes:
        logger.error(f"Failed writes: {[name for name, _ in self.failed_writes]}")
Resource Management and Spark Executors
A common misconception is that background threads need separate Spark contexts. In reality, all threads share the same Spark context and executor pool, which works through Spark's natural task scheduling:
How Resource Sharing Works
# Main thread: Creates 1000 computation tasks
main_tasks = simulation_rdd.map(heavy_computation) # Uses ~80 cores
# Background threads: Create 50 write tasks
write_tasks = df.write.parquet() # Uses ~20 cores
# Spark automatically distributes 1050 total tasks across 100 available cores
Spark's scheduler automatically balances tasks between main computation and background writes. The key is ensuring background writes are truly materialised so they only compete for I/O resources, not computation resources.
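If you need tighter control over how computation and write jobs share executors, Spark's FAIR scheduler pools are one option. This is not part of the implementation above, just a sketch of the standard mechanism; it assumes spark.scheduler.mode=FAIR is set in the Spark config, and the pool name is illustrative:

# Tag jobs submitted from the write-worker thread with a separate pool,
# so the scheduler balances them against the main thread's pool fairly.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "background_writes")
cached_df.write.parquet(output_path, mode="overwrite")  # runs in the "background_writes" pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", None)  # reset for this thread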
What Executors Do During Materialised Writes
Even with fully materialised DataFrames, Spark executors still need some resources for write operations:
- Data serialization: Convert cached data to Parquet/CSV format
- Compression: Apply Snappy/Gzip compression algorithms
- Partitioning: Distribute data across multiple output files
- Network I/O: Send data to Azure/S3 storage
- File coordination: Manage concurrent writes to avoid conflicts
Resource usage comparison:
Unmaterialised DataFrame writes:
- CPU: 80-90% (recomputing entire computation graph)
- Memory: High (intermediate results, joins, aggregations)
- Network: High (Azure bandwidth)
Materialised DataFrame writes:
- CPU: 10-20% (serialization, compression)
- Memory: Minimal (streaming writes, small buffers)
- Network: High (Azure bandwidth)
This is why properly materialised background writes show significant performance improvement—you're moving from competing for expensive computation cycles to competing for much cheaper I/O operations.
Performance Results
Our production implementation shows significant improvements when DataFrame materialisation is handled correctly:
# Synchronous approach (baseline)
Total execution time: 866.73 seconds
# Background threading (properly materialised)
Total execution time: 620.91 seconds
Performance improvement: 28.4% faster
# Background threading (unmaterialised DataFrames)
Total execution time: 979.82 seconds
Performance degradation: 13.0% slower than baseline
The difference between success and failure is proper DataFrame materialisation. Without it, background threading actually hurts performance due to executor resource contention.
Best Practices and Common Pitfalls
Do This
Materialise DataFrames before background writing: Always use df.cache().take(1) before submitting to background threads.
Use appropriate worker counts: Too many workers can cause resource contention.
Implement graceful shutdown: Use poison pills and thread joins to ensure clean shutdown.
def shutdown(self):
    self._shutdown = True

    # Send poison pills to all workers
    for _ in range(self.num_workers):
        self.write_queue.put(None)

    # Wait for workers to finish
    for worker in self.workers:
        worker.join()
Track write status: Monitor queued, active, and completed writes for debugging and performance analysis.
Handle failures gracefully: Log and track failed writes, implement retry mechanisms where appropriate.
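A minimal status helper covering the last two points might look like the sketch below; get_status is a hypothetical addition to WriteThreadManager, not part of the original implementation:

def get_status(self) -> dict:
    """Hypothetical helper: snapshot of queued, active, completed, and failed writes."""
    return {
        "queued": self.write_queue.qsize(),
        "active": [name for name, event in self.active_writes.items() if not event.is_set()],
        "completed": list(self.completed_writes),
        "failed": [name for name, _ in self.failed_writes],
    }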
Avoid This
Don't use separate Spark contexts: This adds unnecessary complexity and resource overhead without benefits.
Don't skip materialisation: Unmaterialised DataFrames will compete for executors and degrade performance.
Don't use count() for triggering: Too expensive; use take(1) or first() instead.
Don't ignore resource limits: Monitor executor usage to ensure background writes don't overwhelm the cluster.
Don't forget error handling: Background failures can be silent; implement proper logging and monitoring.
When to Use This Pattern
This approach works best for:
I/O Heavy Workloads: Applications that generate many large output files where write time dominates total runtime.
Independent Outputs: When output generation doesn't depend on previous writes completing.
Sufficient Resources: Clusters with enough executors to handle both computation and I/O concurrently (typically 100+ cores).
When Not to Use
Limited Resources: Small clusters where background writes would starve main computation.
Sequential Dependencies: Workflows where each step depends on the previous write completing.
Simple Workloads: Applications with minimal I/O where the threading overhead isn't worth the complexity.
Conclusion
Background threading for PySpark file writes provides a practical solution when AsyncIO isn't available. The key insights for success are:
AsyncIO limitations: PySpark's JVM-based architecture makes traditional async patterns impossible.
Producer-consumer threading: Provides an effective alternative that integrates well with existing PySpark workflows.
DataFrame materialisation is critical: The difference between success and failure lies in proper materialisation before background processing.
Resource awareness: Understanding how Spark manages executor resources prevents contention and ensures true parallelization.
When implemented correctly with proper DataFrame materialisation, this approach delivers 20-30% performance improvements by allowing computation and I/O to occur concurrently rather than sequentially.
For teams building large-scale PySpark applications where I/O latency is a bottleneck, background threading provides a robust solution that scales well and integrates seamlessly with existing Databricks and Spark infrastructure.