We Value Your Privacy

We use cookies to enhance your browser experience, analyze site traffic, and personalize content. By clicking "Accept All," you consent to our use of cookies. For more, read our Privacy Policy
Back to Blog
September 24, 2025

After the First Run

Using Daft’s observability tools to uncover performance pitfalls

by Srinivas Lade

Developing AI workloads can be a complex and iterative process. Often, the initial implementation of an AI pipeline, while functionally correct, might suffer from unexpected bottlenecks or scale imperfectly. Identifying and resolving these issues can be a time-consuming and frustrating endeavor.

At Eventual, we’re building Daft to be the foremost engine for running models on data. Thus, while performance is critical, our priority is reducing the time it takes to successfully implement a workload. To this end, we have developed a variety of tools for lightweight profiling during development. This blog post will demonstrate how one could utilize these tools while building a Daft pipeline.

Watch a quick 3 minute tutorial

Developer Advocate ChanChan Mao walks through the 3 key steps mentioned in this blog post to show you exactly how to identify bottlenecks and optimize your data processing pipeline.

Watch the video →

Let’s look at a (fun) example

I recently moved to San Francisco, so in order to force myself to explore the city, I’ve been going to various parks in different neighborhoods. Since SF is a very dog-friendly city, there have been a bunch of dogs playing around at every park I visit. So I thought a fun mini-project would be to figure out what breeds are the most common to find in the city.

To get started, I found and saved a bunch of pictures of SF dog parks, as many as I could find. I then wrote a simple Python function that:

  • For each image: Look for and detect dogs in the image using the Huggingface moondream2 model

  • For each dog:

    - Crop the image around the dog and save it

    - Give the image to the OpenAI visions API and ask it “What’s the breed of the dog in this picture?”

With some trial-and-error, I was able to get this simple script working for a single image: 

1from io import BytesIO
2import os
3import base64
4from uuid import uuid4
5
6from transformers import AutoModelForCausalLM
7from PIL import Image
8import openai
9
10
11# Model for object detection
12model = AutoModelForCausalLM.from_pretrained(
13 "vikhyatk/moondream2",
14 revision="2025-06-21",
15 trust_remote_code=True,
16 device_map={"": "mps"}
17)
18
19# OpenAI client for dog breed detection
20client = openai.OpenAI(api_key=os.environ["OPENAI_KEY"])
21
22
23def analyze_image(path):
24 """
25 Given a file path to an image:
26 1) Find all dogs in the image
27 2) Crop and save the image around the dog
28 2) Identify their breed
29 """
30 original_image = Image.open(path)
31 output = model.detect(original_image, "dog")
32
33 descriptions = []
34 for bbox in output["objects"]:
35 x_min = int(bbox["x_min"] * original_image.width)
36 y_min = int(bbox["y_min"] * original_image.height)
37 x_max = int(bbox["x_max"] * original_image.width)
38 y_max = int(bbox["y_max"] * original_image.height)
39
40 image = original_image.crop((x_min, y_min, x_max, y_max)).resize((854, 480))
41 byte_buffer = BytesIO()
42 image.save(byte_buffer, format='JPEG')
43 png_bytes = byte_buffer.getvalue()
44
45 out_name = f"images/{uuid4()}.jpeg"
46 with open(out_name, "wb") as f:
47 f.write(png_bytes)
48
49 b64_image = base64.b64encode(png_bytes).decode("utf-8")
50 response = client.responses.create(
51 model="gpt-4o-mini",
52 input=[
53 {
54 "role": "user",
55 "content": [
56 {"type": "input_text", "text": "What dog breed does this look like? Best guess. Name only"},
57 {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64_image}"}
58 ],
59 }
60 ],
61 )
62 descriptions.append({"description": response.output[0].content[0].text, "image_out": out_name})
63
64 return descriptions
65

Let’s try it out on the following image:

After analyzing, this is what we get:

Awesome, it works! At this point, I thought it would be great if I could scale up my code with Daft. Plus, I could use its built-in operations for listing all the images I have. Daft makes it very easy to take arbitrary Python code and run it in a pipeline using the user-defined-function (UDF) API. This was my initial version:

1from io import BytesIO
2import os
3import base64
4from uuid import uuid4
5
6from transformers import AutoModelForCausalLM
7from PIL import Image
8import openai
9
10import daft
11from daft import col, DataType
12
13
14@daft.udf(return_dtype=list[{
15    "description": str,
16    "cropped": str,
17    "bbox": DataType.fixed_size_list(DataType.uint64(), 4),
18}])
19class MassiveUDF():
20    def __init__(self):
21        self.model = AutoModelForCausalLM.from_pretrained(
22            "vikhyatk/moondream2",
23            revision="2025-06-21",
24            trust_remote_code=True,
25            device_map={"": "mps"}
26        )
27        
28        self.client = openai.OpenAI(api_key=os.environ["OPENAI_KEY"])
29    
30    def run_one(self, path):
31        original_image = Image.open(path.removeprefix("file://"))
32        output = self.model.detect(original_image, "dog")
33
34        descriptions = []
35        for bbox in output["objects"]:
36            x_min = int(bbox["x_min"] * original_image.width)
37            y_min = int(bbox["y_min"] * original_image.height)
38            x_max = int(bbox["x_max"] * original_image.width)
39            y_max = int(bbox["y_max"] * original_image.height)
40
41            image = original_image.crop((x_min, y_min, x_max, y_max)).resize((854, 480))
42            byte_buffer = BytesIO()
43            image.save(byte_buffer, format='JPEG')
44            jpeg_bytes = byte_buffer.getvalue()
45
46            out_name = f"images/{uuid4()}.jpeg"
47            with open(out_name, "wb") as f:
48                f.write(jpeg_bytes)
49
50            b64_image = base64.b64encode(jpeg_bytes).decode("utf-8")
51            response = self.client.responses.create(
52                model="gpt-4o-mini",
53                input=[
54                    {
55                        "role": "user",
56                        "content": [
57                            {"type": "input_text", "text": "What dog breed does this look like? Best guess. Name only"},
58                            {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64_image}"}
59                        ],
60                    }
61                ],
62            )
63            descriptions.append({
64                "description": response.output[0].content[0].text,
65                "cropped": out_name,
66                "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
67            })
68        return descriptions
69
70    def __call__(self, images):
71        return [self.run_one(image) for image in images]
72
73
74df = daft.from_glob_path("sources/*.jpeg")
75df = df.with_column("output", MassiveUDF(col("path")))
76df = df.explode("output").select(
77    col("path").alias("original"),
78    col("output")["bbox"],
79    col("output")["cropped"],
80    col("output")["description"],
81)
82df.show(n=25, max_width=50)
83

 

But when I ran it via python detect.py, it was annoyingly slow. For just 6 images with 25 dogs in total, it took more than a minute. Plus, looking at htop, I could see that I’m barely using my CPU cores.

What’s going on? Looking at the progress bar, I don’t see much going on, so I’m not sure what to do next.

Using Daft as a code wrapper

You may have run into a similar issue when first using Daft. Since Daft makes it easy to include UDFs, it’s very easy to just dump all of your code into a single UDF and call it a day.

This isn’t necessarily a bad approach. Given a UDF, the underlying Daft execution engine can spin up multiple instances of the code and run them in parallel.

However, Daft does not modify the Python code inside of UDFs, effectively treating them as black boxes. While that helps ensure correctness, it makes it more difficult to see what’s actually going on. In our case, without starting to add some timers and print statements, we don’t know why the massive UDF takes so long.

So how can I go about profiling and optimizing my script?

Thankfully, Daft comes with a variety of first-class tools to profile your code and help you find bottlenecks. For example, you may have seen the progress bar that appears when we run a Python script from the command line. In addition, there is a 

In general, there are 3 tips to optimizing your code:

  1. 1.

    Break UDFs down into smaller pieces

  2. 2.

    Rewrite UDFs into native Daft expressions

  3. 3.

    Tune your UDFs

Let’s go through these steps 1-by-1 to see how they can help you iterate on your workflow to take full advantage of Daft

1) Break UDF’s down into smaller pieces

By splitting UDF’s into smaller pieces, Daft will have more awareness about the flow of operations in your workload and can make better decisions. Not only can Daft now track metrics between the different UDFs, but it can also identify potential bottlenecks and allocate more resources. For example, if you have two UDFs, udf_a and udf_b, where udf_b takes significantly longer, then the engine will run more instances of udf_b and fewer of udf_a.

Note that in order to split the UDF into pieces, you need to know the types of the data that pass between, since Daft has to serialize the data in between. This can be a good indicator of where it makes most sense to split the function.

Let’s take our code and break it down into much smaller pieces.

1from io import BytesIO
2import os
3import base64
4from uuid import uuid4
5
6from transformers import AutoModelForCausalLM
7from PIL import Image
8import openai
9
10import daft
11from daft import col, DataType
12
13
14@daft.udf(return_dtype=DataType.image())
15def load_image(paths):
16    return [Image.open(path.removeprefix("file://")) for path in paths]
17
18
19@daft.udf(return_dtype=DataType.list(DataType.fixed_size_list(DataType.uint64(), 4)))
20class DetectDogs:
21    def __init__(self):
22        self.model = AutoModelForCausalLM.from_pretrained(
23            "vikhyatk/moondream2",
24            revision="2025-06-21",
25            trust_remote_code=True,
26            device_map={"": "mps"}
27        )
28    
29    def run_one(self, image):
30        image = Image.fromarray(image)
31        output = self.model.detect(image, "dog")
32        bboxes = []
33        for bbox in output["objects"]:
34            x_min = int(bbox["x_min"] * image.width)
35            y_min = int(bbox["y_min"] * image.height)
36            x_max = int(bbox["x_max"] * image.width)
37            y_max = int(bbox["y_max"] * image.height)
38            bboxes.append([x_min, y_min, x_max, y_max])
39        return bboxes
40
41    def __call__(self, images):
42        return [self.run_one(image) for image in images]
43
44
45@daft.udf(return_dtype=DataType.binary())
46def crop_dog_to_jpeg(images, bboxes):
47    outputs = []
48    for image, bbox in zip(images, bboxes):
49        cropped = Image.fromarray(image).crop(tuple(bbox)).resize((854, 480))
50        byte_buffer = BytesIO()
51        cropped.save(byte_buffer, format='JPEG')
52        outputs.append(byte_buffer.getvalue())
53    return outputs
54
55
56@daft.udf(return_dtype=DataType.string())
57class IdentifyBreed():
58    def __init__(self):
59        self.client = openai.OpenAI(api_key=os.environ["OPENAI_KEY"])
60
61    def run_one(self, image_jpeg: bytes):
62        b64_image = base64.b64encode(image_jpeg).decode("utf-8")
63        response = self.client.responses.create(
64            model="gpt-4o-mini",
65            input=[
66                {
67                    "role": "user",
68                    "content": [
69                        {"type": "input_text", "text": "What dog breed does this look like? Best guess. Name only"},
70                        {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64_image}"}
71                    ],
72                }
73            ],
74        )
75        return response.output[0].content[0].text
76
77    def __call__(self, images):
78        return [self.run_one(image) for image in images]
79
80
81@daft.udf(return_dtype=DataType.string())
82def save_image(image_jpegs):
83    outputs = []
84    for image_jpeg in image_jpegs:
85        out_name = f"images/{uuid4()}.jpeg"
86        with open(out_name, "wb") as f:
87            f.write(image_jpeg)
88        outputs.append(out_name)
89    return outputs
90
91
92df = daft.from_glob_path("sources/*.jpeg")
93df = df.with_column("image", load_image(col("path")))
94df = df.with_column("bbox", DetectDogs(col("image")).explode())
95df = df.with_column("dog_image", crop_dog_to_jpeg(col("image"), col("bbox")))
96df = df.with_column("description", IdentifyBreed(col("dog_image")))
97df = df.with_column("cropped", save_image(col("dog_image")))
98df = df.select(
99    col("path").alias("original"),
100    col("bbox"),
101    col("cropped"),
102    col("description"),
103)
104df.show(n=25, max_width=50)
105

While the code does look longer, it’s easier to see the flow of steps at the bottom. Plus, the steps now appear in the progress bar!

It can be hard at this point to track how long each step takes, particularly the really short ones. For this case, Daft provides a profiling tool that will track runtimes of operators and write them out to a Markdown using Mermaid for a diagram. You can enable it via the environment variable DAFT_DEV_ENABLE_EXPLAIN_ANALYZE=1. If we rerun with it enabled, we get this profile out:

From this, we can quickly identify that the OpenAI call to identify the breed takes the longest. What can we do from here? 

2) Use native Daft functions

In addition to orchestrating your AI pipeline, Daft comes with built-in functions for many common data operations, particularly on multimodal data. These functions are often written in Rust with performance in mind, battle-tested at scale, and can be simpler to use than a custom UDF. Let’s rewrite our pipeline with Daft functions and try that.

1import base64
2import os
3
4from transformers import AutoModelForCausalLM
5from PIL import Image
6import openai
7
8import daft
9from daft import col, DataType
10
11
12@daft.udf(return_dtype=DataType.list(DataType.fixed_size_list(DataType.uint64(), 4)))
13class DetectDogs:
14    def __init__(self):
15        self.model = AutoModelForCausalLM.from_pretrained(
16            "vikhyatk/moondream2",
17            revision="2025-06-21",
18            trust_remote_code=True,
19            device_map={"": "mps"}
20        )
21    
22    def run_one(self, image):
23        image = Image.fromarray(image)
24        output = self.model.detect(image, "dog")
25        bboxes = []
26        for bbox in output["objects"]:
27            x_min = int(bbox["x_min"] * image.width)
28            y_min = int(bbox["y_min"] * image.height)
29            x_max = int(bbox["x_max"] * image.width)
30            y_max = int(bbox["y_max"] * image.height)
31            # Need width and height
32            bboxes.append([x_min, y_min, x_max - x_min, y_max - y_min])
33        return bboxes
34
35    def __call__(self, images):
36        return [self.run_one(image) for image in images]
37
38
39@daft.udf(return_dtype=DataType.string())
40class IdentifyBreed():
41    def __init__(self):
42        self.client = openai.OpenAI(api_key=os.environ["OPENAI_KEY"])
43
44    def run_one(self, image_jpeg: bytes):
45        b64_image = base64.b64encode(image_jpeg).decode("utf-8")
46        response = self.client.responses.create(
47            model="gpt-4o-mini",
48            input=[
49                {
50                    "role": "user",
51                    "content": [
52                        {"type": "input_text", "text": "What dog breed does this look like? Best guess. Name only"},
53                        {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64_image}"}
54                    ],
55                }
56            ],
57        )
58        return response.output[0].content[0].text
59
60    def __call__(self, images):
61        return [self.run_one(image) for image in images]
62
63
64df = daft.from_glob_path("sources/*.jpeg")
65df = df.with_column("image", col("path").url.download().image.decode())
66df = df.with_column("bbox", DetectDogs(col("image")).explode())
67df = df.with_column("dog_image", col("image").image.crop(col("bbox")).image.resize(854, 480).image.encode("jpeg"))
68df = df.with_column("description", IdentifyBreed(col("dog_image")))
69df = df.with_column("cropped", col("dog_image").url.upload("images"))
70df = df.select(
71    col("path").alias("original"),
72    col("bbox"),
73    col("cropped"),
74    col("description"),
75)
76df.show(n=25, max_width=50)
77

Looks like we were able to rewrite a lot of the UDFs for working with images into Daft functions; the only remaining ones are for detection and inference. At a minimum, the script is simpler to follow now. However, since the bottleneck was in the OpenAI inference, we don’t see much of a speedup in our profile.

3) Tune your UDFs

Now that we’ve converted some of our UDFs into native Daft expressions, we’re left with code that we have to keep in Python. So what do we do? For situations like this, Daft provides additional variants and parameters for UDFs to tune them to your specific use-case.

Using the right function variant for the situation

Depending on the type and return value of the function, Daft can perform additional optimizations at execution time. For example, if the function is async, the engine can run more instances concurrently. In our case, UDF DetectDogs returns a list of values, but we could alternatively write it as a generator function so we can process more iteratively.

1# Replacing DetectDogs UDF with a generator function
2
3model = AutoModelForCausalLM.from_pretrained(
4    "vikhyatk/moondream2",
5    revision="2025-06-21",
6    trust_remote_code=True,
7    device_map={"": "mps"}
8)
9
10@daft.func(return_dtype=DataType.fixed_size_list(DataType.uint64(), 4))
11def detect_dogs(image: Image.Image):
12    image = Image.fromarray(image)
13    output = model.detect(image, "dog")
14
15    for bbox in output["objects"]:
16        x_min = int(bbox["x_min"] * image.width)
17        y_min = int(bbox["y_min"] * image.height)
18        x_max = int(bbox["x_max"] * image.width)
19        y_max = int(bbox["y_max"] * image.height)
20        # Need width and height
21        yield [x_min, y_min, x_max - x_min, y_max - y_min]

Set UDF parameters

If you want to go the extra mile, the daft.udf decorator has additional parameters to tune how it will be executed. Most of the parameters, like concurrency, num_gpus, and memory_bytes help reduce contention and potential OOMs by limiting the number of concurrently running instances.

But for our case, we want to tune the batch_size parameter, which controls the number of rows provided as input to the function. Smaller batch sizes can help parallelize smaller workloads like our case, while larger batch sizes can reduce overhead and maximize throughput. Since we’re working with a few dozen rows, let’s set batch_size=1

With our changes, our workload finished in 24s; we made it 3x faster! Note that even though the profile states that IdentifyBreed took longer, it was executed concurrently, so overall it was faster.

Note: In reality, you may perform these three steps in any order and multiple times throughout the development process, depending on what your bottleneck actually is and if you’re happy with the performance. But the general idea of iterative development still applies.

What’s next?

Now with a robust foundation, we can start experimenting with larger changes, such as:

  • Using different models for object detection, such as YOLO

  • Trying out a locally hosted image model for classification to avoid those OpenAI API costs

  • Annotating the input image with bounding boxes and labels for each dog

Generally speaking, this 3-stage approach, paired with our built-in observability and profiling tools, allows developers to quickly identify bottlenecks and iterate, whether running on a single laptop or at scale.

We encourage you to explore Daft further and apply these techniques to your own AI workloads. Check us out on Github, follow us on LinkedIn, and join us on Slack.

Get updates, contribute code, or say hi.
Daft Engineering Blog
Join us as we explore innovative ways to handle multimodal datasets, optimize performance, and revolutionize your data workflows.
Github Discussions Forums
join
GitHub logo
The Distributed Data Community Slack
join
Slack logo