March 10, 2026

Stateless UDFs with daft.func

Row-wise, async, generator, and batch UDFs in Daft: one decorator, zero boilerplate, local or distributed

by Daft Team

TLDR

daft.func supports four stateless patterns: row-wise, generator, async, and batch — each maps to a different shape of workload. Row-wise for custom per-row logic, generators for one-to-many (PDF splitting, tokenization), async for concurrent I/O, batch for vectorized PyArrow/NumPy ops. Every pattern runs identically on your laptop or a 100-node Ray cluster. Pick the pattern that matches your problem, not the one your framework forces on you.

Your DataFrame engine has 200 built-in expressions. Your pipeline needs the 201st - a custom regex, an API call, a PDF splitter. In most frameworks, that means dropping out of the DataFrame, writing glue code, and managing parallelism yourself. In Daft, it means adding one decorator to the Python function you already wrote.

@daft.func turns a regular Python function into a distributed operation. No boilerplate. No code changes between your laptop and a 100-node Ray cluster. You keep your programming mindset; Daft handles the scale.

This post walks through the four stateless UDF patterns (row-wise, generator, async, and batch) with runnable code for each. Every section opens with the problem it solves, because the right pattern depends on the shape of your workload, not on the API.

Row-wise: one row in, one row out

The problem: You need to apply custom logic to every row - normalizing text, parsing a field, computing a derived value - and the built-in expressions don't cover your case.

Row-wise is the default. Write a function with type hints, decorate it with @daft.func, and call it on DataFrame columns. Daft infers the return type from your annotations and runs the function on each row in parallel.

import daft

@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]  # strip plus-addressing
    return f"{local}@{domain}"


df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    " bob@example.org ",
    "CAROL+spam@yahoo.com",
]})
df = df.with_column("clean_email", normalize_email(df["email"]))
df.show()

That's it. The function looks like regular Python because it is regular Python. You can mix DataFrame columns with literal arguments:

@daft.func
def format_price(cents: int, currency: str = "USD", decimals: int = 2) -> str:
    amount = cents / (10 ** decimals)
    return f"{currency} {amount:.{decimals}f}"


df = daft.from_pydict({"price_cents": [1999, 4550, 120]})
df = df.with_column("price_formatted", format_price(df["price_cents"], currency="EUR"))
df.show()

Column arguments map to distributed data. Literal arguments pass through unchanged. You write the function the same way you would in a script - Daft figures out the rest.

Generators: one row in, many rows out

The problem: Each input row produces a variable number of output rows. Splitting a document into pages, tokenizing text into words, expanding a nested record into child rows. Traditional UDFs force you to return a fixed-size output; generators let you yield as many rows as the data demands.

Return an Iterator from your function, and Daft automatically detects the generator pattern. Each yielded value becomes a separate row in the output.

from typing import Iterator
import daft


@daft.func
def split_into_sentences(text: str) -> Iterator[str]:
    for sentence in text.split(". "):
        cleaned = sentence.strip().rstrip(".")
        if cleaned:
            yield cleaned


df = daft.from_pydict({"paragraph": [
    "Daft runs locally. It also runs distributed. No code changes needed.",
    "UDFs bridge the gap. Your Python scales automatically.",
]})
df = df.with_column("sentence", split_into_sentences(df["paragraph"]))
df.show()

One paragraph in, multiple sentences out. The original two rows expand to five - no explode() call, no intermediate lists, no memory spike from materializing every result up front.
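The two-rows-to-five expansion is easy to check in plain Python: each yielded value becomes one output row. A Daft-free sketch of the same function:

```python
def split_into_sentences(text: str):
    """Yield cleaned sentences one at a time instead of building a list."""
    for sentence in text.split(". "):
        cleaned = sentence.strip().rstrip(".")
        if cleaned:
            yield cleaned

paragraphs = [
    "Daft runs locally. It also runs distributed. No code changes needed.",
    "UDFs bridge the gap. Your Python scales automatically.",
]
# 2 input rows -> 5 output rows, exactly what the UDF produces per partition.
rows = [s for p in paragraphs for s in split_into_sentences(p)]
```

The lazy yield is the point: nothing is materialized until a downstream consumer pulls the next row.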

This pattern is especially useful for document processing. A PDF extraction pipeline yields pages from documents, chunks from pages, entities from chunks - each stage is a generator UDF that naturally maps one-to-many relationships:

import daft
from typing import Iterator, TypedDict
import pymupdf


class PdfPage(TypedDict):
    page_number: int
    page_text: str
    page_image_bytes: bytes


@daft.func
def extract_pdf(file: daft.File) -> Iterator[PdfPage]:
    """Extracts the content of a PDF file."""
    pymupdf.TOOLS.mupdf_display_errors(False)  # suppress non-fatal MuPDF warnings

    with file.to_tempfile() as tmp:
        doc = pymupdf.Document(filename=str(tmp.name), filetype="pdf")
        for pno, page in enumerate(doc):
            yield PdfPage(
                page_number=pno,
                page_text=page.get_text("text"),
                page_image_bytes=page.get_pixmap().tobytes(),
            )


if __name__ == "__main__":
    from daft import col

    df = (
        daft.from_glob_path("hf://datasets/Eventual-Inc/sample-files/papers/*.pdf")
        .with_column("pdf_file", daft.functions.file(col("path")))
        .with_column("page", extract_pdf(col("pdf_file")))
        .select("path", "size", daft.functions.unnest(col("page")))
    )

    df.show()

A 50-page report yields 50 rows. A 2-page invoice yields 2. The generator handles the variability; Daft handles the parallelism.

Async: concurrent I/O without threads

The problem: Your function calls an external service -- an API, a database, a model endpoint. Sequential execution means each row waits for the previous one to finish. You need concurrency, but managing thread pools or callback chains is fragile and error-prone.

Make your function async, and Daft runs it concurrently across rows using Python's native asyncio. No thread management, no executor boilerplate.

import daft


@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status


df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})

df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status_code"),
)
df.show()

Each request fires concurrently instead of sequentially. For I/O-bound work - API calls, database queries, webhook triggers - async UDFs give you concurrency with zero changes to how you think about the function. You write async def and await exactly as you would outside of Daft.

This matters at scale. Hitting a rate-limited API with 10,000 rows sequentially could take hours. With async, Daft overlaps the waiting, and your pipeline finishes in the time it takes to process the slowest batch.
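The overlap is ordinary asyncio underneath. Here is a minimal Daft-free sketch of the same idea, with asyncio.sleep standing in for network latency and an asyncio.Semaphore standing in for a client-side rate limit (the fetch function, the limit of 5, and the call count are illustrative, not part of Daft's API):

```python
import asyncio
import time

async def fetch(semaphore: asyncio.Semaphore, i: int) -> int:
    """Simulated network call: waits 0.1s, then returns its input."""
    async with semaphore:          # cap the number of in-flight requests
        await asyncio.sleep(0.1)   # stand-in for I/O latency
        return i

async def run_all(n: int, limit: int) -> list[int]:
    semaphore = asyncio.Semaphore(limit)
    # gather preserves input order, so results line up with rows
    return list(await asyncio.gather(*(fetch(semaphore, i) for i in range(n))))

start = time.perf_counter()
results = asyncio.run(run_all(20, 5))
elapsed = time.perf_counter() - start
# 20 calls at 0.1s each: ~2s sequentially, ~0.4s with 5 in flight
```

The same shape - many awaits in flight, results collected in row order - is what Daft drives for you across partitions.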

Batch: vectorized operations on columns

The problem: Your transformation operates on arrays, not scalars. You want to use PyArrow compute kernels, NumPy vectorization, or a library that expects batch inputs. Row-wise execution adds per-row overhead you don't need.

Use @daft.func.batch to receive entire columns as daft.Series objects and return a Series in one shot.

import daft
from daft import DataType, Series


@daft.func.batch(return_dtype=DataType.float64())
def z_score(values: Series) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    mean = pc.mean(arr).as_py()
    stddev = pc.stddev(arr).as_py()
    centered = pc.subtract(arr, mean)
    if stddev == 0:
        return Series.from_arrow(centered)
    return Series.from_arrow(pc.divide(centered, stddev))


df = daft.from_pydict({"measurement": [10.0, 20.0, 30.0, 40.0, 50.0]})
df = df.select(z_score(df["measurement"]).alias("z"))
df.show()

Batch UDFs bypass per-row dispatch entirely. The function receives the full partition as Arrow-backed Series, operates on it with vectorized kernels, and returns the result. For numerical transformations, this can be orders of magnitude faster than row-wise execution.
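The arithmetic the batch kernel performs is plain vectorized math. The same z-score in NumPy, shown only to make the computation concrete (the UDF above uses PyArrow kernels; NumPy here is an illustration, not a requirement):

```python
import numpy as np

values = np.array([10.0, 20.0, 30.0, 40.0, 50.0])
mean = values.mean()           # 30.0
stddev = values.std()          # population std (ddof=0), matching pc.stddev's default
z = (values - mean) / stddev   # one vectorized pass over the whole column
```

Every operation touches the full array at once; there is no per-row Python function call anywhere in the hot path.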

You can also mix Series with scalar arguments, which is useful when a batch operation needs a parameter that doesn't come from the data:

@daft.func.batch(return_dtype=DataType.float64())
def clip_values(values: Series, lower: float, upper: float) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    clipped = pc.max_element_wise(arr, lower)
    clipped = pc.min_element_wise(clipped, upper)
    return Series.from_arrow(clipped)


df = daft.from_pydict({"signal": [0.1, 5.5, -3.2, 12.0, 0.8]})
df = df.select(clip_values(df["signal"], 0.0, 10.0).alias("clipped"))
df.show()

values receives the column data as a Series. lower and upper pass through as Python floats. Daft distinguishes column arguments from literals automatically.

Choosing the right pattern

The four patterns map to four shapes of work:

| Pattern | Signature | When to use |
| --- | --- | --- |
| Row-wise | def f(x: T) -> U | Custom logic per row: parsing, formatting, validation |
| Generator | def f(x: T) -> Iterator[U] | One input produces variable outputs: splitting, tokenizing, expanding |
| Async | async def f(x: T) -> U | External I/O: API calls, database queries, downloads |
| Batch | @daft.func.batch with Series | Vectorized math, array operations, PyArrow compute |

All four share the same principle: write the Python you would write anyway, add a decorator, and let Daft distribute it. The function you wrote for 100 rows on your laptop runs unchanged on 10 million rows across a cluster.

The decorator is the only change. Your code stays yours. Daft handles the rest.

Get updates, contribute code, or say hi.
Daft Engineering Blog
Join us as we explore innovative ways to handle multimodal datasets, optimize performance, and simplify your data workflows.