April 10, 2026
Daft v0.7.8: Dashboard Heatmaps, Vectorized GroupBy, and Native Image Hashing

by Daft Team

Daft's query dashboard now shows you exactly where time is going. Slow operators light up red, completed nodes turn green, and arrows trace the data flow through your pipeline. No more guessing which stage is the bottleneck.

Under the hood, grouped aggregations got a full rewrite — a new vectorized path replaces per-group index lists with dense group-id arrays and tight accumulator loops, delivering up to 4.4x faster groupbys at high cardinality. And image_hash() ships as a first-class function: eight perceptual hashing algorithms implemented in Rust, bit-exact with the Python imagehash library, for large-scale image deduplication without custom UDFs.

Dashboard Heatmaps and Pipeline Direction

The Daft query dashboard now visualizes operator performance in real time. @universalmind303 added heatmap coloring that maps each operator node to a color based on its execution time — slow operators glow red, and completed nodes transition to green (#6628).

A companion PR adds directional arrows between nodes so you can trace data flow through the pipeline at a glance (#6625).

Together these turn the dashboard from a static topology view into a live execution monitor. When a query is running, you see the hot path immediately — no need to scrub through JSONL event logs or guess which operator is stalling.

Figure: Dashboard heatmap showing operator performance

Vectorized Grouped Aggregation

The standard agg_groupby path materializes per-group index lists — Vec<Vec<u64>> — during make_groups(), then iterates each group's indices to compute the aggregate. At low cardinality that's fine. At high cardinality it means millions of small heap allocations and scattered memory access.

@desmondcheongzx rewrote this as a two-phase vectorized path (#6345):

Phase 1 (hash probe): A single pass over the groupby columns builds a dense group_ids: Vec<u32> mapping each row to its group, plus group_sizes: Vec<u64> for per-group row counts.

Phase 2 (vectorized accumulation): Each accumulator runs a tight loop over the dense group_ids array:

Old path:
  Group A: [0, 2, 4] → vals[0] + vals[2] + vals[4]  // scattered access
  Group B: [1, 3]     → vals[1] + vals[3]

New path:
  group_ids = [0, 1, 0, 1, 0]
  for (gid, val) in group_ids.zip(vals):
      sums[gid] += val                                // sequential access

Count accumulators skip the scatter loop entirely, using pre-computed group_sizes from Phase 1. Accumulators use an AggAccumulator enum instead of trait objects to avoid vtable dispatch in the hot loops.
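The two phases can be sketched in a few lines of Python. This is only an illustration of the idea, assuming a plain dict in place of the engine's hash table; the function names `assign_group_ids` and `grouped_sum` are hypothetical and do not correspond to Daft's Rust internals.

```python
from typing import Hashable, List, Sequence, Tuple


def assign_group_ids(keys: Sequence[Hashable]) -> Tuple[List[int], List[int]]:
    """Phase 1: one pass over the keys, producing dense group ids and group sizes."""
    key_to_gid = {}          # hash probe: key -> dense group id
    group_ids: List[int] = []
    group_sizes: List[int] = []
    for key in keys:
        gid = key_to_gid.get(key)
        if gid is None:
            # First time we see this key: allocate the next dense id.
            gid = len(group_sizes)
            key_to_gid[key] = gid
            group_sizes.append(0)
        group_ids.append(gid)
        group_sizes[gid] += 1
    return group_ids, group_sizes


def grouped_sum(group_ids: Sequence[int], vals: Sequence[int], num_groups: int) -> List[int]:
    """Phase 2: a single sequential loop over group_ids and vals."""
    sums = [0] * num_groups
    for gid, val in zip(group_ids, vals):
        sums[gid] += val
    return sums


keys = ["a", "b", "a", "b", "a"]
vals = [10, 20, 30, 40, 50]
group_ids, group_sizes = assign_group_ids(keys)
sums = grouped_sum(group_ids, vals, len(group_sizes))
```

In this sketch a count aggregate needs no second loop at all: `group_sizes` already holds the answer after Phase 1, which mirrors the shortcut described above.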

@BABTUNA extended the inline path with min/max accumulator types (#6604), bringing the full set to count, sum, min, and max. Min/max required per-type accumulator variants (unlike sum which widens to Int64), with NaN propagation matching the fallback path exactly.

@srilman then optimized the hash map layer (#6613): swapping FnvHashMap for HashBrown with foldhash, using SmallVec<[u64; 2]> to cut jemalloc deallocation overhead in make_groups, and restructuring list_agg to avoid millions of small vector allocations. The combined effect: a roughly 3.6x speedup on connected components workloads (250s → 70s).

Benchmarks on ClickBench (c6a.4xlarge, 100 parquet files, ~100M rows):

| Query | Before | After | Speedup | Pattern |
|---|---|---|---|---|
| Q33 | 13.09s | 9.51s | 1.38x | GROUP BY WatchID, ClientIP (high cardinality) |
| Q18 | 2.56s | 1.83s | 1.40x | GROUP BY UserID, SearchPhrase |
| Q16 | 2.20s | 1.68s | 1.31x | GROUP BY UserID |
| Total (43 queries) | 118.4s | 106.6s | -10% | |

The total ClickBench time dropped 10% below the v0.7.4 baseline — the first time the streaming engine has beaten the pre-regression numbers.
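For readers checking the numbers, the speedup column is the before time divided by the after time, and the total is reported as a relative change in wall-clock time. A small sketch of that arithmetic, using only the figures quoted above (the helper names are illustrative):

```python
def speedup(before_s: float, after_s: float) -> float:
    """Speedup factor: how many times faster the new run is."""
    return before_s / after_s


def percent_change(before_s: float, after_s: float) -> float:
    """Relative change in total time, as a percentage (negative means faster)."""
    return (after_s - before_s) / before_s * 100.0


q33 = round(speedup(13.09, 9.51), 2)
q18 = round(speedup(2.56, 1.83), 2)
q16 = round(speedup(2.20, 1.68), 2)
total = round(percent_change(118.4, 106.6))
```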

Native Image Hashing

v0.7.7 shipped the perceptual hashing internals. v0.7.8 ships the user-facing API.

@chenghuichen added image_hash() — a first-class Daft function for large-scale image similarity detection and deduplication (#6485):

import daft
from daft import col
from daft.functions import image_hash

df = daft.read_parquet("s3://my-bucket/images/")
df = df.with_column("hash", image_hash(df["image"], method="phash"))

# Find likely duplicates: hashes shared by more than one image
counts = df.groupby("hash").agg(col("image").count().alias("count"))
dupes = counts.where(col("count") > 1)

Eight algorithms are available, including:

| Algorithm | Best for |
|---|---|
| phash (default) | General-purpose: full 2D DCT, most robust |
| dhash | Fast structural comparison |
| ahash | Fastest (average hash) |
| whash | Multi-level Haar wavelet |
| crop_resistant | Robust against cropping |
| colorhash | HSV color distribution |

All algorithms are implemented natively in Rust with an FFT-based DCT and multi-level Haar DWT. Results are bit-exact against the Python imagehash library. Batch hashing runs in parallel across all CPU cores via rayon, with the resize kernel operating on single-channel luma instead of RGB — together yielding 5-25x speedups over an equivalent Python UDF.

Null images produce null hashes, consistent with other Daft column operations. Closes #4889.
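To give a feel for what a perceptual hash computes, here is a deliberately simplified average-hash sketch in pure Python. It assumes the image has already been converted to a small grid of luma values (the resize step is omitted), and it is not Daft's Rust implementation; `average_hash` and `hamming_distance` are illustrative names. Comparing hashes by Hamming distance is a common way to catch near-duplicates whose hashes differ by a few bits, which exact grouping on the hash column would miss.

```python
from typing import List


def average_hash(luma: List[List[float]]) -> int:
    """Pack one bit per pixel: 1 if the pixel is brighter than the grid's mean."""
    pixels = [p for row in luma for p in row]
    mean = sum(pixels) / len(pixels)
    bits = 0
    for p in pixels:
        bits = (bits << 1) | (1 if p > mean else 0)
    return bits


def hamming_distance(a: int, b: int) -> int:
    """Number of differing bits between two hashes."""
    return bin(a ^ b).count("1")


# Two tiny 2x2 "images" that differ in one corner (real hashes use e.g. 8x8 grids).
img_a = [[10, 200], [200, 10]]
img_b = [[10, 200], [200, 220]]
hash_a = average_hash(img_a)
hash_b = average_hash(img_b)
distance = hamming_distance(hash_a, hash_b)
```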

df.skip_existing() — Checkpoint-Based Deduplication

@everySympathy added df.skip_existing() for idempotent pipeline runs (#5931):

import daft

df = daft.read_parquet("s3://bucket/input/")
# Drop rows whose "id" already appears in the output location
df = df.skip_existing(
    existing_path="s3://bucket/output/",
    key_column="id",
    file_format="parquet",
)
df.write_parquet("s3://bucket/output/", write_mode="append")

The operator reads keys from the output path and filters out rows that have already been processed. It also supports composite keys for pipelines where a single column isn't sufficient:

# RAG chunking pipeline: (doc_id, chunk_id) is the composite dedup key
df = df.skip_existing(
    existing_path="s3://bucket/output/",
    key_column=["doc_id", "chunk_id"],
    file_format="parquet",
)

Currently supported on the distributed (Ray) runner. This addresses a long-standing request from the community (discussion #5868) — re-running ETL or RAG pipelines without reprocessing rows that already exist in the output.
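Conceptually, the filtering behaves like an anti-join on the key columns. The sketch below models that semantics on plain Python dicts, assuming rows fit in memory; `skip_existing_rows` is a hypothetical helper for illustration and says nothing about how the distributed operator is implemented.

```python
from typing import Dict, List, Sequence, Union

Row = Dict[str, object]


def skip_existing_rows(
    rows: List[Row],
    existing_rows: List[Row],
    key_columns: Union[str, Sequence[str]],
) -> List[Row]:
    """Keep only rows whose key (single or composite) is absent from existing_rows."""
    if isinstance(key_columns, str):
        key_columns = [key_columns]

    def key_of(row: Row) -> tuple:
        return tuple(row[c] for c in key_columns)

    seen = {key_of(r) for r in existing_rows}
    return [r for r in rows if key_of(r) not in seen]


incoming = [
    {"doc_id": 1, "chunk_id": 0, "text": "a"},
    {"doc_id": 1, "chunk_id": 1, "text": "b"},
    {"doc_id": 2, "chunk_id": 0, "text": "c"},
]
already_written = [{"doc_id": 1, "chunk_id": 0, "text": "a"}]
todo = skip_existing_rows(incoming, already_written, ["doc_id", "chunk_id"])
```

With a composite key, `(1, 1)` and `(2, 0)` survive even though `doc_id` 1 and `chunk_id` 0 each appear in the existing output, because only the full tuple counts as a match.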

Everything Else

  • Subscriber API improvements: @srilman exposed nicer public APIs for working with subscribers, aimed at OpenLineage support (#6631). @cckellogg collapsed the subscriber trait to a single on_event method (#6593).
  • DataFrame.skew() and DataFrame.var(): @kerwin-zk added global aggregation convenience methods for skewness (#6619) and variance (#6584).
  • Flotilla checkpoint wiring: @rohitkulshreshtha threaded CheckpointId from distributed task creation through the local execution layer, with a new StageCheckpointKeysOperator that stages key columns to the checkpoint store (#6567).
  • Catalog function support: @gavin9402 added a get_function interface on the Catalog trait for resolving metastore-registered functions during planning (#6524).
  • Configurable Flotilla worker timeout: @desmondcheongzx replaced the hardcoded 120s actor startup timeout with daft.set_execution_config(worker_startup_timeout=300) or the DAFT_WORKER_STARTUP_TIMEOUT env var (#6592).
  • Paimon DataSource migration: @chenghuichen migrated PaimonScanOperator to the new DataSource API (#6600).
  • Local shuffle disk read: @srilman made flight shuffle read local data directly from disk, bypassing the network stack (#6436).
  • BatchManager consolidation: @universalmind303 consolidated BatchManager as a single buffer abstraction (#6566).
  • Repartition exchange unification: @ohbh unified the repartition exchange write flow across Ray and Flight (#6499).
  • Iceberg schema evolution fix: @sankarreddy-atlan fixed predicate evaluation against Parquet files that predate an Iceberg schema evolution, so columns absent from old files no longer crash row group pruning or post-read filters (#6551).
  • Ray nightly fix: @jeevb fixed nightly Daft version resolution in Ray runtime environments (#6630).
  • Dashboard fixes: @samstokes fixed partition set passing for topology matching (#6576), smart per-node stats aggregation (#6574), and spurious Flotilla lifecycle events (#6573).
  • IO retry on transient errors: @desmondcheongzx added retry for transient errors on initial GET requests (#6544).
  • Byte range fix: @gweaverbiodev fixed the async reader skipping byte retrieval when range start equals end (#6602).
  • Lance SQL filter docs: @Jay-ju added documentation for custom SQL filters in Lance (#5916).

Upgrade

uv add "daft>=0.7.8"

Or try the latest nightly:

uv pip install daft --pre --extra-index-url https://nightly.daft.ai

Check the full changelog for the complete list of 34 merged PRs.
