January 12, 2026
Distributed Daft Engine

Distributed Model Inference with Daft

A deep dive into Daft’s distributed execution engine, Flotilla, for multimodal data pipelines

by ByteDance Volcano Engine, Colin Ho

TL;DR:

Flotilla is Daft’s distributed execution engine for multimodal data and AI workloads. It coordinates multiple single-node Swordfish engines, plans and schedules tasks itself, and executes pipelines in a streaming manner to efficiently scale data processing and model inference across a cluster.

Designing Daft for Modern Data and AI Workloads

Modern data and AI pipelines increasingly combine large-scale data processing with model inference, often across heterogeneous resources such as CPUs and GPUs. These workloads place new demands on execution engines, requiring not just scalability, but also efficient scheduling, streaming execution, and native support for multimodal data.

Daft is designed to meet these demands. It provides a Rust-based execution core with Python DataFrame and SQL APIs, supports both single-node and distributed execution, and natively understands multimodal data types and operators. Its distributed execution engine, Flotilla, builds on these foundations to efficiently execute complex pipelines across a Ray-managed cluster.

Historically, tools like Pandas served as the “Swiss Army knife” for single-node, structured data workflows, while systems such as Polars and Dask explored vertical and horizontal scaling strategies, respectively. Daft builds on the lessons of these systems, combining efficient local execution with first-class distributed support to address the needs of modern, multimodal data pipelines.

Daft combines the strengths of both approaches: the same pipeline can run on a single node or on a cluster, with seamless switching between the two. More importantly, its first-class support for multimodal data types and operators lets one pipeline combine structured data, unstructured data, and model inference within a single execution engine. This allows Daft to extend beyond traditional structured data processing into multimodal workloads.

In a previous article, Exploring Daft's Local Execution: The Swordfish Engine, we used a representative image classification pipeline to walk through the internals of Daft’s single-node execution engine, Swordfish. In this article, we reuse the same example to dive into the design and implementation of Daft’s distributed execution engine, Flotilla.

Running Daft Distributed: Switching from Single-Node to Distributed Execution

Let’s revisit the image classification example.

import daft
from daft import ImageMode, col
from torchvision import transforms  # assumed source of the `transforms` helpers

# IO_CONFIG and ResNetModel are assumed to be defined as in the Swordfish article.
df = (daft
    # Read image metadata stored in Parquet
    .read_parquet(path="s3://ai/dataset/url_ILSVRC2012/*.parquet", io_config=IO_CONFIG)
    # Filter images by resolution
    .filter((col('width') >= 400) & (col('height') >= 300))
    # Split the dataset into batches
    .into_batches(batch_size=64)
    # Download image bytes from URLs
    .with_column('bytes', col('url').download(io_config=IO_CONFIG))
    # Decode bytes into images
    .with_column('image', col('bytes').decode_image(mode=ImageMode.RGB)).exclude('bytes')
    # Apply image preprocessing
    .with_column(
        'tensor',
        col('image').apply(
            func=lambda img: transforms.Compose([
                transforms.ToTensor(),
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225],
                ),
            ])(img),
            return_dtype=daft.DataType.tensor(dtype=daft.DataType.float32()),
        ),
    ).exclude('image')
    # Run offline inference with ResNet50
    .with_column('labels', ResNetModel(col('tensor'))).exclude('tensor')
)
# Write the predicted labels out as a Lance dataset
df.write_lance(uri="/opt/workspace/data/lance")

The script is largely the same as when running locally, except for an added `into_batches` step. This step is particularly useful when data must be fetched dynamically from URLs: by batching, download work can be distributed across multiple nodes, which avoids a network bottleneck on a single machine and makes better use of distributed execution.
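To make the effect of batching concrete, here is a minimal pure-Python sketch of regrouping unevenly sized partitions into fixed-size batches. It is not Daft's implementation; `rebatch` and the sample URLs are hypothetical names used only for illustration.

```python
from typing import Iterable, Iterator, List


def rebatch(partitions: Iterable[List[str]], batch_size: int) -> Iterator[List[str]]:
    """Regroup variable-sized partitions into batches of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    buffer: List[str] = []
    for partition in partitions:
        buffer.extend(partition)
        # Emit full batches as soon as enough rows have accumulated.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    # Flush whatever is left as a final, smaller batch.
    if buffer:
        yield buffer


# Hypothetical input: two uneven partitions of image URLs (150 and 50 rows).
partitions = [
    [f"url_{i}" for i in range(150)],
    [f"url_{i}" for i in range(150, 200)],
]
batches = list(rebatch(partitions, batch_size=64))
batch_sizes = [len(batch) for batch in batches]
```

Each resulting batch is an independent unit of work, so the downloads for different batches can be handed to different nodes.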

By default, Daft runs this program in single-node mode. Switching to distributed execution requires minimal effort. Flotilla uses Ray to set up workers in a distributed cluster. To enable it, set this environment variable:

export DAFT_RUNNER=ray

Or explicitly configure it in code:

daft.set_runner_ray(address="ray://...")

For more details on running Daft distributed, see the documentation.

Flotilla Architecture: Core Components of Daft’s Distributed Execution Engine

Daft Architecture

At a high level, Flotilla is a distributed version of the Swordfish engine. It orchestrates multiple Swordfish single-node engines to execute a query end-to-end.

On each worker node, it launches a long-lived process called a SwordfishActor. This actor receives and executes single-node physical execution plans. When a query is submitted, Flotilla decomposes the distributed physical plan into a set of SwordfishTasks. These tasks are scheduled and dispatched to the appropriate SwordfishActor instances, each of which runs a Swordfish engine locally to execute its assigned plan fragment.

On the head node of the cluster lives the Flotilla scheduler, which is responsible for running distributed query plans across the pool of SwordfishActors.

Most of Flotilla’s server-side components are implemented in Rust and include:

  • WorkerManager: Interfaces with the cluster. It tracks worker nodes, starts SwordfishActor instances, submits tasks, and handles cluster scaling.

  • Scheduler: Determines where each SwordfishTask should run. It maintains a priority queue of pending tasks, updates worker state, and triggers scaling when necessary.

  • Dispatcher: Groups tasks by target worker and submits them in batches via WorkerManager.

From Logical Plans to Distributed Physical Plans in Daft

Returning to our image classification example, let’s examine how Flotilla executes it internally.

As discussed in the Swordfish article, Daft first converts the user-defined DataFrame into a logical execution plan. This plan captures semantics but is not directly executable. It must be transformed into a physical execution plan and then further decomposed into executable tasks.

When targeting Flotilla, Daft generates a distributed physical plan. The construction process closely resembles that of the single-node Swordfish plan, with some subtle differences. The engine performs a depth-first traversal of the logical plan tree, mapping each logical operator to a physical operator.

The logical plan tree
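The depth-first translation can be pictured with a small pure-Python sketch. The node classes, the `PHYSICAL_OPS` mapping, and the operator names below are assumptions made for illustration and do not mirror Daft's actual Rust types.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class LogicalNode:
    op: str
    children: List["LogicalNode"] = field(default_factory=list)


@dataclass
class PhysicalNode:
    op: str
    children: List["PhysicalNode"] = field(default_factory=list)


# Hypothetical logical-to-physical operator mapping, for illustration only.
PHYSICAL_OPS = {
    "Source": "ScanSource",
    "Filter": "Filter",
    "IntoBatches": "IntoBatches",
    "Project": "Project",
    "Sink": "Sink",
}


def translate(node: LogicalNode) -> PhysicalNode:
    """Depth-first translation: children are translated before their parent is built."""
    children = [translate(child) for child in node.children]
    return PhysicalNode(op=PHYSICAL_OPS[node.op], children=children)


def linearize(node: PhysicalNode) -> List[str]:
    """Post-order listing of operators, from the source up to the sink."""
    ops: List[str] = []
    for child in node.children:
        ops.extend(linearize(child))
    ops.append(node.op)
    return ops


# A simplified, linear version of the image classification plan.
logical = LogicalNode(
    "Sink",
    [LogicalNode("Project", [LogicalNode("IntoBatches", [LogicalNode("Filter", [LogicalNode("Source")])])])],
)
physical = translate(logical)
order = linearize(physical)
```

The result is still a static tree: it describes what to compute, but nothing in it says where or when each piece runs.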

Because a physical plan is a static tree, it cannot be executed directly. Flotilla, therefore, splits it into a set of executable SwordfishTasks. Conceptually, the plan is divided into multiple stages, each consisting of a pipeline of operators.

Task Generation by Operator Type

During plan traversal, each node’s produce_tasks method generates tasks. Some representative examples:

  • ScanSourceNode: Scan operators are materialized into multiple ScanTasks based on the data source. Each is wrapped as a SwordfishTask.

  • IntoBatches: This operator reshapes partitions to enforce a target batch size. It introduces a stage boundary because it may change task parallelism. The implementation is split into local and global phases. Even though two stages are created, Flotilla executes them in a streaming fashion: Stage 1 tasks are generated and scheduled as soon as enough data is available, rather than waiting for Stage 0 to finish entirely.

  • ActorUDF: UDFs with a configured concurrency are converted into ActorUDF nodes. Flotilla launches the specified number of UDF actors, initializes them, and wraps the node in a DistributedActorPoolProject before generating tasks.

In general, task generation turns a distributed physical plan into multiple single-node plan fragments. Each fragment is executed as one or more SwordfishTasks, with parallelism usually determined by the source operators.
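As a rough illustration of how source parallelism drives task generation, the sketch below emits one task per scan input, each carrying the same plan fragment. The `Task` and `ScanTask` classes, the signature of `produce_tasks`, and the file names are hypothetical stand-ins rather than Flotilla's real definitions.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass(frozen=True)
class ScanTask:
    path: str


@dataclass(frozen=True)
class Task:
    task_id: int
    # Single-node plan fragment: the scan plus the pipelined operators above it.
    plan: Tuple[str, ...]
    scan: ScanTask


def produce_tasks(paths: List[str], pipeline_ops: Tuple[str, ...]) -> Iterator[Task]:
    """Yield one task per scan input; every task runs the same fragment on different data."""
    for task_id, path in enumerate(paths):
        yield Task(task_id=task_id, plan=("Scan",) + pipeline_ops, scan=ScanTask(path))


# Hypothetical Parquet files backing the scan.
paths = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]
tasks = list(produce_tasks(paths, pipeline_ops=("Filter",)))
```

Three input files yield three tasks here, which is the sense in which parallelism is usually determined by the source operators.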

Not all stage boundaries can be streamed. For example, broadcast joins require the full small table to be materialized before execution can proceed. In these cases, Flotilla blocks upstream execution, materializes the broadcast data into in-memory scans, and then attaches join operators downstream.
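The blocking behavior of a broadcast join can be sketched in plain Python: the small side is fully materialized into a lookup table first, and only then is the large side streamed through it batch by batch. The function name and row layout are assumptions for illustration, not Daft's implementation.

```python
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, object]


def broadcast_join(
    small: Iterable[Row], large_batches: Iterable[List[Row]], key: str
) -> Iterator[List[Row]]:
    # Blocking step: the small side must be fully materialized before any probing.
    table: Dict[object, List[Row]] = {}
    for row in small:
        table.setdefault(row[key], []).append(row)
    # Streaming step: each batch of the large side is joined independently.
    for batch in large_batches:
        yield [{**left, **right} for left in batch for right in table.get(left[key], [])]


# Hypothetical data: a small label table and a larger, batched prediction stream.
labels = [{"label_id": 0, "name": "cat"}, {"label_id": 1, "name": "dog"}]
predictions = [
    [{"id": 1, "label_id": 0}, {"id": 2, "label_id": 2}],
    [{"id": 3, "label_id": 1}],
]
joined = list(broadcast_join(labels, predictions, key="label_id"))
```

The row with `label_id` 2 has no match in the small table, so this inner join drops it.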

Task Scheduling and Execution in Flotilla

Flotilla performs planning, scheduling, and state management on the head node, while actual execution happens on the worker nodes.

Cluster State Awareness

WorkerManager periodically collects snapshots of all worker nodes and their resources, including CPU, GPU, and memory. For each worker, Flotilla launches a SwordfishActor that runs a local Swordfish engine and executes assigned tasks.

Task Submission

Tasks are submitted in batches using submit_tasks_to_workers. Each task is represented as a SwordfishTask, which includes the physical plan fragment, resource requirements, configuration, and partition metadata.

On the worker, SwordfishActor invokes its asynchronous run_plan method. This creates a NativeExecutor, which executes the plan using the Swordfish engine. The details of NativeExecutor were covered in the Swordfish article and are not repeated here.
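A toy model of this hand-off, using only `asyncio`, may help. `TaskSketch` and `ActorSketch` are hypothetical classes, the two operators are invented, and the `partitions` field stands in for real partition metadata; the only thing borrowed from the text is the idea of an asynchronous `run_plan` method that executes a plan fragment.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TaskSketch:
    plan: List[str]                      # plan fragment, reduced to operator names
    resources: Dict[str, float]          # e.g. {"num_cpus": 1.0}
    config: Dict[str, str] = field(default_factory=dict)
    partitions: List[List[int]] = field(default_factory=list)  # stand-in for partition metadata


class ActorSketch:
    async def run_plan(self, task: TaskSketch) -> List[List[int]]:
        results = []
        for partition in task.partitions:
            await asyncio.sleep(0)  # yield control so other plans can make progress
            results.append(self._execute(task.plan, partition))
        return results

    @staticmethod
    def _execute(plan: List[str], rows: List[int]) -> List[int]:
        for op in plan:
            if op == "filter_even":
                rows = [r for r in rows if r % 2 == 0]
            elif op == "double":
                rows = [r * 2 for r in rows]
            else:
                raise ValueError(f"unknown operator: {op}")
        return rows


async def main() -> List[List[List[int]]]:
    actor = ActorSketch()
    tasks = [
        TaskSketch(plan=["filter_even", "double"], resources={"num_cpus": 1.0}, partitions=[[1, 2, 3, 4]]),
        TaskSketch(plan=["double"], resources={"num_cpus": 1.0}, partitions=[[5], [6]]),
    ]
    # Several plans can be in flight on the same actor at once.
    return await asyncio.gather(*(actor.run_plan(task) for task in tasks))


results = asyncio.run(main())
```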

Managing Workers and Executing Tasks

Flotilla separates scheduling and dispatching into two components:

  • Scheduler: Decides which worker should execute each task.

  • Dispatcher: Groups tasks by worker and submits them in batches.
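The dispatcher's grouping step amounts to bucketing scheduled tasks by their target worker. A minimal sketch, with hypothetical worker and task identifiers:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_worker(assignments: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (worker_id, task_id) pairs so that each worker receives a single batch."""
    batches: Dict[str, List[str]] = defaultdict(list)
    for worker_id, task_id in assignments:
        batches[worker_id].append(task_id)
    return dict(batches)


# Hypothetical scheduler output: which worker each task was assigned to.
assignments = [("worker-a", "t0"), ("worker-b", "t1"), ("worker-a", "t2")]
batches = group_by_worker(assignments)
```

Submitting one batch per worker instead of one call per task reduces the number of round trips to the cluster.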

For each query, Flotilla creates a dedicated scheduler. Daft provides two implementations:

  • DefaultScheduler: Maximizes parallelism when resources allow.

  • LinearScheduler: Enforces strict serial execution, useful in niche scenarios.

Most workloads use DefaultScheduler.

The execution flow is as follows:

  1. PlanRunner generates SwordfishTasks and submits them to the scheduler.

  2. The scheduler maintains a priority queue and runs an event loop. For each task, it selects a worker based on the scheduling strategy:

    - Spread: Choose the worker with the most available resources that meets requirements.

    - WorkerAffinity: Prefer a specific worker, falling back to spread if allowed.

  3. The dispatcher batches tasks by worker ID and submits them via WorkerManager.

  4. The scheduler waits for at least one task to complete per loop iteration, emits results, and retries failed tasks.

The execution flow
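The worker selection step can be approximated with a small priority-queue sketch. Everything here is an assumption made for illustration: the `PendingTask` fields, the convention that a lower number means higher priority, and the use of free CPUs as the only resource. Completion handling and retries are not modeled.

```python
import heapq
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass(order=True)
class PendingTask:
    priority: int  # assumption: lower value is scheduled first
    task_id: str = field(compare=False)
    cpus: float = field(compare=False, default=1.0)
    preferred_worker: Optional[str] = field(compare=False, default=None)
    soft: bool = field(compare=False, default=True)  # may fall back to spread


def pick_worker(task: PendingTask, free_cpus: Dict[str, float]) -> Optional[str]:
    """Return a worker for the task, or None if nothing fits right now."""
    # WorkerAffinity: use the preferred worker when it has capacity.
    if task.preferred_worker is not None:
        if free_cpus.get(task.preferred_worker, 0.0) >= task.cpus:
            return task.preferred_worker
        if not task.soft:
            return None
    # Spread: the worker with the most free capacity that still fits the task.
    candidates = [w for w, free in free_cpus.items() if free >= task.cpus]
    if not candidates:
        return None
    return max(candidates, key=lambda w: (free_cpus[w], w))


def schedule(
    tasks: List[PendingTask], free_cpus: Dict[str, float]
) -> Tuple[List[Tuple[str, str]], List[PendingTask]]:
    """Assign as many queued tasks as currently fit; return (assignments, still pending)."""
    queue = list(tasks)
    heapq.heapify(queue)
    free = dict(free_cpus)
    assignments: List[Tuple[str, str]] = []
    pending: List[PendingTask] = []
    while queue:
        task = heapq.heappop(queue)
        worker = pick_worker(task, free)
        if worker is None:
            pending.append(task)
            continue
        free[worker] -= task.cpus
        assignments.append((task.task_id, worker))
    return assignments, pending


free_cpus = {"w1": 2.0, "w2": 4.0}
tasks = [
    PendingTask(2, "t-spread"),
    PendingTask(1, "t-affinity", preferred_worker="w1"),
    PendingTask(3, "t-pinned", cpus=4.0, preferred_worker="w1", soft=False),
]
assignments, pending = schedule(tasks, free_cpus)
```

In this run the affinity task lands on its preferred worker, the spread task goes to the worker with the most free CPUs, and the strictly pinned task stays pending because its worker lacks capacity.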

Distributed Model Inference with Daft UDFs

In modern Data + AI workflows, SQL alone is often insufficient. Model inference and custom logic are common, and many practitioners prefer DataFrame APIs combined with UDFs.

Daft supports Lambda, Batch, and Class UDFs. In this example, we use a Class UDF, ResNetModel, which loads a ResNet50 model for offline image classification.

When planning tasks, Flotilla launches UDF actors based on the UDF’s concurrency and resource requirements. It also injects a DistributedActorPoolProject node into the plan.

Offline image classification process

This node acts as a client that holds references to all UDF actors, both local and remote. When processing data, it first tries to route requests to local actors. If none are available, it distributes requests across remote actors in round-robin order. This design minimizes cross-node communication while maintaining throughput.
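The local-first routing policy can be sketched as follows. `ActorRouter` and the node and actor names are hypothetical; the sketch also assumes that "available" simply means "an actor exists on this node" and that local actors are themselves used in round-robin order, which the text above does not specify.

```python
from itertools import cycle
from typing import Dict, List


class ActorRouter:
    def __init__(self, local_node: str, actors_by_node: Dict[str, List[str]]):
        local = actors_by_node.get(local_node, [])
        remote = [
            actor
            for node, actors in actors_by_node.items()
            if node != local_node
            for actor in actors
        ]
        if not local and not remote:
            raise ValueError("no UDF actors available")
        # Prefer local actors; fall back to remote ones only when none are local.
        self._targets = cycle(local if local else remote)

    def route(self) -> str:
        """Return the actor that should handle the next request."""
        return next(self._targets)


# A node that hosts UDF actors keeps its traffic local.
with_local = ActorRouter("node-1", {"node-1": ["a1", "a2"], "node-2": ["b1"]})
local_choices = [with_local.route() for _ in range(3)]

# A node without UDF actors rotates through the remote ones.
without_local = ActorRouter("node-3", {"node-1": ["a1"], "node-2": ["b1"]})
remote_choices = [without_local.route() for _ in range(3)]
```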

Key Takeaways: When to Use Flotilla for Distributed Multimodal Pipelines

In this article, we extended our earlier image classification example to explore the internals of Daft’s distributed execution engine, Flotilla. By coordinating multiple Swordfish engines, Flotilla enables efficient, streaming execution of distributed multimodal workloads.

As a distributed execution engine, Flotilla is complex, and it is impossible to cover every detail in a single article. Our goal here was to outline the main execution path and provide a mental model for how Flotilla is designed and how it operates. If you want to learn more, please check our documentation.
