
Stateful UDFs with daft.cls
Daft's stateful UDFs provide a clean, Pythonic interface for custom operations.
by Daft Team

Last week we covered @daft.func: four patterns for turning stateless Python functions into distributed operations. Row-wise, async, generator, batch. If you missed it, that post is worth reading first, because everything here builds directly on what you learned there.
But @daft.func has a limitation: every invocation starts from scratch. There's no way to hold onto something between rows. If your function needs a database connection, an API client with a session, or any resource that's expensive to create, you're paying that cost on every call. That's where @daft.cls comes in.
@daft.cls turns a Python class into a distributed operator. Your __init__ runs once per worker. Your methods run on every row, reusing whatever __init__ set up. Decorate a class, and Daft handles instantiation, scheduling, and parallelism across your cluster. No other distributed data framework has anything like it.
1. The anatomy of a stateful UDF
A stateful UDF has four parts: imports, decorator, __init__, and a method. If you've written a Python class before, this will look familiar.
```python
import daft
from transformers import pipeline


@daft.cls
class SentimentClassifier:
    def __init__(self):
        # Runs ONCE per worker -- download weights, allocate memory
        self.pipe = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")

    def __call__(self, text: str) -> str:
        # Runs on EVERY ROW -- reuses the loaded model
        return self.pipe(text)[0]["label"]


classifier = SentimentClassifier()

df = daft.from_pydict({"review": [
    "This product is amazing",
    "Worst purchase I've ever made",
    "It's okay, nothing special",
]})
df = df.select(classifier(df["review"]).alias("sentiment"))
df.show()
```
When you write SentimentClassifier(), Daft doesn't instantiate the class immediately. During query execution, each worker calls __init__ once, then reuses that instance for every row it processes. The heavy setup of downloading weights, allocating memory, or opening connections happens once and the per-row work reuses it.
This is the same deferred execution model you saw with @daft.func in week 1's intro. Daft builds a plan, then executes it. The difference is that @daft.cls gives your operator persistent state.
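The mechanics can be pictured in plain Python. This is a simplified stand-in, not Daft's actual implementation: the wrapper captures the constructor arguments, and the real instance is built lazily on a worker's first call, then reused for every row after that.

```python
class LazyInstance:
    """Simplified stand-in for deferred instantiation (not Daft's real code)."""

    def __init__(self, cls, *args, **kwargs):
        self._cls, self._args, self._kwargs = cls, args, kwargs
        self._instance = None
        self.init_count = 0  # track how many times __init__ actually ran

    def __call__(self, row):
        if self._instance is None:          # first call on this worker
            self._instance = self._cls(*self._args, **self._kwargs)
            self.init_count += 1
        return self._instance(row)          # every later call reuses it


class Upper:
    def __init__(self):
        self.prefix = ">> "                 # pretend this is expensive setup

    def __call__(self, text):
        return self.prefix + text.upper()


op = LazyInstance(Upper)
results = [op(row) for row in ["a", "b", "c"]]  # three rows, one __init__
```

Three rows flow through, but `Upper.__init__` runs exactly once. That is the core bargain of a stateful UDF: per-worker setup, per-row reuse.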
2. It looks like Python because it is Python
In the stateless UDFs post, one of the key takeaways was that @daft.func requires almost no changes to your programming mindset. You write a normal Python function, add a decorator, and it scales. @daft.cls extends that same philosophy to classes.
If your function needs to hold onto something between rows, like a loaded model, a database connection pool, or an API client with authentication, you wrap it in a class. The mental model is simple: write any Python class you'd normally write, then add @daft.cls on top.
```python
import daft
import aiohttp


@daft.cls
class APIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    async def fetch(self, url: str) -> str:
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {self.api_key}"}
            async with session.get(url, headers=headers) as resp:
                return await resp.text()


client = APIClient("sk-my-secret-key")
df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
]})
df = df.select(client.fetch(df["endpoint"]).alias("response"))
```
Notice how the APIClient stores the API key in __init__ and reuses it across every fetch call. With @daft.func, you'd either pass the key as a column (awkward) or hardcode it in the function (inflexible). The class gives you a natural place to hold configuration and resources.
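The ergonomic difference shows up even without Daft. In plain Python, configuration threaded through every call competes with configuration held once as state (a trivial sketch, no real networking):

```python
# Option 1: thread the key through every call (awkward at scale)
def fetch_with_key(url: str, api_key: str) -> str:
    return f"GET {url} auth={api_key}"


# Option 2: hold it as state -- set once in __init__, reused everywhere
class Client:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def fetch(self, url: str) -> str:
        return f"GET {url} auth={self.api_key}"


client = Client("sk-test")
a = fetch_with_key("https://example.com", "sk-test")  # key repeated per call
b = client.fetch("https://example.com")               # key stored once
```

Both produce the same request; only the class keeps the credential out of every call site, which is exactly what @daft.cls lets you do at cluster scale.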
Async, sync, classes, functions -- write Python the way you already do and Daft handles the distribution.
3. Same four patterns, now with state
Remember the four stateless patterns from last week's post? Row-wise, async, generator, and batch. Every one of them carries over to @daft.cls through @daft.method. The decorator changes from @daft.func to @daft.method, but the signatures and behavior stay the same. You're just adding persistent state on top.
Sync: row-by-row, the default.
Here the class holds precomputed statistics that get reused on every row:
```python
@daft.cls
class Normalizer:
    def __init__(self, mean: float, std: float):
        self.mean = mean
        self.std = std

    @daft.method
    def normalize(self, value: float) -> float:
        return (value - self.mean) / self.std
```
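Stripped of the decorator, normalize is just a z-score. A quick plain-Python check of the arithmetic, no Daft required:

```python
from statistics import mean, pstdev

values = [10.0, 20.0, 30.0]
mu, sigma = mean(values), pstdev(values)   # precompute once, like __init__


def normalize(value: float, mu: float, sigma: float) -> float:
    return (value - mu) / sigma            # same formula as the method body


normalized = [normalize(v, mu, sigma) for v in values]
# the middle value equals the mean, so it normalizes to exactly 0.0
```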
Async: for I/O-bound work where you want concurrency.
The class holds credentials and can maintain persistent connections across requests:
```python
import aiohttp


@daft.cls
class TranslationClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    @daft.method
    async def translate(self, text: str) -> str:
        async with aiohttp.ClientSession() as session:
            resp = await session.post(
                "https://api.deepl.com/v2/translate",
                data={"text": text, "target_lang": "DE",
                      "auth_key": self.api_key},
            )
            result = await resp.json()
            return result["translations"][0]["text"]
```
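The payoff of the async variant is concurrency: while one row waits on the network, the others make progress. A self-contained asyncio sketch with a stubbed client -- `FakeTranslator` is hypothetical, standing in for a real API client:

```python
import asyncio


class FakeTranslator:
    """Hypothetical stand-in: holds 'credentials' like the real client."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def translate(self, text: str) -> str:
        await asyncio.sleep(0.05)   # simulate network latency
        return text.upper()         # pretend this is the translation


async def main():
    client = FakeTranslator("fake-key")
    rows = ["hallo", "welt", "daft"]
    # gather runs all three "requests" concurrently,
    # so total wall time is roughly one latency, not three
    return await asyncio.gather(*(client.translate(t) for t in rows))


results = asyncio.run(main())
```

Daft applies the same idea across a partition: async methods on a @daft.cls instance are awaited concurrently, with the instance's state shared between them.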
Generator: one input, many outputs.
The class loads a tokenizer once and reuses it to split every input into multiple rows:
```python
from typing import Iterator


@daft.cls
class Tokenizer:
    def __init__(self, model_name: str):
        from transformers import AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    @daft.method
    def tokenize(self, text: str) -> Iterator[str]:
        for token in self.tokenizer.tokenize(text):
            yield token
```
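The key behavior of a generator method is fan-out: one input row becomes several output rows. In plain Python the flattening looks like this, with a whitespace splitter standing in for a real tokenizer:

```python
from typing import Iterator


def tokenize(text: str) -> Iterator[str]:
    # stand-in for self.tokenizer.tokenize: one input, many outputs
    yield from text.split()


rows = ["hello world", "daft scales generators"]
# each yielded token becomes its own output row, paired with its source row
flattened = [(row, token) for row in rows for token in tokenize(row)]
```

Two input rows become five output rows; Daft performs the same flattening for you and repeats the other columns alongside each yielded value.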
Batch: when your operation expects tensors or arrays, not individual scalars.
The class loads the model once, then processes entire batches at a time:
```python
from daft import DataType, Series


@daft.cls
class BatchEmbedder:
    def __init__(self, model_name: str):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)

    @daft.method.batch(return_dtype=DataType.list(DataType.float64()))
    def embed(self, texts: Series) -> list:
        return self.model.encode(texts.to_pylist()).tolist()
```
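The contract for a batch method is simple: it receives the whole batch at once and must return one result per input. A plain-Python sketch with a stubbed model -- `FakeModel` is hypothetical, standing in for sentence-transformers:

```python
class FakeModel:
    """Hypothetical stand-in for SentenceTransformer: embeds a whole batch."""

    def encode(self, texts):
        # one vectorized pass over the batch, not a call per row
        return [[float(len(t)), float(t.count(" "))] for t in texts]


model = FakeModel()                # "loaded" once, like __init__
batch = ["hello world", "daft", "stateful UDFs"]
embeddings = model.encode(batch)   # one call for the entire batch
# the invariant Daft relies on: one output row per input row
assert len(embeddings) == len(batch)
```

Real embedding models amortize GPU transfers and matrix multiplies across the batch, which is why this pattern exists at all.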
If you followed along with the @daft.func patterns, these should feel immediately familiar. The only new concept is that your class holds state between calls. Everything else -- the type hints, the return behavior, the way Daft inspects your signature -- works the same way.
4. Real patterns: transcription, inference, clients
To make this concrete, here's an example drawn from a production voice AI pipeline. FasterWhisperTranscriber loads a Whisper model once per worker, then transcribes audio files across the cluster:
```python
import daft
from faster_whisper import WhisperModel, BatchedInferencePipeline


@daft.cls()
class FasterWhisperTranscriber:
    def __init__(self, model="distil-large-v3", compute_type="float32", device="auto"):
        self.model = WhisperModel(model, compute_type=compute_type, device=device)
        self.pipe = BatchedInferencePipeline(self.model)

    @daft.method(return_dtype=daft.DataType.struct({
        "transcript": daft.DataType.string(),
        "segments": daft.DataType.list(daft.DataType.struct({
            "start": daft.DataType.float64(),
            "end": daft.DataType.float64(),
            "text": daft.DataType.string(),
        })),
    }))
    def transcribe(self, audio_file: daft.File):
        with audio_file.to_tempfile() as tmp:
            segments_iter, info = self.pipe.transcribe(
                str(tmp.name), vad_filter=True, batch_size=16
            )
            segments = [{"start": s.start, "end": s.end, "text": s.text}
                        for s in segments_iter]
            text = " ".join(seg["text"] for seg in segments)
            return {"transcript": text, "segments": segments}


transcriber = FasterWhisperTranscriber()
df = daft.from_glob_path("s3://my-bucket/audio/*.mp3")
df = df.with_column("result", transcriber.transcribe(daft.col("path")))
df.show()
```
The __init__ downloads and initializes Whisper once per worker. The transcribe method runs on every audio file, reusing the loaded model each time. A single hour of 48 kHz stereo audio can consume 518 MB of memory -- reinitializing for every file would make the pipeline unusable.
The same pattern applies broadly: image classification (ByteDance processes 300K+ images without OOM using @daft.cls), document embedding, API clients that maintain sessions, database connectors that pool connections. Anywhere you have a resource that's expensive to create and cheap to reuse, @daft.cls is the right tool.
Next week we'll go deeper on the AI/ML inference use cases specifically -- GPU allocation, memory management, and production patterns for running models at scale. This week is about understanding the building block: the stateful UDF itself.
5. Configure the worker, not the code
One more thing that @daft.cls gives you over @daft.func: resource configuration at the decorator level. Need a GPU? Want to cap concurrency? Need to escape the GIL? You declare it once on the class, and Daft enforces it across every worker:
```python
import torch


@daft.cls(
    gpus=1,              # Reserve 1 GPU per worker instance
    max_concurrency=4,   # Limit to 4 concurrent instances
    use_process=True,    # Run in a separate process (escape the GIL)
)
class ImageClassifier:
    def __init__(self, model_name: str):
        self.model = torch.hub.load("pytorch/vision", model_name, pretrained=True)
        self.model.cuda().eval()

    def __call__(self, image_path: str) -> str:
        # load_and_preprocess and decode_prediction are user-defined helpers
        image = load_and_preprocess(image_path)
        with torch.no_grad():
            output = self.model(image.cuda())
        return decode_prediction(output)
```
Three lines of configuration. Daft handles GPU allocation, concurrency limits, and process isolation. The configuration lives on the decorator -- not scattered through your pipeline code, not in a separate config file, not in your cluster orchestrator. It's right next to the class it applies to, where you'd expect to find it.
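A concurrency cap like max_concurrency can be pictured as a semaphore around the per-row work: no more than N rows in flight at once. A small asyncio sketch of that idea (illustrative only, not how Daft enforces the limit internally):

```python
import asyncio


async def run_with_cap(rows, max_concurrency):
    sem = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def process(row):
        nonlocal in_flight, peak
        async with sem:                 # at most max_concurrency holders
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)   # simulate per-row work
            in_flight -= 1
            return row * 2

    results = await asyncio.gather(*(process(r) for r in rows))
    return results, peak


results, peak = asyncio.run(run_with_cap(list(range(10)), max_concurrency=4))
# peak concurrency never exceeds the configured cap of 4
```

Declaring the cap on the decorator means this bookkeeping never appears in your pipeline code; Daft applies it wherever the class runs.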
We'll cover these resource options in detail in next week's post on AI/ML inference. For now, the key insight is that @daft.cls isn't just about state -- it's about giving you a single place to define what your operator needs and how it should run.