
Daft UDF Patterns: Four Patterns, One Notebook
Row-wise, generator, async, and stateful UDFs — one notebook, one dataset, runnable side by side.
by Daft Team

Daft has four UDF patterns, each designed for a different kind of workload. This notebook puts all four in one place — row-wise transforms, generators, async I/O, and stateful classes — with a single dataset so they're easy to compare and run side by side.
This is Week 5 of the UDF series. Previous posts: stateless UDFs with @daft.func, stateful UDFs with @daft.cls, GPU inference with @daft.cls.
Each section walks through a problem, shows the pattern that solves it, and includes runnable code. The full notebook is available in the Daft examples.
The UDF series so far
- Week 1: What is a UDF? — When built-in expressions aren't enough, five lines of @daft.func get custom Python running distributed.
- Week 2: Stateless UDFs with @daft.func — Row-wise, generator, async, and batch. Four patterns, one decorator.
- Week 3: Stateful UDFs with @daft.cls — Load a model once per worker, reuse it for every row.
- Week 4: GPU Inference with @daft.cls — GPU allocation, max_concurrency, and use_process=True for production inference. Patterns from ByteDance and Essential AI.
Section 1: Row-wise -- clean every row with zero setup
Problem: You need custom logic on every row -- normalize an email, parse a phone number, validate a field -- and the built-in expressions don't cover it.
Pattern: @daft.func (docs) with type hints. One row in, one row out.
```python
import daft

@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]
    return f"{local}@{domain}"

df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    " bob@example.org ",
    "CAROL+spam@yahoo.com",
]})
df = df.select(normalize_email(df["email"]).alias("clean"))
df.show()
```

When to use it: Your function takes a single row and returns a single value. The logic is pure Python -- no external services, no model weights, no variable-length output. This is the workhorse pattern for data cleaning.
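The function above assumes well-formed input; a stray row without an "@" would raise mid-pipeline. A defensive variant returns None for malformed rows instead, which Daft can surface as a null in the output column. This is a sketch in plain Python so it runs with or without the decorator; normalize_email_safe is a hypothetical name, not a Daft API:

```python
from typing import Optional

def normalize_email_safe(raw: str) -> Optional[str]:
    # Hypothetical defensive variant: return None instead of raising
    # when the input doesn't look like an email address.
    cleaned = raw.strip().lower()
    if cleaned.count("@") != 1:
        return None
    local, domain = cleaned.split("@")
    local = local.split("+")[0]  # drop "+tag" suffixes
    if not local or not domain:
        return None
    return f"{local}@{domain}"

print(normalize_email_safe("Alice+work@Gmail.COM"))  # alice@gmail.com
print(normalize_email_safe("not-an-email"))          # None
```

Stacking @daft.func on top of this function would work the same way as in the example above, with malformed rows becoming nulls rather than failures.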
Section 2: Generator -- one document becomes many chunks
Problem: Each input row needs to produce a variable number of output rows. Splitting a PDF into pages, tokenizing text into sentences, expanding a nested record into child rows. You don't know ahead of time how many outputs each input will produce.
Pattern: @daft.func returning Iterator[T]. Yield as many rows as the data demands.
```python
from typing import Iterator

import daft

@daft.func
def split_into_chunks(text: str, max_len: int = 200) -> Iterator[str]:
    words = text.split()
    chunk = []
    length = 0
    for word in words:
        if length + len(word) + 1 > max_len and chunk:
            yield " ".join(chunk)
            chunk = []
            length = 0
        chunk.append(word)
        length += len(word) + 1
    if chunk:
        yield " ".join(chunk)

df = daft.from_pydict({"document": [
    "Daft runs locally and distributed with zero code changes. " * 10,
    "UDFs bridge the gap between built-in expressions and your custom logic. " * 5,
]})
df = df.select(split_into_chunks(df["document"]).alias("chunk"))
df.show()
```

When to use it: One-to-many transformations. Document chunking for RAG pipelines, audio segmentation, tokenization -- anywhere yield is the natural way to express "this input produces N outputs." No intermediate lists, no explode(), no memory spike from materializing everything at once.
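The same generator shape covers tokenization. Here is a minimal sentence splitter as a sketch -- a naive regex, and split_sentences is an illustrative name rather than anything from Daft. It is shown without the decorator so it runs as plain Python; add @daft.func back to use it in a pipeline:

```python
import re
from typing import Iterator

def split_sentences(text: str) -> Iterator[str]:
    # Naive splitter: break after ".", "!", or "?" followed by whitespace.
    for sentence in re.split(r"(?<=[.!?])\s+", text.strip()):
        if sentence:
            yield sentence

print(list(split_sentences("Daft is fast. UDFs are flexible! Try it?")))
# ['Daft is fast.', 'UDFs are flexible!', 'Try it?']
```

Each input document yields as many sentence rows as it happens to contain, exactly like the chunking example above.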
Section 3: Async -- hit APIs without waiting in line
Problem: Your function calls an external service -- a translation API, a geocoder, a model endpoint. Sequential execution means each row waits for the previous one to finish. At 10,000 rows, that's hours of idle waiting.
Pattern: async def with @daft.func. Daft overlaps the I/O automatically.
```python
import daft

@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp  # imported inside the UDF so each worker resolves it locally

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return resp.status

df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})
df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status"),
)
df.show()
```

When to use it: Any I/O-bound workload -- API calls, database lookups, webhook triggers, model endpoints behind a REST API. You write async def and await exactly as you would outside Daft. The concurrency is automatic; no thread pools, no executor boilerplate.
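Daft overlaps the awaits for you, but the remote service may still rate-limit you. One option is a semaphore inside the UDF body to cap in-flight requests. A sketch outside Daft, with asyncio.sleep standing in for the real network call -- fetch_with_limit and the cap of 2 are illustrative choices, not Daft APIs:

```python
import asyncio

async def fetch_with_limit(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # at most N requests in flight at once
        await asyncio.sleep(0.01)  # stand-in for the real network call
        return f"{url}: ok"

async def main() -> list:
    sem = asyncio.Semaphore(2)  # cap concurrency at 2
    urls = [f"https://example.com/{i}" for i in range(5)]
    # gather preserves input order even though calls overlap
    return await asyncio.gather(*(fetch_with_limit(sem, u) for u in urls))

results = asyncio.run(main())
print(results)
```

Inside an async @daft.func, the semaphore would live at module scope so every invocation on that worker shares the same cap.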
Section 4: Stateful -- load a model once, infer on every row
Problem: Loading a 2 GB model for every partition is killing your pipeline. Seven minutes of cold-start time, repeated across every worker, before a single row gets processed.
Pattern: @daft.cls (docs) with __init__ for setup and __call__ for inference. The model loads once per worker and stays in memory for every row that worker handles.
```python
import daft

@daft.cls
class SentimentClassifier:
    def __init__(self):
        from transformers import pipeline
        self.pipe = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )

    def __call__(self, text: str) -> str:
        return self.pipe(text)[0]["label"]

classifier = SentimentClassifier()

df = daft.from_pydict({"review": [
    "This product is amazing",
    "Worst purchase I've ever made",
    "It's okay, nothing special",
]})
df = df.select(classifier(df["review"]).alias("sentiment"))
df.show()
```

When to use it: Any workload with expensive initialization -- model loading, database connection pools, API clients with authentication. __init__ runs once per worker; the method runs on every row. You amortize the cold-start cost across all the rows that worker processes. Add gpus=1 to the decorator when you need GPU allocation, max_concurrency to cap parallel instances, and use_process=True to escape the GIL.
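What @daft.cls buys you is amortization: pay the setup cost once, reuse it on every call. The idea in plain Python -- ExpensiveResource is an illustrative stand-in, with time.sleep in place of loading real model weights, and the "model" is a toy lambda, not a real classifier:

```python
import time

class ExpensiveResource:
    init_count = 0  # track how many times setup ran

    def __init__(self):
        ExpensiveResource.init_count += 1
        time.sleep(0.05)  # stand-in for loading model weights
        # toy "model": keyword match, not a real classifier
        self.model = lambda text: "POSITIVE" if "amazing" in text else "NEGATIVE"

    def __call__(self, text: str) -> str:
        return self.model(text)

clf = ExpensiveResource()  # setup cost paid once
labels = [clf(t) for t in ["This product is amazing", "Worst purchase ever"]]
print(labels, ExpensiveResource.init_count)  # ['POSITIVE', 'NEGATIVE'] 1
```

With @daft.cls, the same shape holds per worker: one __init__, then __call__ for every row that worker handles.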
Pick your pattern
| I need to... | Pattern | Decorator |
|---|---|---|
| Transform each row with custom logic | Row-wise | @daft.func |
| Produce multiple rows from one input | Generator | @daft.func + Iterator[T] |
| Call external services concurrently | Async | @daft.func + async def |
| Reuse expensive resources across rows | Stateful | @daft.cls + __init__ |
Every pattern works locally on your laptop. Call daft.set_runner_ray("ray://cluster:10001") and the same code runs distributed across a cluster. No rewrites.
Try it
The full notebook lives in the Daft docs: daft-udf-patterns.ipynb. Open it, run every cell. Fifteen minutes to hands-on experience with all four UDF patterns.
Your Python. Daft's scale.