March 30, 2026
Daft UDF Patterns: Four Patterns, One Notebook

Row-wise, generator, async, and stateful UDFs — one notebook, one dataset, runnable side by side.

by Daft Team

Daft has four UDF patterns, each designed for a different kind of workload. This notebook puts all four in one place — row-wise transforms, generators, async I/O, and stateful classes — with a single dataset so they're easy to compare and run side by side.

This is Week 5 of the UDF series. Previous posts: stateless UDFs with @daft.func, stateful UDFs with @daft.cls, GPU inference with @daft.cls.

Each section walks through a problem, shows the pattern that solves it, and includes runnable code. The full notebook is available in the Daft examples.



Section 1: Row-wise -- clean every row with zero setup

Problem: You need custom logic on every row -- normalize an email, parse a phone number, validate a field -- and the built-in expressions don't cover it.

Pattern: @daft.func (docs) with type hints. One row in, one row out.

import daft
 
@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]
    return f"{local}@{domain}"
 
df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    "  bob@example.org  ",
    "CAROL+spam@yahoo.com",
]})
df = df.select(normalize_email(df["email"]).alias("clean"))
df.show()

When to use it: Your function takes a single row and returns a single value. The logic is pure Python -- no external services, no model weights, no variable-length output. This is the workhorse pattern for data cleaning.
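Because the body of a row-wise UDF is plain Python, you can unit-test the logic before wrapping it. A small sketch in the same spirit (normalize_phone and its US-number rules are illustrative examples, not part of Daft):

```python
import re

def normalize_phone(raw: str) -> str:
    # Strip everything but digits, then normalize to an E.164-style "+1..." form.
    # Illustrative rules for US-style numbers only -- adapt to your own data.
    digits = re.sub(r"\D", "", raw)
    if len(digits) == 10:  # no country code present: assume US
        digits = "1" + digits
    return "+" + digits

print(normalize_phone("(555) 123-4567"))  # +15551234567
```

Once the helper passes your tests, decorating it with @daft.func, exactly as with normalize_email above, turns it into a Daft expression.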


Section 2: Generator -- one document becomes many chunks

Problem: Each input row needs to produce a variable number of output rows. Splitting a PDF into pages, tokenizing text into sentences, expanding a nested record into child rows. You don't know ahead of time how many outputs each input will produce.

Pattern: @daft.func returning Iterator[T]. Yield as many rows as the data demands.

from typing import Iterator
import daft
 
@daft.func
def split_into_chunks(text: str, max_len: int = 200) -> Iterator[str]:
    words = text.split()
    chunk = []
    length = 0
    for word in words:
        if length + len(word) + 1 > max_len and chunk:
            yield " ".join(chunk)
            chunk = []
            length = 0
        chunk.append(word)
        length += len(word) + 1
    if chunk:
        yield " ".join(chunk)
 
df = daft.from_pydict({"document": [
    "Daft runs locally and distributed with zero code changes. " * 10,
    "UDFs bridge the gap between built-in expressions and your custom logic. " * 5,
]})
df = df.select(split_into_chunks(df["document"]).alias("chunk"))
df.show()

When to use it: One-to-many transformations. Document chunking for RAG pipelines, audio segmentation, tokenization -- anywhere yield is the natural way to express "this input produces N outputs." No intermediate lists, no explode(), no memory spike from materializing everything at once.
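A nice side effect of writing the chunker as a plain generator: its invariants are easy to check outside Daft. A minimal sketch (the same greedy word-packing logic as split_into_chunks above, just undecorated):

```python
from typing import Iterator

def chunk_words(text: str, max_len: int = 200) -> Iterator[str]:
    # Greedily pack words into chunks of at most max_len characters.
    words = text.split()
    chunk, length = [], 0
    for word in words:
        if length + len(word) + 1 > max_len and chunk:
            yield " ".join(chunk)
            chunk, length = [], 0
        chunk.append(word)
        length += len(word) + 1
    if chunk:
        yield " ".join(chunk)

chunks = list(chunk_words("Daft runs locally and distributed. " * 20, max_len=80))
assert all(len(c) <= 80 for c in chunks)  # no chunk exceeds the limit
assert " ".join(chunks).split() == ("Daft runs locally and distributed. " * 20).split()  # no words lost
```

The same checks carry over unchanged once the function is decorated, since @daft.func doesn't alter the Python semantics of the body.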


Section 3: Async -- hit APIs without waiting in line

Problem: Your function calls an external service -- a translation API, a geocoder, a model endpoint. Sequential execution means each row waits for the previous one to finish. At 10,000 rows, that's hours of idle waiting.

Pattern: async def with @daft.func. Daft overlaps the I/O automatically.

import daft
 
@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return resp.status
 
df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})
df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status"),
)
df.show()

When to use it: Any I/O-bound workload -- API calls, database lookups, webhook triggers, model endpoints behind a REST API. You write async def and await exactly as you would outside Daft. The concurrency is automatic; no thread pools, no executor boilerplate.
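The win comes from overlapping waits instead of serializing them. Outside Daft, the same effect shows up with plain asyncio (asyncio.sleep stands in for a network call here):

```python
import asyncio
import time

async def fake_api_call(i: int) -> int:
    await asyncio.sleep(0.2)  # stand-in for network latency
    return i

async def fetch_all() -> tuple:
    start = time.perf_counter()
    # Launch all ten calls concurrently; total wait is roughly one latency, not ten.
    results = await asyncio.gather(*(fake_api_call(i) for i in range(10)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(fetch_all())
# Ten 0.2 s waits overlap: wall time stays near 0.2 s instead of 2 s.
assert list(results) == list(range(10)) and elapsed < 1.0
```

Daft does the equivalent scheduling for you when the decorated function is async, which is why no gather or event-loop code appears in the example above.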


Section 4: Stateful -- load a model once, infer on every row

Problem: Loading a 2 GB model for every partition is killing your pipeline. Seven minutes of cold-start time, repeated for every partition, before a single row gets processed.

Pattern: @daft.cls (docs) with __init__ for setup and __call__ for inference. The model loads once per worker and stays in memory for every row that worker handles.

import daft
 
@daft.cls
class SentimentClassifier:
    def __init__(self):
        from transformers import pipeline
        self.pipe = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
 
    def __call__(self, text: str) -> str:
        return self.pipe(text)[0]["label"]
 
classifier = SentimentClassifier()
 
df = daft.from_pydict({"review": [
    "This product is amazing",
    "Worst purchase I've ever made",
    "It's okay, nothing special",
]})
df = df.select(classifier(df["review"]).alias("sentiment"))
df.show()

When to use it: Any workload with expensive initialization -- model loading, database connection pools, API clients with authentication. __init__ runs once per worker; the method runs on every row, so the cold-start cost is amortized across every row that worker processes. Add gpus=1 to the decorator when you need GPU allocation, max_concurrency to cap parallel instances, and use_process=True to escape the GIL.
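The payoff is easiest to see in miniature. A plain-Python sketch of the once-per-worker semantics (a stand-in class, not Daft's actual scheduling):

```python
class FakeClassifier:
    init_count = 0  # counts how many times the expensive setup runs

    def __init__(self):
        FakeClassifier.init_count += 1
        self.model = "loaded-weights"  # stand-in for a multi-GB model load

    def __call__(self, text: str) -> str:
        return "POSITIVE" if "amazing" in text.lower() else "NEGATIVE"

clf = FakeClassifier()  # setup cost paid once, like __init__ under @daft.cls
labels = [clf(t) for t in ["This product is amazing", "Worst purchase ever"]]
assert FakeClassifier.init_count == 1  # init ran once, not once per row
assert labels == ["POSITIVE", "NEGATIVE"]
```

With @daft.cls, each worker holds one instance like clf and routes every row it processes through __call__, which is exactly the amortization described above.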


Pick your pattern

| I need to... | Pattern | Decorator |
| --- | --- | --- |
| Transform each row with custom logic | Row-wise | @daft.func |
| Produce multiple rows from one input | Generator | @daft.func + Iterator[T] |
| Call external services concurrently | Async | @daft.func + async def |
| Reuse expensive resources across rows | Stateful | @daft.cls + __init__ |

Every pattern works locally on your laptop. Call daft.set_runner_ray("ray://cluster:10001") and the same code runs distributed across a cluster. No rewrites.


Try it

The full notebook lives in the Daft docs: daft-udf-patterns.ipynb. Open it, run every cell. Fifteen minutes to hands-on experience with all four UDF patterns.

Your Python. Daft's scale.
