March 23, 2026
GPU Inference with @daft.cls

Run GPU models on millions of rows without OOM. Real patterns from ByteDance, Essential AI, and more.

by Daft Team

Essential AI used Daft to build Essential-Web v1.0 -- 23.6 billion LLM queries across 24 trillion tokens in 7 days, consuming 90,000 AMD MI300X GPU hours. Zero crashes. The pattern behind it: a Python class that loads the model once and runs it on every row. @daft.cls makes that pattern distributed.

This post shows you how. From a toy sentiment classifier to the exact configuration that powered 24 trillion tokens, the building block is the same: a stateful UDF that participates in the query plan, manages GPU allocation, and scales from a laptop to a Ray cluster with three parameters on a decorator.

This is Week 4 of the UDF series. If you are new to Daft UDFs, start with What is a UDF?, then stateless UDFs with @daft.func, then stateful UDFs with @daft.cls.

Load a Model and Run It

The simplest path from "I have a model" to "I'm running inference on a DataFrame" is four parts: imports, decorator, __init__, and a method.

import daft
from transformers import pipeline
 
@daft.cls
class SentimentClassifier:
    def __init__(self):
        self.pipe = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
 
    def __call__(self, text: str) -> str:
        return self.pipe(text)[0]["label"]
 
classifier = SentimentClassifier()
 
df = daft.from_pydict({"review": [
    "This product is amazing",
    "Worst purchase I've ever made",
    "It's okay, nothing special",
]})
df = df.select(classifier(df["review"]).alias("sentiment"))
df.show()

__init__ runs once per worker. __call__ runs on every row. When you write SentimentClassifier(), Daft saves your arguments but does not instantiate anything. During execution, each worker calls __init__ once, loads the model into memory, then reuses that instance for every row it processes. Load once, infer many.
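To make the "save arguments now, construct later" behavior concrete, here is a minimal plain-Python sketch. LazyUDF and FakeModel are hypothetical names invented for illustration; this is not Daft's implementation, only the shape of the idea.

```python
class LazyUDF:
    """Toy stand-in for a stateful UDF wrapper (hypothetical, not Daft's code)."""

    def __init__(self, cls, *args, **kwargs):
        # Only remember how to build the object; do not build it yet.
        self._cls, self._args, self._kwargs = cls, args, kwargs
        self._instance = None

    def __call__(self, row):
        # The first call on this "worker" constructs the instance;
        # every later call reuses it.
        if self._instance is None:
            self._instance = self._cls(*self._args, **self._kwargs)
        return self._instance(row)


class FakeModel:
    loads = 0  # counts how many times the expensive load ran

    def __init__(self, suffix):
        FakeModel.loads += 1
        self.suffix = suffix

    def __call__(self, text):
        return text + self.suffix


udf = LazyUDF(FakeModel, suffix="!")
loads_before_execution = FakeModel.loads   # nothing constructed yet
results = [udf(t) for t in ["a", "b", "c"]]
loads_after_execution = FakeModel.loads    # constructed exactly once
```

In a real cluster each worker would hold its own instance, so the load count would equal the number of workers rather than one.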

This is the stateful UDF pattern. Other frameworks handle statefulness differently: PySpark offers iterator-style APIs such as mapInPandas, where you can load a model once per partition, and Ray has Actors that can hold models across calls. But Ray Actors sit outside the query planner -- they are standalone objects you manage yourself. @daft.cls is a first-class operator in the query plan. It participates in optimization, predicate pushdown, and scheduling the same way a filter or join does. That means Daft can reason about your model's resource requirements, data dependencies, and execution order as part of the same plan that reads your data and writes your results.

Real Inference: Transcription with Faster-Whisper

Sentiment classification is a toy example. Here is the exact pattern from a production voice AI pipeline. The FasterWhisperTranscriber loads a Whisper model once per worker, then transcribes audio files across the cluster.

import daft
from faster_whisper import WhisperModel, BatchedInferencePipeline
 
@daft.cls()
class FasterWhisperTranscriber:
    def __init__(self, model="distil-large-v3", compute_type="float32", device="auto"):
        self.model = WhisperModel(model, compute_type=compute_type, device=device)
        self.pipe = BatchedInferencePipeline(self.model)
 
    @daft.method(return_dtype=daft.DataType.struct({
        "transcript": daft.DataType.string(),
        "segments": daft.DataType.list(daft.DataType.struct({
            "start": daft.DataType.float64(),
            "end": daft.DataType.float64(),
            "text": daft.DataType.string(),
        })),
    }))
    def transcribe(self, audio_file: daft.File):
        with audio_file.to_tempfile() as tmp:
            segments_iter, info = self.pipe.transcribe(
                str(tmp.name), vad_filter=True, batch_size=16
            )
            segments = [{"start": s.start, "end": s.end, "text": s.text}
                        for s in segments_iter]
            text = " ".join(seg["text"] for seg in segments)
            return {"transcript": text, "segments": segments}
 
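from daft.functions import file

transcriber = FasterWhisperTranscriber()
df = daft.from_glob_path("s3://my-bucket/audio/*.mp3")
# from_glob_path yields string paths; wrap them as daft.File to match transcribe()'s signature
df = df.with_column("audio_file", file(daft.col("path")))
df = df.with_column("result", transcriber.transcribe(daft.col("audio_file")))
df.show()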

A single hour of uncompressed 48 kHz/24-bit stereo audio takes roughly 1 GB of memory (about 518 MB per channel). You do not want to reinitialize Whisper for every file. The @daft.method decorator lets you declare the return type as a struct and accept daft.File as input -- Daft handles the file download, the model handles the audio, and the stateful pattern keeps memory stable.
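The memory figure is a back-of-the-envelope estimate of uncompressed PCM size: sample rate times bytes per sample times channels times duration. The helper name pcm_bytes is made up for this sketch.

```python
def pcm_bytes(sample_rate_hz: int, bit_depth: int, channels: int, seconds: int) -> int:
    """Size of uncompressed PCM audio in bytes."""
    bytes_per_sample = bit_depth // 8
    return sample_rate_hz * bytes_per_sample * channels * seconds


one_hour = 60 * 60
mono_bytes = pcm_bytes(48_000, 24, channels=1, seconds=one_hour)
stereo_bytes = pcm_bytes(48_000, 24, channels=2, seconds=one_hour)

print(f"mono:   {mono_bytes / 1e6:.1f} MB")    # 518.4 MB
print(f"stereo: {stereo_bytes / 1e6:.1f} MB")  # 1036.8 MB
```

Compressed formats such as MP3 are far smaller on disk, but they are decoded to PCM before the model sees them, so the in-memory cost is what matters for worker sizing.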

Batch Embeddings with @daft.method.batch

Some models expect tensors, not scalars. Embedding models in particular perform best when you feed them batches of text. @daft.method.batch gives you the same stateful pattern with batch-level inputs.

import daft
from daft import DataType, Series
 
@daft.cls
class TextEmbedder:
    def __init__(self, model_name: str = "Qwen/Qwen3-Embedding-0.6B"):
        from sentence_transformers import SentenceTransformer
        import torch
        self.model = SentenceTransformer(model_name)
        if torch.cuda.is_available():
            self.model = self.model.to("cuda")
 
    @daft.method.batch(return_dtype=DataType.list(DataType.float32()))
    def embed(self, texts: Series) -> list:
        text_list = texts.to_pylist()
        embeddings = self.model.encode(text_list, convert_to_numpy=True)
        return [row.tolist() for row in embeddings]
 
embedder = TextEmbedder()
 
df = daft.from_pydict({"document": [
    "Daft distributes Python across a cluster",
    "Stateful UDFs amortize model initialization",
    "Batch processing maximizes GPU throughput",
]})
df = df.select(embedder.embed(df["document"]).alias("embedding"))
df.show()

This is the same pattern used in production to embed millions of documents with Qwen3-Embedding-0.6B at near-100% GPU utilization across 8 GPU nodes. The SentenceTransformer loads once per worker, processes batches of 16 on the GPU, and writes 1024-dimensional vectors to a vector database. Adjusting batch size and worker count lets you trade off memory for throughput without rewriting any application code.
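As a rough illustration of what batch size controls, the sketch below slices a list of documents into fixed-size batches and feeds each one to a stand-in encoder. iter_batches and fake_encode are hypothetical helpers; in the real pipeline Daft does the slicing and the SentenceTransformer does the encoding.

```python
from typing import Iterator, List, Sequence


def iter_batches(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def fake_encode(batch: Sequence[str]) -> List[List[float]]:
    """Stand-in for model.encode: one tiny vector per input text."""
    return [[float(len(text))] for text in batch]


docs = [f"doc {i}" for i in range(40)]
batch_sizes = []
vectors = []
for batch in iter_batches(docs, batch_size=16):
    batch_sizes.append(len(batch))       # how many rows hit the "GPU" at once
    vectors.extend(fake_encode(batch))   # one vector per document, order preserved
```

A larger batch size means fewer, bigger calls and more memory held per call; a smaller one means the opposite. The application code stays the same either way.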

GPU Allocation and Concurrency

Running models on a CPU works for prototyping. Running them in production means GPUs, and GPUs mean resource management. @daft.cls puts all of that configuration on the decorator -- not scattered through your pipeline.

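import daft
import torch  # module-level so both __init__ and __call__ can use it

@daft.cls(
    gpus=1,
    max_concurrency=4,
    use_process=True,
)
class ImageClassifier:
    def __init__(self, model_name: str):
        # Runs once per worker: load the weights and move them to the GPU.
        self.model = torch.hub.load("pytorch/vision", model_name, pretrained=True)
        self.model.cuda().eval()
 
    def __call__(self, image_path: str) -> str:
        # load_and_preprocess and decode_prediction are placeholders for your own helpers.
        image = load_and_preprocess(image_path)
        with torch.no_grad():
            output = self.model(image.cuda())
        return decode_prediction(output)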

Three parameters control the worker:

  • gpus=1 -- reserves one GPU per worker instance. Daft's scheduler ensures this class only runs on nodes with available GPUs. If you are running a model that requires tensor parallelism across multiple GPUs, set gpus=2 or gpus=4 accordingly.
  • max_concurrency=4 -- limits the total number of concurrent instances across the cluster. Four instances means four GPUs loaded simultaneously. Set this to match your available GPU count so you saturate your hardware without oversubscribing memory.
  • use_process=True -- runs each instance in a separate OS process, so each instance has its own Python GIL. CUDA kernels and most C extensions release the GIL while they run, but Python-level pre/post-processing does not, so instances sharing one process would still contend for it.
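The relationship between gpus and max_concurrency is simple division. As a sketch, a hypothetical helper (max_instances is not a Daft function) can derive the concurrency cap from your GPU inventory:

```python
def max_instances(total_gpus: int, gpus_per_instance: int) -> int:
    """How many model instances fit on the cluster without sharing a GPU."""
    if gpus_per_instance <= 0:
        raise ValueError("gpus_per_instance must be positive")
    # Floor division: leftover GPUs cannot host a partial instance.
    return total_gpus // gpus_per_instance


single_gpu_models = max_instances(total_gpus=8, gpus_per_instance=1)
tensor_parallel_models = max_instances(total_gpus=8, gpus_per_instance=4)
uneven_fit = max_instances(total_gpus=3, gpus_per_instance=2)
```

You would then pass the result as max_concurrency on the decorator, alongside the matching gpus value.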

Every minute a GPU sits idle is money wasted. These three parameters -- gpus, max_concurrency, use_process -- are how you keep your GPUs saturated.

Escaping the GIL with use_process=True

use_process=True launches each UDF instance in its own process with its own GIL. Use it when:

  • Your model uses PyTorch, TensorFlow, or any library with C/CUDA extensions
  • You are running multiple concurrent UDF instances on the same node
  • Profiling shows CPU-bound pre/post-processing is the bottleneck, not the GPU forward pass

Skip it when:

  • Your UDF is purely async (API calls, network I/O) -- the GIL is released while waiting on I/O, so it is not the bottleneck
  • You have a single UDF instance per node -- there is no contention

Production Patterns: ByteDance and Essential AI

These are not hypothetical configurations. Two production deployments illustrate how @daft.cls performs at scale.

ByteDance: Tuning Distributed UDFs on Ray

ByteDance's Volcengine LAS Team runs large-scale data processing through Daft UDFs on Ray. Their key insight: partitions become tasks, tasks get assigned to workers, then each task's data is sliced into morsels by batch_size. The formula for optimal distribution is:

(concurrency / num_workers) * batch_size = rows_per_task

They identified three failure modes that @daft.cls configuration helps prevent:

  1. Data skew -- hash-based repartitioning on uneven data creates imbalanced workloads. Solution: use into_batches() to rebalance before the UDF stage.
  2. Idle workers -- insufficient partitions leave actors without tasks. Solution: ensure partition count >= worker count using repartition() or into_batches().
  3. Underutilization from large batches -- setting batch_size=500 on 1,000 rows produces only two morsels, not enough to engage all configured actors. Solution: align batch size so each task generates sufficient morsels.
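The formula and the third failure mode can be checked with a few lines of arithmetic. This is a sketch only; the function names are invented here, and the numbers for concurrency, workers, and batch size are illustrative rather than ByteDance's settings.

```python
import math


def rows_per_task(concurrency: int, num_workers: int, batch_size: int) -> int:
    """(concurrency / num_workers) * batch_size, assuming concurrency divides evenly."""
    return concurrency * batch_size // num_workers


def morsels_per_task(rows: int, batch_size: int) -> int:
    """Number of batch_size-sized morsels a task's rows are sliced into."""
    return math.ceil(rows / batch_size)


# Failure mode 3 from the list: 1,000 rows at batch_size=500 gives two morsels.
too_few = morsels_per_task(rows=1_000, batch_size=500)

# Illustrative balanced setup: 8 actors over 2 workers, batch_size=64.
target_rows = rows_per_task(concurrency=8, num_workers=2, batch_size=64)
balanced = morsels_per_task(rows=target_rows, batch_size=64)
actors_per_worker = 8 // 2
```

When a task holds the target number of rows, it produces exactly one morsel per actor on its worker, which is the balance the formula aims for.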

The result: even data distribution across all workers with full concurrency utilization. No manual batch size tuning. No OOM.

Essential AI: 24 Trillion Tokens, Zero Crashes

The Essential AI deployment described in the introduction used the same pattern at extreme scale. Their vLLM-inference pipeline wrapped the inference engine in a stateful UDF, let @daft.cls manage GPU allocation, and ran distributed on Ray for 7 straight days without a single crash.

Essential AI's Ritvik Kapila: "Daft's massively parallel compute, cloud-native I/O, and painless transition from local testing to seamless distributed scaling made this possible."

Choosing Your Configuration

The right @daft.cls configuration depends on your workload. Here is a decision guide:

Embedding or classification (single GPU per model):

@daft.cls(gpus=1, max_concurrency=NUM_GPUS)

Set max_concurrency to your available GPU count. Use @daft.method.batch with a batch_size matched to your GPU memory.

Large language model inference (multi-GPU, tensor parallel):

@daft.cls(gpus=NUM_GPUS_PER_MODEL, max_concurrency=1, use_process=True)

A single model instance sharded across GPUs. use_process=True because LLM pre/post-processing is CPU-heavy.

API-based inference (OpenAI, Anthropic, hosted endpoints):

@daft.cls
class APIInferenceClient:
    def __init__(self, api_key: str):
        import openai
        # The async client is required because generate() awaits the request.
        self.client = openai.AsyncOpenAI(api_key=api_key)
 
    async def generate(self, prompt: str) -> str:
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

No GPU configuration needed. The stateful pattern still pays off: you initialize the client once per worker and reuse the connection pool.
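The benefit of one client per worker can be seen in a small asyncio sketch. StubClient stands in for a real API client and APIWorker for the stateful class; both are hypothetical and make no network calls.

```python
import asyncio


class StubClient:
    created = 0  # counts how many clients were constructed

    def __init__(self):
        StubClient.created += 1

    async def complete(self, prompt: str) -> str:
        await asyncio.sleep(0.01)  # simulate network latency
        return prompt.upper()


class APIWorker:
    def __init__(self):
        # Constructed once; every request below shares this client.
        self.client = StubClient()

    async def generate(self, prompt: str) -> str:
        return await self.client.complete(prompt)


async def run_all(prompts):
    worker = APIWorker()
    # All requests are in flight concurrently on the same client.
    return await asyncio.gather(*(worker.generate(p) for p in prompts))


replies = asyncio.run(run_all(["a", "b", "c"]))
clients_created = StubClient.created
```

Three concurrent requests share a single client, which is what lets a real client reuse its connection pool instead of opening a new connection per row.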

Audio/video transcription (GPU + heavy I/O):

@daft.cls(gpus=1, max_concurrency=NUM_GPUS, use_process=True)

Whisper, Parakeet, and other transcription models benefit from process isolation because audio decoding is CPU-intensive.

What You Get

@daft.cls turns a Python class into a distributed inference operator. You get:

  • One initialization per worker -- load the model once, run it on every row
  • GPU-aware scheduling -- Daft places instances on nodes with available GPUs
  • Concurrency control -- cap the number of simultaneous model instances across your cluster
  • GIL escape -- use_process=True gives each instance its own process
  • Same code, any scale -- debug locally, distribute to a Ray cluster with daft.context.set_runner_ray()

The pattern works for sentiment classification and for processing 24 trillion tokens. The difference is three parameters on a decorator.
