
Daft v0.7.14: Parquet Reader Rewrite, Streaming Distributed Limits, and UUIDv7
Daft v0.7.14 rewrites the Parquet reader on arrow-rs for up to 17x faster remote reads, ships streaming distributed limits, and adds native UUIDv7 generation.
by Daft Team
Parquet Reader Rewrite
The Parquet reader has been rewritten from scratch on top of arrow-rs's `array_reader` API, replacing the previous parquet2-based implementation. This is not a tuning pass: it is a new reader with a new IO model, a new concurrency strategy, and a new predicate pushdown path.
Every code example in this post is a self-contained PEP 723 script. Copy it into a file and run it with `uv run script.py`, and the dependencies install automatically. Don't have uv? See the uv installation guide.
```python
# /// script
# description = "Read parquet with projection and filter"
# requires-python = ">=3.12"
# dependencies = ["daft==0.7.14", "typing_extensions"]
# ///
import daft

# Build a small sample table and write it out as Parquet.
df = daft.from_pydict({
    "id": list(range(1000)),
    "category": ["a", "b", "c", "d"] * 250,
    "value": [i * 1.5 for i in range(1000)],
})
# write_mode="overwrite" keeps reruns from appending duplicate files.
df.write_parquet("/tmp/bench_sample", write_mode="overwrite")

# Read it back with a filter (pushed down as a predicate) and a projection.
result = (
    daft.read_parquet("/tmp/bench_sample")
    .where(daft.col("value") > 500)
    .select("id", "value")
)
result.show()
```
The architecture breaks down as follows:
- Local reads use positioned, coalesced `pread`s with a 64KB gap merge. There is no mmap and no whole-file `Bytes` allocation.
- Remote reads issue per-row-group range GETs with adjacency merging: gaps up to 1MB are merged, and runs over 24MB are split into ~16MB chunks. Fetches run in the background, and decoders await an assembled column map via `Shared`, so they never park on per-byte IO.
- Concurrency: per-row-group decode tasks are spawned concurrently via a `JoinSet`, and each drains into a bounded channel (capacity 1) that is read back in row-group order. Within a single file, output is always in file order.
- Predicate pushdown is two-phase. Predicate columns are decoded in parallel across row groups and evaluated per group. They are then reused during assembly, so columns that appear in both the predicate and the projection are decoded exactly once.
Benchmarks
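Benchmarks ran on EC2 aarch64 (Graviton) in us-west-2, with local fixtures on EBS and remote reads against S3 in the same region. Each case ran 2 warmups followed by 5-7 timed repeats, and the fastest run is reported. The comparison points are v0.7.14, 0.7.13 (the previous latest release on PyPI), and 0.7.3 (an older baseline). The highlights below compare v0.7.14 against 0.7.13.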
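A small stdlib timing helper is enough to reproduce a best-of measurement in the style described above. This is a hypothetical sketch: `best_of` and `speedup` are illustrative names, not the harness used in PR #6952.

```python
import time
from typing import Callable

def best_of(fn: Callable[[], object], warmups: int = 2, repeats: int = 5) -> float:
    """Run fn `warmups` times untimed, then return the fastest of `repeats` timed runs in ms."""
    for _ in range(warmups):
        fn()  # warm caches, connections, and lazy initialization
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append((time.perf_counter() - start) * 1000.0)
    return min(timings)

def speedup(old_ms: float, new_ms: float) -> float:
    # Speedup as reported in the tables: old time divided by new time.
    return round(old_ms / new_ms, 2)

print(speedup(110.8, 31.3))  # 3.54, matching the first local row below
```

Against Daft, `fn` would be something like `lambda: daft.read_parquet(path).to_arrow()`. Reporting the minimum filters out noise from other processes, at the cost of hiding variance.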
Local reads (selected highlights, daft.read_parquet(path).to_arrow()):
| rows | cols | row groups | v0.7.13 | v0.7.14 | speedup |
|---|---|---|---|---|---|
| 1M | 32 | 1 | 110.8 ms | 31.3 ms | 3.54x |
| 10K | 1024 | 1 | 73.9 ms | 34.5 ms | 2.14x |
| 10K | 1024 | 64 | 685.5 ms | 566.5 ms | 1.21x |
Local aggregate: 1.31x (1.11s total to 0.85s). Single-row-group files see the largest gains from the new decode path: 3.54x at 32 columns and 2.14x at 1024 columns. The 1024-column, 64-row-group case improves by 1.21x.
Remote reads (selected highlights, S3 same-region):
| shape (rows x cols x row groups) | operation | v0.7.13 | v0.7.14 | speedup |
|---|---|---|---|---|
| 10M x 1 x 64 | full read | 2990.1 ms | 171.8 ms | 17.4x |
| 10M x 1 x 64 | filter | 2849.2 ms | 229.0 ms | 12.4x |
| 1M x 32 x 64 | filter | 6402.3 ms | 389.6 ms | 16.4x |
| 1M x 32 x 64 | full read | 3600.1 ms | 330.6 ms | 10.9x |
| 10K x 1024 x 64 | projection | 3080.5 ms | 395.5 ms | 7.8x |
| 10K x 1024 x 64 | full read | 4386.4 ms | 892.2 ms | 4.9x |
Remote aggregate: 3.82x (52.5s total to 13.7s). The many-row-group shapes show the largest improvements. 0.7.13 had a remote-read regression on these shapes, and this rewrite eliminates it.
Full benchmark tables are in PR #6952. Rewritten by @colin-ho.
Streaming Distributed Limits
The previous distributed limit was a two-phase materialize-then-truncate approach: every partition fully materialized before a global limit was applied. For queries that scan large tables but only need the first N rows, this meant the entire dataset hit memory before being thrown away.
v0.7.14 replaces this with a streaming limit backed by a Ray actor. A `LimitCounterActor` pinned to the head node holds `(remaining_skip, remaining_take)` as atomic state. Each worker calls `claim(input_id, num_rows)` once per morsel and receives a `(skip, take, done)` tuple, which it uses to slice the morsel in place with no buffering. When the budget is exhausted, the actor signals `done` and the scheduler cancels all remaining limit tasks.
```python
# /// script
# description = "Distributed limit counter optimization"
# requires-python = ">=3.12"
# dependencies = ["daft==0.7.14", "typing_extensions"]
# ///
import daft

# 100,000 rows; the query only needs the top 10 by value.
df = daft.from_pydict({
    "partition_id": list(range(100)) * 1000,
    "value": list(range(100000)),
})
# Note: this script uses Daft's default local runner. The LimitCounterActor
# path described above is used when Daft runs distributed on Ray.
result = df.sort(daft.col("value"), desc=True).limit(10)
result.show()
```
The claim mechanism is idempotent: `start_task(input_id)` refunds a prior attempt's claims, so retries see consistent state. The scheduler filters out cancelled tasks in `schedule_tasks` and emits `TaskEvent::Cancelled`, so it never schedules work whose output would not be consumed.
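The claim/refund protocol described above can be modeled in a few lines. This is a minimal single-process sketch, not Daft's `LimitCounterActor`. The class `LimitCounter`, its `offset`/`limit` parameters, and the helper `slice_morsel` are hypothetical. The sketch assumes method calls are serialized, as they are for a default (non-async) Ray actor, so no locking is shown. It also ignores how refunded skip rows interact with other inputs' claims.

```python
from typing import Dict, List, Tuple

class LimitCounter:
    """Hypothetical single-process model of a head-node limit counter."""

    def __init__(self, offset: int, limit: int) -> None:
        self.remaining_skip = offset  # rows still to discard
        self.remaining_take = limit   # rows still to emit
        # Cumulative (skip, take) granted per input, so a retry can refund it.
        self.claims: Dict[str, Tuple[int, int]] = {}

    def start_task(self, input_id: str) -> None:
        # Refund whatever a previous attempt on this input claimed.
        skip, take = self.claims.pop(input_id, (0, 0))
        self.remaining_skip += skip
        self.remaining_take += take

    def claim(self, input_id: str, num_rows: int) -> Tuple[int, int, bool]:
        # Spend the skip budget first, then the take budget, from this morsel.
        skip = min(self.remaining_skip, num_rows)
        self.remaining_skip -= skip
        take = min(self.remaining_take, num_rows - skip)
        self.remaining_take -= take
        prev_skip, prev_take = self.claims.get(input_id, (0, 0))
        self.claims[input_id] = (prev_skip + skip, prev_take + take)
        return skip, take, self.remaining_take == 0

def slice_morsel(rows: List[int], skip: int, take: int) -> List[int]:
    # Worker side: keep only the granted window of the morsel.
    return rows[skip:skip + take]

# Usage: skip 3 rows, then take 5, across two morsels from one input.
counter = LimitCounter(offset=3, limit=5)
counter.start_task("input-0")
print(counter.claim("input-0", 4))   # (3, 1, False)
print(counter.claim("input-0", 10))  # (0, 4, True)
```

Centralizing the budget in one serialized counter is what lets workers slice morsels immediately instead of materializing every partition first. The per-input claim ledger is what makes retries safe.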
This matters most for queries like top-N over large shuffles, where the old approach could OOM materializing the full shuffle output before applying the limit. Implementation by @colin-ho in PR #6942.
UUIDv7 Generation
UUIDv7 addresses a long-standing problem with identifiers. UUIDv1 embeds a MAC address, which raises privacy concerns, and UUIDv4 is fully random, which gives poor index locality in databases. UUIDv7 combines a millisecond timestamp prefix with a random suffix. Values therefore sort chronologically, which keeps database inserts index-friendly, while carrying enough randomness to make collisions vanishingly unlikely. Daft's implementation delegates to `Uuid::now_v7()` from the Rust uuid crate, the widely used, battle-tested library across the Rust ecosystem.
```python
# /// script
# description = "Generate UUIDv7 values with Daft"
# requires-python = ">=3.12"
# dependencies = ["daft==0.7.14", "typing_extensions"]
# ///
import daft
from daft.functions import uuid

df = daft.from_pydict({
    "event_name": ["user_login", "page_view", "purchase"],
    "user_id": [123, 456, 789],
})
# Attach a time-ordered UUIDv7 to every row.
df = df.with_columns({
    "event": daft.col("event_name"),
    "user_id": daft.col("user_id"),
    "event_id": uuid(version="v7"),
})
df.show()
```
UUIDv7 values sort chronologically by their millisecond timestamp prefix. That makes them a good fit for event tracking, distributed logging, and time-series data where you need both uniqueness and natural ordering. Thanks to @everettVT for implementing this in PR #6909.
JSON Array and Object Functions
Working with nested JSON requires extracting array lengths, object keys, and tuple values. This release adds three Spark-compatible functions to handle common JSON inspection patterns.
```python
# /// script
# description = "JSON array and object functions"
# requires-python = ">=3.12"
# dependencies = ["daft==0.7.14", "typing_extensions"]
# ///
import daft
from daft.functions import json_array_length, json_object_keys, json_tuple

df = daft.from_pydict({
    "json_array": ['[1, 2, 3]', '["a", "b", "c", "d"]', '[]'],
    "json_object": ['{"name": "Alice", "age": 30}', '{"city": "NYC", "state": "NY", "zip": "10001"}', '{}'],
    "json_tuple_data": ['{"name": "Alice", "age": 30}', '{"city": "NYC", "state": "NY"}', '{"single": "value"}'],
})
df = df.with_columns({
    "array_length": json_array_length(daft.col("json_array")),  # number of array elements
    "object_keys": json_object_keys(daft.col("json_object")),  # top-level keys
    "tuple_result": json_tuple(daft.col("json_tuple_data"), "name", "age"),  # named fields as a struct
})
df.show()
```
`json_array_length` counts the elements of a JSON array, `json_object_keys` returns an object's top-level keys as an array, and `json_tuple` extracts the named fields of a JSON object into a struct. All three follow Spark SQL semantics to ease migration. Contributed by @XuQianJin-Stars in PR #6945.
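If you are migrating from Spark, a plain-Python reference model summarizes the expected semantics. This is a hypothetical sketch based on Spark SQL's documented behavior, not Daft's implementation. In Spark, malformed or wrong-type input yields NULL, and `json_tuple` returns field values as strings. Here a dict keyed by field name stands in for the struct, whose exact field names are not covered in this post.

```python
import json
from typing import Any, Dict, List, Optional

def _parse(s: Optional[str]) -> Any:
    # Malformed JSON or a missing value behaves like NULL.
    try:
        return json.loads(s)
    except (TypeError, ValueError):
        return None

def json_array_length(s: Optional[str]) -> Optional[int]:
    value = _parse(s)
    return len(value) if isinstance(value, list) else None

def json_object_keys(s: Optional[str]) -> Optional[List[str]]:
    value = _parse(s)
    return list(value.keys()) if isinstance(value, dict) else None

def json_tuple(s: Optional[str], *fields: str) -> Dict[str, Optional[str]]:
    value = _parse(s)
    if not isinstance(value, dict):
        return {f: None for f in fields}
    out: Dict[str, Optional[str]] = {}
    for f in fields:
        v = value.get(f)
        # Strings come back unquoted; other JSON values come back as JSON text.
        out[f] = None if v is None else v if isinstance(v, str) else json.dumps(v)
    return out

print(json_array_length('["a", "b", "c", "d"]'))                  # 4
print(json_tuple('{"name": "Alice", "age": 30}', "name", "age"))  # {'name': 'Alice', 'age': '30'}
```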
Everything Else
- Spark temporal aliases: `date_add`, `date_sub`, and `add_months` now match Spark SQL naming conventions for easier migration. Contributed by @BABTUNA in PR #6830.
- ASOF join benchmarking: standardized benchmark scripts for measuring ASOF join performance across different data patterns. Added by @euanlimzx in PR #6940.
- Conv function: a PySpark-compatible `conv` function for converting numbers between bases. Implemented by @YuangGao in PR #6910.
- Hash consistency: `hash(-0.0)` now equals `hash(0.0)`, matching Python's floating-point hash semantics. Fixed by @rchowell in PR #6963.
- Empty partition handling: operations on empty micropartitions now short-circuit. Performance improvement by @rchowell in PR #6956.
- Shuffle file optimization: each task writes one shuffle file instead of N partition files, reducing I/O overhead. Optimized by @colin-ho in PR #6948.
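The -0.0 fix comes down to IEEE 754. 0.0 and -0.0 compare equal but differ in the sign bit, so any hash computed over raw float bytes separates them, while Python defines `hash(-0.0) == hash(0.0)`. Below is a minimal stdlib sketch of the standard remedy, which canonicalizes zero before hashing. `raw_hash` and `normalized_hash` are hypothetical names, CRC32 stands in for whatever hash Daft actually uses, and this is not the code from PR #6963.

```python
import struct
import zlib

def raw_hash(x: float) -> int:
    # Hash the raw 8-byte IEEE 754 encoding; 0.0 and -0.0 differ only in the sign bit.
    return zlib.crc32(struct.pack("<d", x))

def normalized_hash(x: float) -> int:
    # -0.0 == 0.0 is True, so both zeros map to +0.0; other values pass through.
    canonical = 0.0 if x == 0.0 else x
    return zlib.crc32(struct.pack("<d", canonical))

assert -0.0 == 0.0
assert raw_hash(-0.0) != raw_hash(0.0)                # raw bytes disagree
assert normalized_hash(-0.0) == normalized_hash(0.0)  # canonical bytes agree
assert hash(-0.0) == hash(0.0)                        # Python's float hash already agrees
```

Without this normalization, rows holding 0.0 and -0.0 could land in different hash partitions or groups even though they compare equal.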
Community Contributions
- @XuQianJin-Stars: shipped the JSON array length, object keys, and tuple extraction functions
- @YuangGao: added the `conv` function for PySpark compatibility
- @BABTUNA: implemented Spark-style temporal aliases and improved `show` output formatting
Upgrade
```shell
uv add "daft>=0.7.14"
```
Or try the latest nightly:
```shell
uv pip install daft --pre --extra-index-url https://nightly.daft.ai
```
Check the full changelog for the complete list of merged PRs.
Join the Community
Questions, feedback, or want to show what you're building? Join us on Slack: daft.ai/slack

