Back to Blog
February 6, 2026
Tuning Daft's Distributed UDFs: Lessons from ByteDance

Tuning Daft's Distributed UDFs: Lessons from ByteDance

by ByteDance Volcano Engine

Translated and adapted from an article by the Volcengine LAS Team (火山引擎LAS团队) at ByteDance.
Original article (Chinese).


TLDR

When running Daft UDFs on Ray, three parameters interact in non-obvious ways: partition count, batch_size, and concurrency. If they're misaligned you get data skew, idle workers, or both.

The core insight is that data flows through a pipeline: partitions become tasks, tasks get assigned to workers, then each task's data is sliced into morsels by batch_size and round-robined to local UDFActors. So you need enough partitions to cover all workers, and a small enough batch_size that each task produces at least as many morsels as local actors.

The practical formula: (concurrency / num_workers) * batch_size = rows_per_task. Hit that and every actor stays busy with even data distribution. into_batches() can help rebalance when partitions are uneven or too few, but adds overhead if you don't actually need it.

In AI data processing workflows, multimodal data — text, images, point clouds, audio — has become a core concern. Daft, as a lightweight and efficient distributed multimodal data processing framework, makes User Defined Functions (UDFs) the key mechanism for supporting these complex operations.

Within the Daft ecosystem, UDFs power custom multimodal processing such as image feature extraction, text cleaning, and cross-modal data joins. The framework's low-barrier design means developers don't need to understand the distributed internals — they simply wrap their business logic in a concise function and plug it into a Daft pipeline.

Daft offers several UDF variants to suit different development patterns and use cases: Batch UDFs, Row-wise UDFs, and Async UDFs, all with seamless switching between local debugging and distributed deployment.

However, getting strong performance from Daft UDFs is not plug-and-play. Performance depends heavily on proper parameter tuning — especially two parameters: batch_size (the maximum number of rows per UDF invocation) and concurrency (the number of parallel UDF workers).

These two parameters require an understanding of Daft's runtime mechanics:

  • batch_size controls the maximum data volume per UDF call. Set it too large and you risk out-of-memory errors; too small and you pay excessive function-call overhead.

  • concurrency controls how many UDF tasks run in parallel. Too high and you hit resource contention (CPU, GPU, or network bandwidth); too low and you leave distributed resources idle.

If these parameters don't match your data characteristics (per-record size, total volume) and hardware, you'll see anything from degraded throughput to task stalls and crashes. Developers need to tune based on their specific workload, the data modalities, processing complexity, and available resources, to fully exploit Daft's distributed capabilities.

A Working Example

Daft supports running UDFs in both local and distributed (Ray) modes, with some subtle scheduling differences. Distributed execution relies on Ray's scheduling infrastructure.

The following example demonstrates a Batch UDF running on Ray, and illustrates the parameters we need to think about. (Daft version: v0.6.4)

1import logging
2import time
3import os
4
5def configure_logging():
6 logging.basicConfig(
7 level=logging.INFO,
8 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
9 datefmt='%Y-%m-%d %H:%M:%S.%s'
10 )
11 logging.getLogger("tracing.span").setLevel(logging.WARNING)
12 logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
13 logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
14 logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
15 logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)
16 logging.getLogger("MyBatchUdf").setLevel(logging.WARNING)
17
18configure_logging()
19
20import daft
21from daft import col, Series
22
23@daft.udf(return_dtype=daft.DataType.string())
24class MyBatchUdf:
25 def __init__(self, sleep: int = 0):
26 self._id = id(self)
27 self.counter = 0
28 self.sleep = sleep
29 self.logger = logging.getLogger(f"MyBatchUdf-{self._id}")
30
31 def __call__(self, a: Series) -> Series:
32 self.counter += len(a)
33 self.logger.info(f"Actor got {len(a)} records, total: {self.counter}")
34 if self.sleep > 0:
35 time.sleep(self.sleep)
36 return a
37
38if __name__ == "__main__":
39 if os.getenv("DAFT_RUNNER", "native") == "ray":
40 import ray
41 ray.init(runtime_env={
42 "worker_process_setup_hook": configure_logging
43 })
44
45 partition = 2
46 udf_concurrency = 4
47 udf_batch_size = 200
48 sleep_seconds = 2
49
50 paths = [f"file-{i}" for i in range(1000)]
51 df = daft.from_pydict({"path": paths})
52 df = df.repartition(partition, "path")
53
54 udf = (
55 MyBatchUdf
56 .override_options(batch_size=udf_batch_size)
57 .with_concurrency(udf_concurrency)
58 .with_init_args(sleep=sleep_seconds)
59 )
60 df = df.with_column("batch", udf(col("path")))
61 df.collect()

This MyBatchUdf maintains a counter of processed records, accepts a string column, and returns it unchanged. The pipeline creates a 1,000-row DataFrame with a single path column, repartitions it into 2 partitions, then applies a UDF configured with concurrency=4 and batch_size=200. In theory, each of the 4 workers should process 250 rows.

What Goes Wrong: Three Scenarios

Scenario 1 — Data Skew

Running this on Ray in single-node mode produces the following logs. Four UDFActor instances start, but they process [110, 200, 290, 400] rows respectively — far from the expected 250 each.

(UDFActor pid=80511) Actor got 200 records, total: 200
(UDFActor pid=80510) Actor got 200 records, total: 200
(UDFActor pid=80509) Actor got 110 records, total: 110
(UDFActor pid=80512) Actor got 200 records, total: 200
(UDFActor pid=80510) Actor got  90 records, total: 290
(UDFActor pid=80512) Actor got 200 records, total: 400
``

Two UDF instances get called twice, with the gap between calls matching the configured sleep time — confirming that a single UDF instance processes its batches serially. If per-record processing time is uniform, the instance handling 400 rows becomes the long tail, and toward the end of the job only one worker is active.

The skew comes from repartition. Hash-based repartitioning on the path column produces partitions of 510 and 490 rows. Each partition is then independently sliced into batches of 200, yielding [200, 200, 110, 200, 200, 90] — six batches distributed across four workers. Since each partition's slicing is independent, the last batch of each partition is much smaller than the rest.

Without repartitioning, the single-partition result is slightly better — [200, 200, 200, 400] — but still uneven since 5 batches map onto 4 workers.

Scenario 2 — Actual Concurrency Below Target (Multi-Worker)

On a Ray cluster with two workers and the same single-partition DataFrame, only 2 of the 4 UDFActors receive data, processing [400, 600] rows:

(UDFActor pid=13825) Actor got 200 records, total: 200
(UDFActor pid=13827) Actor got 200 records, total: 200
(UDFActor pid=13825) Actor got 200 records, total: 400
(UDFActor pid=13827) Actor got 200 records, total: 400
(UDFActor pid=13825) Actor got 200 records, total: 600

The DataFrame has only one partition, which generates a single task routed to one worker. UDFActors are spread across workers at plan time, and RaySwordfishActor preferentially dispatches morsels to local UDFActors. The other worker's actors sit idle.

Scenario 3 — Actual Concurrency Below Target (Batch Size Too Large)

Even on a single node, setting batch_size=500 results in only 2 concurrent workers processing [500, 500]:

(UDFActor pid=37087) Actor got 500 records, total: 500
(UDFActor pid=37086) Actor got 500 records, total: 500

With 1,000 rows split into just 2 morsels, there simply aren't enough morsels to fill 4 UDFActors.

Under the Hood: How Daft Schedules UDFs on Ray

Understanding the scheduling pipeline explains all three scenarios.

When a distributed Daft job runs, the code is first compiled into a LogicalPlan. On df.collect(), the plan is optimized by Daft's built-in optimizer, then converted into a DistributedPhysicalPlan and handed to the DistributedPhysicalPlanRunner.

The runner converts the physical plan into an executable pipeline where each plan node becomes a PipelineNode. The pipeline is split into one or more SwordfishTasks.

Generally, the number of SwordfishTasks equals the number of DataFrame partitions. Tasks are dispatched to RaySwordfishActors (one per worker) according to a scheduling policy. Under the Spread policy, tasks are distributed evenly based on worker availability.

Each RaySwordfishActor executes its SwordfishTasks via a NativeExecutor. (At this level, the Native Runner and Ray Runner behave almost identically, except that in distributed mode each SwordfishTask contains only a single partition, while the Native Runner holds all partitions.)

When executing the UDF operator within a SwordfishTask, the data is sliced into morsels (MicroPartitions) according to batch_size, then dispatched to the local worker's UDFActors via UDFActor.eval_input.

This gives us four key parameters:

  • Number of Workers — Each worker runs one RaySwordfishActor that can concurrently execute multiple SwordfishTasks (async). The plan runner distributes tasks based on each actor's load; undispatched tasks wait. Worker count and total CPU determine the maximum task-level concurrency.

  • DataFrame Partitions — Each partition maps to a SwordfishTask, so the partition count determines the pipeline's maximum parallelism. If there are fewer tasks than workers, some workers idle. You can increase partition count with repartition, into_partition, or split partitions into multiple tasks with into_batches.

  • UDF Concurrency — Controls the maximum UDF parallelism. On Ray, this determines how many UDFActors are launched. With homogeneous workers, actors are spread evenly. RaySwordfishActor only dispatches morsels to UDFActors on its own worker. If the number of morsels is less than the number of local UDFActors, some actors idle.

  • UDF Batch Size — Controls the max rows per UDF invocation. Large values require more memory and produce fewer morsels, underutilizing actors. Small values increase call overhead. This needs to be tuned to the workload.

The Fix: into_batches and Batch Size Tuning

Running df.into_batches(200).with_column("batch", udf(col("path"))) on a single-worker cluster produces:

(UDFActor pid=29906) Actor got 200 records, total: 200
(UDFActor pid=29907) Actor got 200 records, total: 200
(UDFActor pid=29909) Actor got 200 records, total: 200
(UDFActor pid=29906) Actor got 200 records, total: 400
(UDFActor pid=29906) Actor got 200 records, total: 600

Data is evenly split into 5 batches, but only 3 of 4 actors receive data. Each task produces a single morsel that gets randomly assigned to one actor — and since tasks are independent, all 5 morsels could theoretically land on the same actor.

The solution: set batch_size small enough that each task's data splits into at least as many morsels as local UDFActors. With batch_size=50, each 200-row task produces 4 morsels, round-robined across 4 actors:

(UDFActor pid=43813) Actor got 50 records, total: 50
(UDFActor pid=43815) Actor got 50 records, total: 50
(UDFActor pid=43814) Actor got 50 records, total: 50
(UDFActor pid=43816) Actor got 50 records, total: 50
(UDFActor pid=43813) Actor got 50 records, total: 100
(UDFActor pid=43815) Actor got 50 records, total: 100
...

All 4 actors process exactly 250 rows each. The combination of into_batches and a well-tuned batch_size delivers both even data distribution and full concurrency utilization.

Editors Note
The original article attributes event data distribution and full concurrency utilization to the round-robin morsel dispatch filling all 4 actors. However, the more likely explanation is a timing artifact of the example's time.sleep(2).

With batch_size=200, each task produces a single morsel that completes quickly relative to the sleep, so by the time the next morsel is ready, the previous actor is still sleeping, and the scheduler happens to reuse the same one. With batch_size=50, morsels are produced faster than the UDF can drain them (each takes 2 seconds regardless of row count), creating a backlog that naturally spreads across idle actors. If the sleep duration scaled with row count (e.g., time.sleep(0.01 * len(a))), the batch_size=200 case would likely show the same even distribution.

The underlying lesson that more morsels per task improves distribution still holds, but the mechanism is scheduling backpressure, not round-robin assignment.

Caveats with into_batches

into_batches isn't always the right tool:

  • Single-node, single-partition — No skew or multi-worker distribution issues, so into_batches just adds overhead.

  • Uneven tail batches — With two partitions of 180 and 220 rows and batch_size=200, you get tasks of [180, 200, 20].

  • Small-partition merginginto_batches can also combine small partitions: 16 partitions of 20 rows each with into_batches(200) produces two tasks of 160 rows (not one of 200 + one of 120), because the distributed plan runner creates a new task when accumulated rows exceed 80% of the batch size.

Practical Tuning Guidelines

To maximize UDF concurrency across M workers:

  • Ensure every worker receives tasks. The number of generated tasks must be >= the number of workers. By default, task count equals partition count. If partitions are few, increase them via repartition, into_partition, or use into_batches to generate more tasks.

  • Balance data across workers. Both the number of tasks and the data volume within each task should be as uniform as possible. When per-task data is balanced, make the task count an integer multiple of the worker count. When it's imbalanced, use repartition/into_partition (best for many large partitions) or into_batches (best for small-batch scenarios) to rebalance.

  • Align batch size with concurrency. Aim for: (concurrency / num_workers) * batch_size = rows_per_task. This ensures each task's data splits evenly across the local UDF Actors.

Beyond maximizing concurrency, consider runtime resource utilization:

  • Memory-heavy UDFs — Reduce batch_size to avoid OOM. Smaller batches produce more morsels, which may cause backpressure, so increase concurrency to compensate.

  • I/O-heavy UDFs — Use async operations and coroutines inside the UDF to improve CPU utilization. Increase batch_size to reduce call count, but watch for morsel starvation if the morsel count drops below the local actor count.

  • Many small files / fragmented upstream partitions — Use into_batches to merge partitions and reduce the number of UDF invocations.


This article was originally published by the Volcengine LAS Team (火山引擎LAS团队) at ByteDance on WeChat. Translated and adapted for the Daft community with attribution to the original authors.

Get updates, contribute code, or say hi.
Daft Engineering Blog
Join us as we explore innovative ways to handle multimodal datasets, optimize performance, and simplify your data workflows.
Github Discussions Forums
join
GitHub logo
The Distributed Data Community Slack
join
Slack logo