
Daft UDFs: What is a UDF and why do you need one?
Daft UDFs let you run custom Python inside a distributed DataFrame, whether the operation is row-wise, async, a generator, or a batch transform.
by Daft Team

TL;DR
- A UDF lets you run custom Python inside a distributed DataFrame pipeline when built-in expressions don't cover your use case
- Daft's @daft.func covers four patterns with a single decorator: row-wise, async, generator, and batch
Imagine for a moment you have a DataFrame pipeline that works -- reads, filters, joins, aggregates -- but you need to do the one thing that no built-in expression covers. Maybe you need to parse a custom text format, call an API, or run a model. Suddenly you're outside the expression system, and your options are ugly: dump to pandas and loop, export to CSV and shell out, or rewrite everything in a lower-level framework.
A user-defined function (UDF) bridges that gap. It lets you write regular Python and run it inside your distributed pipeline, on every row, without leaving the DataFrame.
Most frameworks support some version of this, but they all force tradeoffs: PySpark serializes every row through Java, pandas apply runs single-threaded on one machine, and Ray Data only supports batch-level transforms. Daft's daft.func takes a different approach: decorate a normal Python function, and it works locally or distributed with zero code changes. Row-wise, batch, async, generators -- one decorator covers all of them.
Two Paths Lead to Needing a UDF
Path 1: You process data in DataFrames, but expressions don't cover your use case
You already think in DataFrames. You filter with .filter(), transform with .with_column(), aggregate with .agg(). Daft's expression system handles most of this in Rust -- fast, vectorized, zero Python overhead.
But expressions can't do everything. They cover math, string operations, date manipulation, type casting -- the standard analytics toolkit. The moment you need custom parsing logic, a domain-specific calculation, or anything that requires importing a Python library, you've left expression territory.
That's the first path: you're already in the DataFrame, and you need a UDF to cover the gap between what expressions offer and what your workload actually requires.
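For instance, parsing an ad-hoc `key=value;key=value` record format takes only a few lines of Python but has no expression equivalent. (This helper, `parse_kv_record`, and its format are invented for illustration; wrapped in `@daft.func`, it would run per row inside a pipeline.)

```python
# Hypothetical example: an ad-hoc "key=value;key=value" log format that
# no built-in expression covers, but a few lines of Python handle easily.
def parse_kv_record(record: str) -> dict:
    pairs = (field.split("=", 1) for field in record.split(";") if "=" in field)
    return {key.strip(): value.strip() for key, value in pairs}

print(parse_kv_record("user=alice; status=ok; latency_ms=42"))
```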
Path 2: You have Python code that needs to scale
Maybe you're not starting from a DataFrame at all. You have a Python function -- a text classifier, a geocoder, an API client -- and you need it to run on a million rows instead of one. You need parallelism, but you don't want to rewrite your function for Spark or set up a Ray cluster.
That's the second path: you already have the Python, and you need a way to parallelize it with minimal changes. Daft makes this trivially easy -- add a decorator, pass your function a column, and Daft handles distribution.
Five Lines, Full Pipeline
Here's a complete UDF in Daft. No boilerplate, no serialization config, no schema gymnastics:
```python
import daft


@daft.func
def normalize_email(email: str) -> str:
    local, domain = email.lower().split("@")
    return f"{local.strip().replace('.', '')}@{domain}"


df = daft.from_pydict({"email": ["John.Doe@Gmail.COM", " alice@Company.ORG", "BOB.smith@Yahoo.com"]})
df = df.with_column("clean_email", normalize_email(df["email"]))
df.show()
```
That's it. Type hints tell Daft the input and output types. The decorator wires it into the execution engine. Run this locally on your laptop, or call daft.set_runner_ray("ray://cluster:10001") and it distributes across a cluster -- same code, no changes.
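Because the decorated function is still ordinary Python, its logic is easy to sanity-check in isolation. Here is the same body restated without the decorator:

```python
def normalize_email(email: str) -> str:
    # Identical body to the UDF above, minus @daft.func: lowercase the
    # address, strip whitespace, and drop dots from the local part.
    local, domain = email.lower().split("@")
    return f"{local.strip().replace('.', '')}@{domain}"

print(normalize_email("John.Doe@Gmail.COM"))  # johndoe@gmail.com
```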
Beyond Row-Wise: Async, Generators, Batch
daft.func isn't limited to simple row-in, row-out transforms. Daft inspects your function signature and adapts:
Async: for I/O-bound work like API calls, where you want concurrency without managing threads yourself:
```python
import aiohttp

import daft


@daft.func
async def fetch_page(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
```
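The payoff of the async form is that many calls can be awaited concurrently instead of one row at a time. The effect resembles a plain asyncio fan-out, sketched here with a simulated fetch since fetch_page needs a live network:

```python
import asyncio


async def fake_fetch(url: str) -> str:
    # Simulated I/O: each call sleeps 50 ms, but the sleeps overlap, so
    # ten calls finish in roughly 50 ms total rather than 500 ms.
    await asyncio.sleep(0.05)
    return f"content of {url}"


async def fetch_all(urls: list) -> list:
    # Fan out all requests at once and gather results in input order.
    return await asyncio.gather(*(fake_fetch(u) for u in urls))


results = asyncio.run(fetch_all([f"https://example.com/{i}" for i in range(10)]))
```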
Generator: when one input row produces many output rows
Generators are common when processing audio, video, and PDFs, where one file expands into many chunks. Here, a simple sentence splitter demonstrates the idea.
```python
from typing import Iterator

import daft


@daft.func
def split_sentences(text: str) -> Iterator[str]:
    for sentence in text.split(". "):
        yield sentence.strip()
```
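Driving the same body as a plain generator shows the one-to-many shape: one input string, several output rows. In a pipeline, Daft expands the column accordingly.

```python
from typing import Iterator


def split_sentences(text: str) -> Iterator[str]:
    # Same body as the UDF above: one input yields several output rows.
    for sentence in text.split(". "):
        yield sentence.strip()


rows = list(split_sentences("First point. Second point. Done."))
print(rows)  # ['First point', 'Second point', 'Done.']
```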
Batch: for vectorized operations with PyArrow or NumPy, when you want maximum throughput:
```python
import numpy as np

import daft


@daft.func.batch(return_dtype=daft.DataType.float64())
def zscore(values: daft.Series) -> list:
    arr = np.array(values.to_pylist(), dtype=np.float64)
    return ((arr - arr.mean()) / arr.std()).tolist()
```
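The math inside zscore is standard: subtract the column mean, then divide by the population standard deviation (what np.std computes by default). The same calculation in pure Python, using the stdlib statistics module:

```python
import statistics

values = [1.0, 2.0, 3.0]
mu = statistics.fmean(values)
sigma = statistics.pstdev(values)  # population std, matching np.std's default
z = [(v - mu) / sigma for v in values]
```

The result is a column with mean 0 and standard deviation 1, which is the usual normalization before feeding features to a model.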
One decorator. Four patterns. No framework-specific boilerplate.
Learn more in the Daft docs.
For the next installment in the UDF series, see Stateless UDFs with daft.func.