
Daft v0.7.7: Image Dedup Hashing, Parquet Cache Fix, and df.shuffle()
Daft v0.7.7 ships five perceptual image hashing algorithms for deduplication, fixes a parquet streaming regression, and adds df.shuffle() for ML data prep.
by Daft Team

You have a billion images and need to find the duplicates. You could hash every file, but byte-identical matching misses resized copies, crops, and recompressed variants. Daft v0.7.7 ships five perceptual image hashing algorithms that catch all of these — aHash, dHash, pHash, wHash, and a crop-resistant hash — so deduplication works on what images look like, not what their bytes contain.
This release also fixes a parquet streaming regression introduced in v0.7.5 (aggregations that had slowed 2-4x are back near baseline), adds df.shuffle() for ML data prep, and makes coalesce short-circuit per the SQL spec.
Perceptual Image Hashing
@linguoxuan implemented five image hashing algorithms for large-scale deduplication (#6338):
```python
import daft
from daft.functions import image_hash

df = daft.read_parquet("s3://my-bucket/images/")

# Average hash — fast, good for exact and near-exact duplicates
df = df.with_column("ahash", image_hash(df["image"], "average"))

# Difference hash — better at detecting gradients and edges
df = df.with_column("dhash", image_hash(df["image"], "difference"))

# Perceptual hash — DCT-based, robust to scaling and compression
df = df.with_column("phash", image_hash(df["image"], "perceptual"))

# Wavelet hash — DWT-based, handles color and texture changes
df = df.with_column("whash", image_hash(df["image"], "wavelet"))
```

The four standard algorithms return UInt64 — a fixed 8-byte hash you can compare with Hamming distance. Two images whose hashes differ by fewer than about 10 bits are likely perceptual duplicates.
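Comparing two UInt64 hashes by Hamming distance amounts to counting differing bits. A minimal sketch in plain Python (the hash values below are made up for illustration; this is not a Daft API):

```python
def hamming_distance(h1: int, h2: int) -> int:
    """Count the bits that differ between two 64-bit hashes."""
    return bin(h1 ^ h2).count("1")

a = 0xF0F0F0F0F0F0F0F0
b = 0xF0F0F0F0F0F0F0F1  # differs from a in exactly one bit

assert hamming_distance(a, a) == 0
assert hamming_distance(a, b) == 1  # well under the ~10-bit duplicate threshold
```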
For cases where images are cropped differently, there's also a crop-resistant hash:
```python
from daft.functions import image_crop_resistant_hash

df = df.with_column("crhash", image_crop_resistant_hash(df["image"]))
```

This returns variable-length Binary because it segments the image and hashes each segment independently, making it robust to arbitrary cropping.
All five algorithms are implemented in Rust and verified against the imagehash Python library. This closes #4889.
Parquet Streaming: Cache Regression Fixed
A performance regression introduced in v0.7.5 made aggregation workloads 2-4x slower on the streaming parquet path. @desmondcheongzx traced it to batch concatenation defeating CPU cache locality (#6558).
The streaming reader was concatenating all ~128K-row batches from each row group into a single ~500K-row batch before sending it downstream. On AMD EPYC (512KB L2 per core), a 128K-row source buffer fits comfortably in L2. A 500K-row concatenated batch blows past it — the downstream cast alone produces a 4MB output buffer that thrashes L2.
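The arithmetic behind the cache argument can be checked directly. The element widths below are illustrative assumptions (the PR reports buffer sizes, not column types):

```python
KB = 1024
L2_CACHE = 512 * KB  # per-core L2 on the benchmark's AMD EPYC

# Assumed widths: 4-byte source values, 8-byte cast output
source_batch_bytes = 128 * 1024 * 4  # 512KB: fits exactly in L2
cast_output_bytes = 500_000 * 8      # ~4MB: roughly 8x the L2 capacity

assert source_batch_bytes == L2_CACHE
assert cast_output_bytes > 7 * L2_CACHE
```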
The fix extracts `build_rg_reader` and iterates over individual batches, sending each ~128K-row batch through the channel without concatenation.
Benchmarks on c6a.4xlarge (16 vCPUs, AMD EPYC), 100 ClickBench parquet files, ~100M rows:
| Workload | v0.7.4 | v0.7.5 (regressed) | v0.7.7 (fixed) |
|---|---|---|---|
| 90 SUMs over Int16 | 0.576s | 1.226s | 0.652s |
| 90 SUM(col + i) | 1.084s | 4.643s | 1.714s |
The simple SUM workload is back to within 13% of the v0.7.4 baseline. The expression-heavy workload recovered from a 4.3x regression to a 1.6x gap — the remaining difference comes from the streaming path itself, not the batch sizing.
df.shuffle()
@srilman added a first-class shuffle operation for randomly rearranging rows (#6481):
```python
import daft

df = daft.read_parquet("s3://my-bucket/training-data/")
df = df.shuffle(seed=42)
```

Under the hood, shuffle generates a random integer per row and sorts by it — the same approach HuggingFace Datasets uses. It's implemented as an explicit logical plan node (not sugar over sort) so the optimizer can push limits through it without hitting sort's ordering constraints.
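The random-key trick is easy to see in plain Python. This is a toy illustration of the approach, not Daft's implementation:

```python
import random

def shuffle_rows(rows, seed=None):
    """Shuffle by tagging each row with a random key, then sorting by the key."""
    rng = random.Random(seed)
    keyed = [(rng.random(), row) for row in rows]
    keyed.sort(key=lambda kr: kr[0])
    return [row for _, row in keyed]

rows = list(range(10))
out = shuffle_rows(rows, seed=42)
assert sorted(out) == rows                  # same rows, new order
assert shuffle_rows(rows, seed=42) == out   # deterministic for a fixed seed
```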
The PR also adds a standalone random_int() expression:
```python
from daft.functions import random_int

df = df.with_column("sample_group", random_int(low=0, high=10, seed=7))
```

This is the first half of #2612. A batch_size variant for chunked shuffling is planned.
Coalesce Short-Circuit
coalesce(a, b, c) should stop evaluating the moment it finds a non-null value. Daft's implementation didn't — it evaluated every argument unconditionally, which meant expensive expressions in later positions ran even when earlier values were non-null.
@Lucas61000 fixed this by promoting coalesce from a ScalarUDF to a first-class Expr::Coalesce variant with early-exit semantics in the record batch evaluator (#6525). The optimizer, partitioning logic, and all expression visitors were updated to handle the new variant.
This closes #4069.
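The semantics are easy to see in a toy lazy evaluator (plain Python, not Daft's record-batch implementation): each argument is a thunk, and evaluation stops at the first non-null result.

```python
def coalesce(*thunks):
    """Return the first non-None value, evaluating arguments lazily."""
    for thunk in thunks:
        value = thunk()
        if value is not None:
            return value
    return None

calls = []
def expensive():
    calls.append("expensive")
    return 99

result = coalesce(lambda: None, lambda: 7, expensive)
assert result == 7
assert calls == []  # the expensive expression was never evaluated
```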
concat_ws
@euanlimzx added concat_ws — concatenate with separator, skipping nulls (#6543):
```python
from daft import col
from daft.functions import concat_ws

# Join columns with a separator
df.select(concat_ws("/", col("bucket"), col("prefix"), col("filename")))
# "my-bucket/data/file.parquet"

# Nulls are skipped, not propagated (unlike concat)
df.select(concat_ws(" ", col("first"), col("middle"), col("last")))
# "Alice Smith" when middle is null — not null
```

This is the key difference from concat: concat("a", null, "b") returns null, but concat_ws("-", "a", null, "b") returns "a-b". Common use cases: building file paths, composite keys, and display strings. Tracks #3792.
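The null-skipping rule is the whole story. In plain Python terms (a semantic sketch with None standing in for null, not Daft's implementation):

```python
def concat_ws_py(sep, *values):
    """Join non-null values with sep, skipping None rather than propagating it."""
    return sep.join(v for v in values if v is not None)

def concat_py(*values):
    """Plain concat semantics: any null poisons the whole result."""
    if any(v is None for v in values):
        return None
    return "".join(values)

assert concat_ws_py("-", "a", None, "b") == "a-b"
assert concat_py("a", None, "b") is None
```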
Timezone Convert and Replace
@aaron-ang shipped two timezone operations with correct edge-case handling (#6106):
```python
# convert_time_zone: preserves the instant, changes the display
col("ts").convert_time_zone("UTC", from_timezone="+02:00")
# 2024-01-01 00:00:00 +02:00 → 2023-12-31 22:00:00 UTC

# replace_time_zone: preserves local time, swaps the timezone label
col("ts").replace_time_zone("+02:00")
# 2024-01-01 00:00:00 UTC → 2024-01-01 00:00:00 +02:00

# Remove timezone entirely
col("ts").replace_time_zone()
# 2024-01-01 00:00:00 UTC → 2024-01-01 00:00:00 (naive)
```

The implementation now errors when from_timezone is missing on naive timestamps (instead of silently assuming UTC) and ignores from_timezone when the timestamp already carries timezone info. A new ParsedTimezone enum unifies fixed-offset and named-timezone handling internally. Closes #4096.
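The convert/replace distinction mirrors Python's own datetime API, which makes for a handy standard-library analogy (this is not Daft code): astimezone converts the instant, replace swaps the label.

```python
from datetime import datetime, timedelta, timezone

utc = timezone.utc
plus2 = timezone(timedelta(hours=2))

ts = datetime(2024, 1, 1, 0, 0, tzinfo=plus2)

# convert_time_zone analogue: same instant, new display timezone
converted = ts.astimezone(utc)
assert converted == ts                                # still the same instant
assert converted.day == 31 and converted.hour == 22   # 2023-12-31 22:00 UTC

# replace_time_zone analogue: same wall-clock time, new timezone label
replaced = ts.replace(tzinfo=utc)
assert replaced.hour == 0    # local time preserved
assert replaced != ts        # but it now names a different instant
```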
Everything Else
- `_SUCCESS` file for parquet writes — @Lucas61000 added `write_success_file=True` on `df.write_parquet()`, writing an empty `_SUCCESS` marker on completion for Spark-compatible workflows (#6090). Closes #4085.
- Subscriber `on_event` dispatch — @cckellogg unified subscriber callbacks into a single `on_event` method with an `Event` enum (`OperatorStarted`, `OperatorFinished`, `Stats`), replacing scattered per-method calls (#6508).
- `DAFT_TRACE` console tracing — @cckellogg added `DAFT_TRACE` and `DAFT_TRACE_FORMAT` env vars for console trace output in `compact`, `pretty`, or `json` formats (#6458).
- `decode("utf-8")` rewritten as cast — @srilman rewrites `decode(col, "utf-8")` as a cast at the Python layer, unblocking cast-elimination and filter pushdown optimizations (#6537).
- Binary size reduction — @NikkeTryHard de-monomorphized sort and dispatch paths in `daft-core`, shrinking the shared library by 840KB (#6541).
- Python source shim → Rust — @rchowell replaced the Python `_DataSourceShim` with a Rust `ScanOperator` backed by either Python or Rust data sources, an incremental step toward full streaming sources (#6556).
- Supply chain hardening — @everettVT added `exclude-newer` to uv and pinned CI actions to SHA tags (#6565).
- Dual telemetry — @ykdojo added side-by-side telemetry to osstelemetry.io alongside the existing Scarf endpoint (#6540).
- `explain()` after writes — @Abyss-lord fixed `explain()` returning nonsensical plans after `write_parquet`/`write_csv`/`write_json` (#6564).
- `df.metrics` preserved after shutdown — @cckellogg fixed `df.metrics` returning empty by preserving finalized stats before `RuntimeStatsManager` exits (#6555).
- Dashboard Flotilla fix — @samstokes hid the Results tab for Flotilla queries where it would never populate (#6557).
Community Contributions
- @linguoxuan — Five perceptual image hashing algorithms for deduplication
- @Lucas61000 — `_SUCCESS` file support, coalesce short-circuit fix
- @euanlimzx — `concat_ws` string function
- @srilman — `df.shuffle()` and `random_int()`, decode-as-cast optimization
- @aaron-ang — Timezone convert and replace
- @NikkeTryHard — Binary size reduction via de-monomorphization
- @Abyss-lord — `explain()` fix after write operations
- @ykdojo — Dual telemetry endpoint
Upgrade
```shell
uv add "daft>=0.7.7"
```

Or try the latest nightly:

```shell
uv pip install daft --pre --extra-index-url https://nightly.daft.ai
```

Check the full changelog for the complete list of 17 merged PRs.