
Introducing Dynamic Batching: Auto-Tuning for Daft Pipelines
I Got Tired of Tuning Batch Sizes, So I Made Them Tune Themselves
by Cory GrinsteadTLDR
Frustrated with manually tuning your data pipelines? Messing with custom batch sizes often leads to performance bottlenecks, out-of-memory errors, and poor progress visibility, so I implemented dynamic batching in Daft to avoid all of that hassle. Inspired by inference servers, this system automatically adapts batch sizes based on real-time execution metrics, latency targets, and workload variations, achieving performance comparable to hand-tuned setups while eliminating the need for user intervention and enhancing overall user experience.
You can enable dynamic batching in daft today with:daft.set_execution_config(enable_dynamic_batching=True)
The Problem: Babysitting Batch Sizes
If you've ever run AI workloads on batch data, you know the drill. You write your pipeline, plug in your model, and hit run. It's slow. Really slow.
So you start tuning...
- •
Batch size 32? Still slow.
- •
Batch size 128? Faster, but now you're OOMing on certain inputs.
- •
Batch size 64? Better, but you know you're leaving performance on the table.
Then you switch models. Or datasets. Or add a preprocessing step. And suddenly your carefully tuned batch size is terrible again. But the worst part isn't just the performance - it's the complete lack of visibility into what's happening.
A Real Example from Our Users
Here's an actual issue reported from a user running a pipeline with OpenAI's API:
1df = df.with_column(2 "classification",3 prompt(4 messages=col("prompts"),5 model="gpt-4",6 )7)
Their complaint:
"The pipeline just hangs. 0 rows out. No progress, no metrics, nothing. I can't tell if it's working or broken."
What was actually happening?
The source was creating large batches, and the UDF operator with default batch settings tried to process a massive chunk at once. That single batch could take 5 minutes or even 20 minutes to complete.
During that time: no progress updates, no metrics, no indication anything was working. Just silence. The pipeline wasn't hung - it was processing a huge batch that would eventually complete. But from the user's perspective, it looked completely broken.
The workaround?
Manually add .into_batches(100) to force smaller batch sizes. Now they get progress updates every 100 rows instead of waiting for the entire massive batch.
This is exactly backwards. Users shouldn't need to understand batch size optimization and manually tune it just to see if their pipeline is working. The system should handle this automatically.
The Core Issue
As a user of Daft (and as someone building it), this drove me crazy. We had UDFs with adjustable batching, but you had to manually fiddle with the batch sizes to get reasonable pipeline performance. And it always varied based on the model, the UDF, the input data, or whatever else was in the mix.
I found it deeply annoying that I, as a user, needed to think about tuning this knob at all.
Sometimes users legitimately care about batch sizes. Their model might expect exact dimensions, or they have specific memory constraints. But most of the time, users care about their data and models, not finding the magic number that makes their pipeline fast. Batch size should be an optimization the system handles, not a required configuration parameter.
Why Manual Tuning Doesn't Scale
The problem is that AI workloads are fundamentally different from traditional data processing:
- •
Variable compute costs: Processing 100 images through a vision model might take 500ms or 5 seconds depending on resolution, model complexity, and hardware.
- •
Memory constraints: Larger batches are more efficient but risk OOM errors. The optimal size depends on your specific model and hardware.
- •
Pipeline dynamics: Your AI operation is just one step in a larger pipeline. A poorly tuned batch size creates a bottleneck that slows down everything downstream.
- •
Workload diversity: Different models need different batch sizes. Same model on different data needs different batch sizes. You can't pick one number and call it done.
The traditional approach makes users experts in batch optimization: pick a size, benchmark, adjust, repeat. This works for ML engineers running isolated experiments, but it's poor UX for data engineers who just want to process data efficiently.
The Insight: Batch Sizes Should Adapt to Workloads
The insight didn't come from data processing - it came from watching how inference servers work.
Modern inference servers like vLLM and TensorRT-LLM use dynamic batching to maximize GPU utilization. They don't ask users to configure batch sizes. Instead, they continuously adjust batching based on incoming request patterns, latency targets, and available memory. The server observes what's happening and adapts in real-time.
This is crucial for serving workloads where request patterns are unpredictable. You can't pre-configure the perfect batch size because you don't know if you'll get 10 requests per second or 1000. The system needs to figure it out dynamically.
I realized: data pipelines have the exact same problem.
When you're processing batch data through AI models, you face the same challenges inference servers solve:
- •
Variable compute costs depending on inputs
- •
Memory constraints that change based on batch composition
- •
Different models with different optimal batch sizes
- •
Workload characteristics that change over time
The difference is that inference servers figured this out years ago, while data processing systems still make users manually configure batch sizes.
So the insight was simple: what if we borrowed dynamic batching from inference servers and applied it to data pipelines?
The system has all the information it needs - every batch execution gives us timing data, we know our latency targets, we can measure memory pressure. Why should users manually tune based on this information when the system can adapt automatically?
This isn't revolutionary. It's just applying a proven technique from one domain (model serving) to another (data processing) where it's equally applicable but largely missing.
How Dynamic Batching Works
So how do we actually implement this in a data processing system? Turns out, we already had most of the pieces in place.
Our execution model uses buffers between operators and configurable "morsel sizes" (our term for batch size requirements) per operator.
A typical pipeline looks like:

Typical Pipeline: Source > Buffer > Project > Buffer > Sink
The source operator has a strict requirement of 1024 rows. It always sends chunks of exactly that size. The project operator is flexible - it can handle anywhere from 1 to 128,000 rows. The sink is unbounded - it takes whatever it gets.
Each buffer sits between operators and accumulates rows. When the downstream operator is ready, it checks if the buffer has enough data to meet its requirements:
- •
Below lower bound: Wait for more data
- •
Within range: Pull all available data as one batch
- •
Above upper bound: Pull exactly upper_bound rows, leave the rest
This already worked well for static configuration. The challenge was making it dynamic.
The Key Components
The Buffer
Our RowBasedBuffer already tracked lower and upper bounds. We just added update_bounds() so the bounds could change at runtime:
1pub fn update_bounds(&mut self, lower_bound: usize, upper_bound: usize) {2 self.lower_bound = lower_bound;3 self.upper_bound = upper_bound;4}
The Workers
Workers execute batches and record timing information. After each batch completes, they report the execution stats (batch size and duration) to the batch manager:
1let now = Instant::now();2let result = op.execute(morsel.clone(), state, &task_spawner).await;3let elapsed = now.elapsed();45batch_manager.record_execution_stats(6 runtime_stats.clone(),7 mp.len(),8 elapsed,9);
The Dispatcher
The dispatcher coordinates between the input buffer and workers. After sending each batch to a worker, it recalculates the optimal batch size and updates the buffer bounds:
1while let Some(morsel) = receiver.recv().await {2 buffer.push(&morsel);34 while let Some(batch) = buffer.next_batch_if_ready()? {5 send_to_worker(batch).await;67 let new_requirements = batch_manager.calculate_batch_size();8 let (lower, upper) = new_requirements.values();910 buffer.update_bounds(lower, upper);11 }12}
How it Adapts
The dispatcher and workers don't directly communicate. Instead:
- 1.
Workers record: Each time a worker finishes processing a batch, it records the execution stats (batch size, duration) to a shared collection
- 2.
Dispatcher reads: When the dispatcher calls
calculate_batch_size(), it reads the accumulated execution stats - 3.
Strategy calculates: The batching strategy analyzes recent performance and determines new bounds
- 4.
Buffer updates: The dispatcher updates the buffer bounds for the next batch
This happens continuously as the pipeline runs. If batch execution suddenly gets slower (maybe you hit more complex data), the strategy notices and reduces batch sizes. If execution speeds up, batch sizes increase to maximize throughput.
Operator Independence
Each operator maintains its own dynamic batching state. There's no global coordination. The project operator's batching decisions don't directly affect the source or sink.
However, there's implicit coordination through backpressure. If the project operator decides it wants smaller batches, it pulls less frequently from its input buffer. The buffer fills up. Eventually, the source operator slows down because its output buffer is full.
This creates natural flow control without explicit coordination.
Strict Requirements as Barriers
Operators with strict requirements (like the source with strict 1024) don't participate in dynamic batching - they always send fixed-size chunks. But this doesn't break the system. The buffers handle the impedance mismatch.
If the source sends 1024-row chunks but the project wants 500-row batches, the buffer accumulates the 1024 rows, then the dispatcher pulls 500 and leaves 524 in the buffer for the next batch.
This means you can mix static and dynamic operators in the same pipeline. The dynamic parts optimize themselves while the static parts provide stable boundaries.
The Algorithm: Latency-Constrained Optimization
The batching strategy uses a latency-constrained binary search algorithm adapted from recent research on LLM inference optimization [1].
The core idea: find the largest batch size that keeps latency within acceptable bounds.
The algorithm maintains a search space of valid batch sizes and adjusts it based on recent performance:
- •
If batches are taking too long (say, averaging 8 seconds when we want 5), contract the search space downward - we're processing too much
- •
If batches finish well under target, expand upward - we have headroom
- •
If we're close to target, tighten the search around the current point
The next batch size is always the midpoint of the search space. This means aggressive exploration when far from optimal, conservative adjustments when close.
For UDF operations, we use more conservative parameters since they tend to be more expensive. For details on the algorithm and parameter tuning, see the original paper.
Why This Works for AI Workloads.
AI operations have highly variable costs. Processing 100 small images might take 500ms. Processing 100 large images might take 5 seconds. Same batch size, completely different latency.
The latency-constrained approach handles this naturally. When the algorithm encounters expensive batches, it sees the latency spike and reduces batch size. When it encounters cheap batches, it sees the low latency and increases batch size.
The system continuously adapts to the actual characteristics of the data flowing through it, not some predetermined configuration.
Results: Before and After
Running the AI benchmarks with dynamic batching enabled showed performance roughly equivalent to the hand-tuned configurations -- within ±2%. This is the desired outcome. The benchmarks had carefully tuned batch sizes, and dynamic batching converged to similar values automatically.
TLDR; Peak throughput on well-tuned pipelines was roughly equivalent - if you'd already found that batch size 128 was optimal, the algorithm converges to something similar.
The value isn't making optimally-tuned pipelines faster, it's making untuned pipelines not require tuning in the first place.
The Real Win: User Experience
The biggest improvement shows up in metrics that matter for actually using the system:
Time to first output: Massively improved. Instead of waiting for a huge batch to complete before seeing any results, dynamic batching starts with small batches and scales up. For the OpenAI API case from the intro, instead of waiting 20 minutes for the first progress update, users now see outputs typically within the first 10 seconds.
Time to first log/metric: Similarly improved. Progress bars update at least every 5 seconds (meaning they update every 5 seconds or more frequently)
This is the difference between "is my pipeline broken?" and "oh good, it's working."
The Real Metric: Engineering Time Saved
The actual value isn't in microseconds of throughput improvement. It's in the hours you don't spend:
- •
Profiling your pipeline to find the bottleneck
- •
Tweaking batch sizes and re-running
- •
Debugging why your pipeline appears hung
- •
Re-tuning when you change models or data
For a data engineer who just wants to run inference over their data, going from "spend 30 minutes tuning batch sizes" to "it just works" is the entire point.
The system performs about as well as a well-tuned manual configuration, but you get there immediately instead of after trial and error.
Try it yourself
Dynamic batching is behind a feature flag. Enable it in the execution config:
1daft.set_execution_config(enable_dynamic_batching=True)
I'm working on additional batching strategies for specific workloads - more to come soon.