
Stateless UDFs with daft.func - four patterns, one decorator
Row-wise, async, generator, and batch UDFs in Daft — one decorator, zero boilerplate, local or distributed.
by Daft Team

TLDR
daft.func supports four stateless patterns: row-wise, generator, async, and batch — each maps to a different shape of workload. Row-wise for custom per-row logic, generators for one-to-many (PDF splitting, tokenization), async for concurrent I/O, batch for vectorized PyArrow/NumPy ops. Every pattern runs identically on your laptop or a 100-node Ray cluster. Pick the pattern that matches your problem, not the one your framework forces on you.
Your DataFrame engine has 200 built-in expressions. Your pipeline needs the 201st - a custom regex, an API call, a PDF splitter. In most frameworks, that means dropping out of the DataFrame, writing glue code, and managing parallelism yourself. In Daft, it means adding one decorator to the Python function you already wrote.
@daft.func turns a regular Python function into a distributed operation. No boilerplate. No code changes between your laptop and a 100-node Ray cluster. You keep your programming mindset; Daft handles the scale.
This post walks through the four stateless UDF patterns (row-wise, generator, async, and batch) with runnable code for each. Every section opens with the problem it solves, because the right pattern depends on the shape of your workload, not on the API.
Row-wise: one row in, one row out
The problem: You need to apply custom logic to every row - normalizing text, parsing a field, computing a derived value - and the built-in expressions don't cover your case.
Row-wise is the default. Write a function with type hints, decorate it with @daft.func, and call it on DataFrame columns. Daft infers the return type from your annotations and runs the function on each row in parallel.
```python
import daft

@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]  # strip plus-addressing
    return f"{local}@{domain}"

df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    " bob@example.org ",
    "CAROL+spam@yahoo.com",
]})
df = df.with_column("clean_email", normalize_email(df["email"]))
df.show()
```

That's it. The function looks like regular Python because it is regular Python. You can mix DataFrame columns with literal arguments:
```python
@daft.func
def format_price(cents: int, currency: str = "USD", decimals: int = 2) -> str:
    amount = cents / (10 ** decimals)
    return f"{currency} {amount:.{decimals}f}"

df = daft.from_pydict({"price_cents": [1999, 4550, 120]})
df = df.with_column("price_formatted", format_price(df["price_cents"], currency="EUR"))
df.show()
```

Column arguments map to distributed data. Literal arguments pass through unchanged. You write the function the same way you would in a script - Daft figures out the rest.
Generators: one row in, many rows out
The problem: Each input row produces a variable number of output rows. Splitting a document into pages, tokenizing text into words, expanding a nested record into child rows. Traditional UDFs force you to return a fixed-size output; generators let you yield as many rows as the data demands.
Return an Iterator from your function, and Daft automatically detects the generator pattern. Each yielded value becomes a separate row in the output.
```python
from typing import Iterator

import daft

@daft.func
def split_into_sentences(text: str) -> Iterator[str]:
    for sentence in text.split(". "):
        cleaned = sentence.strip().rstrip(".")
        if cleaned:
            yield cleaned

df = daft.from_pydict({"paragraph": [
    "Daft runs locally. It also runs distributed. No code changes needed.",
    "UDFs bridge the gap. Your Python scales automatically.",
]})
df = df.with_column("sentence", split_into_sentences(df["paragraph"]))
df.show()
```

One paragraph in, multiple sentences out. The original two rows expand to five - no explode() call, no intermediate lists, no memory spike from materializing all results before yielding.
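Outside Daft, the same function is just an ordinary Python generator, which makes the two-to-five expansion easy to sanity-check in isolation before wiring it into a pipeline:

```python
from typing import Iterator

def split_into_sentences(text: str) -> Iterator[str]:
    # Identical logic to the UDF above, minus the @daft.func decorator.
    for sentence in text.split(". "):
        cleaned = sentence.strip().rstrip(".")
        if cleaned:
            yield cleaned

paragraphs = [
    "Daft runs locally. It also runs distributed. No code changes needed.",
    "UDFs bridge the gap. Your Python scales automatically.",
]
# Flatten: each paragraph yields a variable number of sentences.
sentences = [s for p in paragraphs for s in split_into_sentences(p)]
# 2 paragraphs in, 5 sentences out
```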
This pattern is especially useful for document processing. A PDF extraction pipeline yields pages from documents, chunks from pages, entities from chunks - each stage is a generator UDF that naturally maps one-to-many relationships:
```python
from typing import Iterator, TypedDict

import daft
import pymupdf

class PdfPage(TypedDict):
    page_number: int
    page_text: str
    page_image_bytes: bytes

@daft.func
def extract_pdf(file: daft.File) -> Iterator[PdfPage]:
    """Extracts the content of a PDF file."""
    pymupdf.TOOLS.mupdf_display_errors(False)  # suppress non-fatal MuPDF warnings
    with file.to_tempfile() as tmp:
        doc = pymupdf.Document(filename=str(tmp.name), filetype="pdf")
        for pno, page in enumerate(doc):
            yield PdfPage(
                page_number=pno,
                page_text=page.get_text("text"),
                page_image_bytes=page.get_pixmap().tobytes(),
            )

if __name__ == "__main__":
    from daft import col

    df = (
        daft.from_glob_path("hf://datasets/Eventual-Inc/sample-files/papers/*.pdf")
        .with_column("pdf_file", daft.functions.file(col("path")))
        .with_column("page", extract_pdf(col("pdf_file")))
        .select("path", "size", daft.functions.unnest(col("page")))
    )
    df.show()
```

A 50-page report yields 50 rows. A 2-page invoice yields 2. The generator handles the variability; Daft handles the parallelism.
Async: concurrent I/O without threads
The problem: Your function calls an external service -- an API, a database, a model endpoint. Sequential execution means each row waits for the previous one to finish. You need concurrency, but managing thread pools or callback chains is fragile and error-prone.
Make your function async, and Daft runs it concurrently across rows using Python's native asyncio. No thread management, no executor boilerplate.
```python
import daft

@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status

df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})
df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status_code"),
)
df.show()
```

Each request fires concurrently instead of sequentially. For I/O-bound work - API calls, database queries, webhook triggers - async UDFs give you concurrency with zero changes to how you think about the function. You write async def and await exactly as you would outside of Daft.
This matters at scale. Hitting a rate-limited API with 10,000 rows sequentially could take hours. With async, Daft overlaps the waiting, and your pipeline finishes in the time it takes to process the slowest batch.
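When the service is rate-limited, you usually also want to cap how many requests are in flight at once. A plain asyncio semaphore inside the function body does this; the same pattern carries over unchanged into an async @daft.func. This is a sketch, not Daft API: fetch_one is a stand-in for a real HTTP call, and MAX_IN_FLIGHT is an illustrative cap.

```python
import asyncio

MAX_IN_FLIGHT = 8  # illustrative cap on concurrent requests

async def fetch_one(url: str) -> int:
    # Stand-in for a real HTTP call (e.g. aiohttp session.get).
    await asyncio.sleep(0.001)  # simulate network latency
    return 200

async def fetch_all(urls: list[str]) -> list[int]:
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)

    async def bounded(url: str) -> int:
        async with semaphore:  # at most MAX_IN_FLIGHT calls run concurrently
            return await fetch_one(url)

    return await asyncio.gather(*(bounded(u) for u in urls))

statuses = asyncio.run(fetch_all([f"https://example.com/{i}" for i in range(50)]))
```

The waiting still overlaps across rows; the semaphore just keeps the burst under the provider's limit.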
Batch: vectorized operations on columns
The problem: Your transformation operates on arrays, not scalars. You want to use PyArrow compute kernels, NumPy vectorization, or a library that expects batch inputs. Row-wise execution adds per-row overhead you don't need.
Use @daft.func.batch to receive entire columns as daft.Series objects and return a Series in one shot.
```python
import daft
from daft import DataType, Series

@daft.func.batch(return_dtype=DataType.float64())
def z_score(values: Series) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    mean = pc.mean(arr).as_py()
    stddev = pc.stddev(arr).as_py()
    centered = pc.subtract(arr, mean)
    if stddev == 0:
        return Series.from_arrow(centered)  # constant column: all zeros
    return Series.from_arrow(pc.divide(centered, stddev))

df = daft.from_pydict({"measurement": [10.0, 20.0, 30.0, 40.0, 50.0]})
df = df.select(z_score(df["measurement"]).alias("z"))
df.show()
```

Batch UDFs bypass per-row dispatch entirely. The function receives the full partition as Arrow-backed Series, operates on it with vectorized kernels, and returns the result. For numerical transformations, this can be orders of magnitude faster than row-wise execution.
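The arithmetic itself is easy to verify outside Daft. The same standardization in NumPy shows the defining property of a z-score: the output has mean 0 and standard deviation 1 (np.std and PyArrow's stddev both default to the population formula, ddof=0):

```python
import numpy as np

values = np.array([10.0, 20.0, 30.0, 40.0, 50.0])
# Same computation as the z_score UDF: center, then divide by the stddev.
z = (values - values.mean()) / values.std()
```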
You can also mix Series with scalar arguments, which is useful when a batch operation needs a parameter that doesn't come from the data:
```python
@daft.func.batch(return_dtype=DataType.float64())
def clip_values(values: Series, lower: float, upper: float) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    clipped = pc.max_element_wise(arr, lower)      # floor at lower
    clipped = pc.min_element_wise(clipped, upper)  # cap at upper
    return Series.from_arrow(clipped)

df = daft.from_pydict({"signal": [0.1, 5.5, -3.2, 12.0, 0.8]})
df = df.select(clip_values(df["signal"], 0.0, 10.0).alias("clipped"))
df.show()
```

values receives the column data as a Series. lower and upper pass through as Python floats. Daft distinguishes column arguments from literals automatically.
Choosing the right pattern
The four patterns map to four shapes of work:
| Pattern | Signature | When to use |
|---|---|---|
| Row-wise | `def f(x: T) -> U` | Custom logic per row -- parsing, formatting, validation |
| Generator | `def f(x: T) -> Iterator[U]` | One input produces variable outputs -- splitting, tokenizing, expanding |
| Async | `async def f(x: T) -> U` | External I/O -- API calls, database queries, downloads |
| Batch | `@daft.func.batch` with `Series` | Vectorized math, array operations, PyArrow compute |
All four share the same principle: write the Python you would write anyway, add a decorator, and let Daft distribute it. The function you wrote for 100 rows on your laptop runs unchanged on 10 million rows across a cluster.
The decorator is the only change. Your code stays yours. Daft handles the rest.
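For the cluster case, the one extra step is selecting the Ray runner before any DataFrame work. This is a configuration sketch: the address is illustrative, and the exact runner-selection call may differ between Daft versions, so check the runner docs for yours.

```python
import daft

# Sketch: point Daft at a Ray cluster before building any DataFrames.
# The address is illustrative; omit it to attach to a local Ray instance.
daft.context.set_runner_ray(address="ray://head-node:10001")

# Every UDF and DataFrame below runs unchanged from the local version.
```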

