
Stateless UDFs with daft.func
Row-wise, async, generator, and batch UDFs in Daft: one decorator, zero boilerplate, local or distributed
by Daft Team

TL;DR
daft.func supports four stateless patterns: row-wise, generator, async, and batch — each maps to a different shape of workload. Row-wise for custom per-row logic, generators for one-to-many (PDF splitting, tokenization), async for concurrent I/O, batch for vectorized PyArrow/NumPy ops. Every pattern runs identically on your laptop or a 100-node Ray cluster. Pick the pattern that matches your problem, not the one your framework forces on you.
Your DataFrame engine has 200 built-in expressions. Your pipeline needs the 201st - a custom regex, an API call, a PDF splitter. In most frameworks, that means dropping out of the DataFrame, writing glue code, and managing parallelism yourself. In Daft, it means adding one decorator to the Python function you already wrote.
@daft.func turns a regular Python function into a distributed operation. No boilerplate. No code changes between your laptop and a 100-node Ray cluster. You keep your programming mindset; Daft handles the scale.
This post walks through the four stateless UDF patterns (row-wise, generator, async, and batch) with runnable code for each. Every section opens with the problem it solves, because the right pattern depends on the shape of your workload, not on the API.
Row-wise: one row in, one row out
The problem: You need to apply custom logic to every row - normalizing text, parsing a field, computing a derived value - and the built-in expressions don't cover your case.
Row-wise is the default. Write a function with type hints, decorate it with @daft.func, and call it on DataFrame columns. Daft infers the return type from your annotations and runs the function on each row in parallel.
```python
import daft


@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]  # strip plus-addressing
    return f"{local}@{domain}"


df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    " bob@example.org ",
    "CAROL+spam@yahoo.com",
]})
df = df.with_column("clean_email", normalize_email(df["email"]))
df.show()
```
That's it. The function looks like regular Python because it is regular Python. You can mix DataFrame columns with literal arguments:
```python
@daft.func
def format_price(cents: int, currency: str = "USD", decimals: int = 2) -> str:
    amount = cents / (10 ** decimals)
    return f"{currency} {amount:.{decimals}f}"


df = daft.from_pydict({"price_cents": [1999, 4550, 120]})
df = df.with_column("price_formatted", format_price(df["price_cents"], currency="EUR"))
df.show()
```
Column arguments map to distributed data. Literal arguments pass through unchanged. You write the function the same way you would in a script - Daft figures out the rest.
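Conceptually, the dispatch rule resembles checking each argument: column-like arguments are indexed per row, while scalars are broadcast to every row. The sketch below is an illustration of that idea in plain Python, not Daft's actual internals; `apply_rowwise` is a hypothetical helper invented here for the demonstration.

```python
# Toy illustration of column-vs-literal dispatch -- NOT Daft's real internals.
# A "column" argument (a list here) varies per row; a literal is broadcast.

def apply_rowwise(func, n_rows, *args, **kwargs):
    """Call func once per row, indexing into list-like 'column' arguments
    and passing scalar arguments through unchanged."""
    def pick(arg, i):
        return arg[i] if isinstance(arg, list) else arg
    return [
        func(*(pick(a, i) for a in args),
             **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n_rows)
    ]

def format_price(cents, currency="USD", decimals=2):
    amount = cents / (10 ** decimals)
    return f"{currency} {amount:.{decimals}f}"

# The list argument maps row by row; currency="EUR" is broadcast.
result = apply_rowwise(format_price, 3, [1999, 4550, 120], currency="EUR")
# → ["EUR 19.99", "EUR 45.50", "EUR 1.20"]
```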
Generators: one row in, many rows out
The problem: Each input row produces a variable number of output rows. Splitting a document into pages, tokenizing text into words, expanding a nested record into child rows. Traditional UDFs force you to return a fixed-size output; generators let you yield as many rows as the data demands.
Return an Iterator from your function, and Daft automatically detects the generator pattern. Each yielded value becomes a separate row in the output.
```python
from typing import Iterator

import daft


@daft.func
def split_into_sentences(text: str) -> Iterator[str]:
    for sentence in text.split(". "):
        cleaned = sentence.strip().rstrip(".")
        if cleaned:
            yield cleaned


df = daft.from_pydict({"paragraph": [
    "Daft runs locally. It also runs distributed. No code changes needed.",
    "UDFs bridge the gap. Your Python scales automatically.",
]})
df = df.with_column("sentence", split_into_sentences(df["paragraph"]))
df.show()
```
One paragraph in, multiple sentences out. The original two rows expand to five - no explode() call, no intermediate lists, no memory spike from materializing all results before yielding.
This pattern is especially useful for document processing. A PDF extraction pipeline yields pages from documents, chunks from pages, entities from chunks - each stage is a generator UDF that naturally maps one-to-many relationships:
```python
from typing import Iterator, TypedDict

import daft
import pymupdf


class PdfPage(TypedDict):
    page_number: int
    page_text: str
    page_image_bytes: bytes


@daft.func
def extract_pdf(file: daft.File) -> Iterator[PdfPage]:
    """Extracts the content of a PDF file."""
    pymupdf.TOOLS.mupdf_display_errors(False)  # Suppress non-fatal MuPDF warnings

    with file.to_tempfile() as tmp:
        doc = pymupdf.Document(filename=str(tmp.name), filetype="pdf")
        for pno, page in enumerate(doc):
            row = PdfPage(
                page_number=pno,
                page_text=page.get_text("text"),
                page_image_bytes=page.get_pixmap().tobytes(),
            )
            yield row


if __name__ == "__main__":
    from daft import col

    df = (
        daft.from_glob_path("hf://datasets/Eventual-Inc/sample-files/papers/*.pdf")
        .with_column("pdf_file", daft.functions.file(col("path")))
        .with_column("page", extract_pdf(col("pdf_file")))
        .select("path", "size", daft.functions.unnest(col("page")))
    )

    df.show()
```
A 50-page report yields 50 rows. A 2-page invoice yields 2. The generator handles the variability; Daft handles the parallelism.
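The one-to-many semantics can be pictured in plain Python: each yielded value becomes its own output row, with the parent row's other columns repeated alongside it. This is a sketch of the behavior, not Daft's implementation; `expand` is a hypothetical helper written for the illustration.

```python
# Sketch of generator-UDF semantics: each yielded value becomes a separate
# output row, and the parent row's remaining columns are repeated next to it.

def expand(rows, gen_func, key, out_key):
    out = []
    for row in rows:
        for value in gen_func(row[key]):
            new_row = {k: v for k, v in row.items() if k != key}
            new_row[out_key] = value
            out.append(new_row)
    return out

def split_sentences(text):
    for s in text.split(". "):
        s = s.strip().rstrip(".")
        if s:
            yield s

rows = [{"doc_id": 1, "paragraph": "Daft runs locally. It also runs distributed."}]
expanded = expand(rows, split_sentences, "paragraph", "sentence")
# doc_id is repeated for each sentence the generator yields
```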
Async: concurrent I/O without threads
The problem: Your function calls an external service -- an API, a database, a model endpoint. Sequential execution means each row waits for the previous one to finish. You need concurrency, but managing thread pools or callback chains is fragile and error-prone.
Make your function async, and Daft runs it concurrently across rows using Python's native asyncio. No thread management, no executor boilerplate.
```python
import daft


@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status


df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})

df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status_code"),
)
df.show()
```
Each request fires concurrently instead of sequentially. For I/O-bound work - API calls, database queries, webhook triggers - async UDFs give you concurrency with zero changes to how you think about the function. You write async def and await exactly as you would outside of Daft.
This matters at scale. Hitting a rate-limited API with 10,000 rows sequentially could take hours. With async, Daft overlaps the waiting, and your pipeline finishes in the time it takes to process the slowest batch.
Batch: vectorized operations on columns
The problem: Your transformation operates on arrays, not scalars. You want to use PyArrow compute kernels, NumPy vectorization, or a library that expects batch inputs. Row-wise execution adds per-row overhead you don't need.
Use @daft.func.batch to receive entire columns as daft.Series objects and return a Series in one shot.
```python
import daft
from daft import DataType, Series


@daft.func.batch(return_dtype=DataType.float64())
def z_score(values: Series) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    mean = pc.mean(arr).as_py()
    stddev = pc.stddev(arr).as_py()
    if stddev == 0:
        return Series.from_arrow(pc.subtract(arr, mean))
    centered = pc.subtract(arr, mean)
    return Series.from_arrow(pc.divide(centered, stddev))


df = daft.from_pydict({"measurement": [10.0, 20.0, 30.0, 40.0, 50.0]})
df = df.select(z_score(df["measurement"]).alias("z"))
df.show()
```
Batch UDFs bypass per-row dispatch entirely. The function receives the full partition as Arrow-backed Series, operates on it with vectorized kernels, and returns the result. For numerical transformations, this can be orders of magnitude faster than row-wise execution.
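The overhead difference comes down to call counts: a row-wise UDF crosses the Python function-call boundary once per row, while a batch UDF crosses it once per partition. A toy illustration of that distinction (plain Python, not Daft code):

```python
# Toy illustration of row-wise vs batch dispatch. The row-wise path makes
# one Python-level call per element; the batch path makes exactly one,
# leaving the inner loop free to be vectorized.
calls = {"rowwise": 0, "batch": 0}

def double_rowwise(x):
    calls["rowwise"] += 1
    return x * 2

def double_batch(xs):
    calls["batch"] += 1
    return [x * 2 for x in xs]  # one call covering the whole batch

data = list(range(1000))
a = [double_rowwise(x) for x in data]  # 1000 Python-level calls
b = double_batch(data)                 # 1 Python-level call

assert a == b  # same result, very different dispatch overhead
```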
You can also mix Series with scalar arguments, which is useful when a batch operation needs a parameter that doesn't come from the data:
```python
@daft.func.batch(return_dtype=DataType.float64())
def clip_values(values: Series, lower: float, upper: float) -> Series:
    import pyarrow.compute as pc

    arr = values.to_arrow()
    clipped = pc.max_element_wise(arr, lower)
    clipped = pc.min_element_wise(clipped, upper)
    return Series.from_arrow(clipped)


df = daft.from_pydict({"signal": [0.1, 5.5, -3.2, 12.0, 0.8]})
df = df.select(clip_values(df["signal"], 0.0, 10.0).alias("clipped"))
df.show()
```
values receives the column data as a Series. lower and upper pass through as Python floats. Daft distinguishes column arguments from literals automatically.
Choosing the right pattern
The four patterns map to four shapes of work:
| Pattern | Signature | When to use |
|---|---|---|
| Row-wise | `@daft.func` on `def f(x: T) -> U` | Custom logic per row -- parsing, formatting, validation |
| Generator | `@daft.func` on `def f(x: T) -> Iterator[U]` | One input produces variable outputs -- splitting, tokenizing, expanding |
| Async | `@daft.func` on `async def f(x: T) -> U` | External I/O -- API calls, database queries, downloads |
| Batch | `@daft.func.batch` on `def f(x: Series) -> Series` | Vectorized math, array operations, PyArrow compute |
All four share the same principle: write the Python you would write anyway, add a decorator, and let Daft distribute it. The function you wrote for 100 rows on your laptop runs unchanged on 10 million rows across a cluster.
The decorator is the only change. Your code stays yours. Daft handles the rest.