Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
da232ae
feat: add SupportsSetRange protocol and store implementations
d-v-b Apr 15, 2026
e57bd5a
Merge branch 'main' into feat/byte-range-setter
d-v-b Apr 15, 2026
579ff16
test: add tests for SupportsSetRange on MemoryStore and LocalStore
d-v-b Apr 15, 2026
2b9d804
docs: changelog
d-v-b Apr 15, 2026
25f05e6
Merge branch 'main' into feat/byte-range-setter
d-v-b Apr 16, 2026
f04f594
Merge branch 'main' into feat/byte-range-setter
d-v-b Apr 23, 2026
5c26a08
test: add tests for open / not open
d-v-b May 1, 2026
91590dd
fixup
d-v-b May 1, 2026
7757ecd
Merge branch 'main' into feat/byte-range-setter
d-v-b May 1, 2026
e7745ac
Merge branch 'feat/byte-range-setter' of https://github.com/d-v-b/zar…
d-v-b May 1, 2026
a9da33a
chore: mypy
d-v-b May 1, 2026
1af7019
Merge branch 'main' into feat/byte-range-setter
d-v-b May 11, 2026
ca60e90
Merge branch 'main' into feat/byte-range-setter
d-v-b May 15, 2026
25760bf
Merge branch 'main' into feat/byte-range-setter
d-v-b May 15, 2026
f3b8afe
Merge branch 'main' into feat/byte-range-setter
d-v-b May 15, 2026
b70f70d
Merge branch 'main' into feat/byte-range-setter
d-v-b May 22, 2026
7121616
Merge branch 'main' into feat/byte-range-setter
d-v-b May 29, 2026
6aa4e6d
Merge branch 'main' into feat/byte-range-setter
d-v-b May 29, 2026
225cea0
Potential fix for pull request finding
d-v-b May 30, 2026
2db2c9a
feat: add per-key locking to MemoryStore and LocalStore set_range (#178)
mkitti May 31, 2026
e02437c
fix: set_range capability detection, bounds check, and managed path p…
d-v-b Jun 3, 2026
a8f1fe7
fix: share managed-store set_range locks; isolate from_url filesystems
d-v-b Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changes/3907.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add the `SupportsSetRange` protocol for stores that support writing to a byte range within an existing value, implemented by `LocalStore` and `MemoryStore`. This is necessary to support in-place writes of sharded arrays (e.g. writing a single subchunk without rewriting the entire shard).

Byte-range writes are exposed as an opt-in protocol rather than a method on the `Store` ABC. Only a few stores can perform them natively, and most cannot. A universal method with a read-modify-write fallback (as in the Rust `zarrs` crate) would let every store participate, but for the motivating use case that fallback would silently rewrite an entire shard, defeating the purpose. The opt-in protocol keeps the cost model honest and keeps `set_range` out of the signatures of stores that will never support it; any fallback strategy is left to the caller (the sharding codec). Stores satisfy the protocol structurally, so `GpuMemoryStore` (which has no use case for in-place GPU byte-range writes) disclaims it and is correctly reported as unsupported by `isinstance`.

It is entirely the caller's responsibility to ensure consistency: concurrent writes to overlapping ranges are order-dependent, `set_range` racing against `set`/`delete` is a race, and writes are not guaranteed to be atomic with respect to a process crash. A write that does not fit within the existing value raises `ValueError` consistently across implementations.
57 changes: 57 additions & 0 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"Store",
"SupportsDeleteSync",
"SupportsGetSync",
"SupportsSetRange",
"SupportsSetSync",
"SupportsSyncStore",
"set_or_delete",
Expand Down Expand Up @@ -787,6 +788,62 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


# Design note: byte-range writes are exposed as an opt-in protocol rather than a
# method on the `Store` ABC. Only a few stores can do them natively (`LocalStore`,
# `MemoryStore`); most (cloud, zip, read-only) cannot. A universal `Store.set_range`
# with a read-modify-write fallback (as in the Rust `zarrs` crate's
# `WritableStorageTraits::set_partial` + `supports_set_partial`) would let every
# store participate, but for our motivating use case — writing one subchunk without
# rewriting the whole shard — that fallback is a footgun: it would silently rewrite
# an entire (possibly multi-GB) shard, defeating the purpose while appearing to
# succeed. An opt-in protocol instead keeps the cost model honest (a store either
# supports cheap ranged writes or doesn't advertise the capability at all) and keeps
# `set_range` out of the signatures of stores that will never support it.
#
# Stores satisfy this protocol *structurally* (by defining the methods), not by
# nominal inheritance, so a subclass can disclaim it by setting the methods to `None`
# (see `GpuMemoryStore`). Any read-modify-write fallback strategy belongs in the
# caller (the sharding codec), which already has to decide between in-place and
# buffer-and-rewrite — mirroring the zarrs layering (storage writes bytes, codec owns
# strategy) without making every store carry the method. If broad-backend partial
# encoding is wanted later, adding a `supports_set_range()` capability flag plus a
# codec-level fallback is an additive change that does not require retrofitting stores.
@runtime_checkable
class SupportsSetRange(Protocol):
"""Protocol for stores that support writing to a byte range within an existing value.

Overwrites `len(value)` bytes starting at byte offset `start` within the
existing stored value for `key`. The key must already exist and the write
must fit within the existing value (i.e., `start + len(value) <= len(existing)`);
a write that does not fit raises `ValueError`.

Concurrency and atomicity
-------------------------
**It is entirely the caller's responsibility to ensure consistency.** Any
coordination needed to keep stored values consistent must be arranged by the
caller. In particular:

- Concurrent `set_range` calls that write to **disjoint** byte ranges of the
same key are safe.
- Concurrent `set_range` calls that write to **overlapping** ranges of the same
key have order-dependent, unspecified results. The caller must serialize them.
- A `set_range` racing against a `set` or `delete` on the same key is a race
condition, just as concurrent `set` calls are. The caller must serialize these.
- Writes are **not** guaranteed to be atomic with respect to a process crash:
a crash mid-write may leave the value partially updated. The caller is
responsible for any durability or recovery guarantees it requires.

What an implementation does to honor (or fall short of) this contract — locking,
atomic replacement, and so on — is documented on the implementing store, not here.
The intended consumer (the sharding codec writing inner chunks of deterministic
compressed size) coordinates writes so that they target disjoint ranges.
"""

async def set_range(self, key: str, value: Buffer, start: int) -> None: ...

def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ...


@runtime_checkable
class SupportsGetSync(Protocol):
def get_sync(
Expand Down
9 changes: 8 additions & 1 deletion src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,14 @@ def from_url(
from fsspec.core import url_to_fs

opts = storage_options or {}
opts = {"asynchronous": True, **opts}
# ``skip_instance_cache=True`` forces a fresh filesystem instance instead of
# an fsspec instance-cached one. Without it, two ``from_url`` calls with the
# same URL/options receive the *same* cached ``AsyncFileSystem``; closing one
# store (which we mark as owning the fs) would tear down the shared aiohttp
# session out from under the other store — and any other fsspec consumer in
# the process. By skipping the cache we own an instance no one else shares, so
# ``close()`` is safe.
opts = {"asynchronous": True, "skip_instance_cache": True, **opts}

fs, path = url_to_fs(url, **opts)
if not fs.async_impl:
Expand Down
109 changes: 109 additions & 0 deletions src/zarr/storage/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import os
import shutil
import sys
import threading
import time
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self
Expand All @@ -20,6 +22,7 @@
from zarr.core.buffer import Buffer
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import AccessModeLiteral, concurrent_map
from zarr.storage._utils import _check_set_range_bounds

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable, Iterator
Expand Down Expand Up @@ -58,6 +61,18 @@ def _safe_move(src: Path, dst: Path) -> None:
os.unlink(src)


_LOCK_POLL_INTERVAL = 0.01 # seconds between lock-file existence checks
_LOCK_STALE_TIMEOUT = 60.0 # seconds before an abandoned lock file is reclaimed


def _is_stale_lock(lock_path: Path) -> bool:
"""Return True if lock_path either doesn't exist or is older than _LOCK_STALE_TIMEOUT."""
try:
return time.time() - lock_path.stat().st_mtime > _LOCK_STALE_TIMEOUT
except FileNotFoundError:
return True


@contextlib.contextmanager
def _atomic_write(
path: Path,
Expand All @@ -77,6 +92,20 @@ def _atomic_write(
raise


def _put_range(path: Path, value: Buffer, start: int) -> None:
"""Write bytes at a specific offset within an existing file."""
view = value.as_buffer_like()
with path.open("r+b") as f:
# Validate bounds before writing: a bare seek+write would silently extend the
# file (zero-filling any gap), but the SupportsSetRange contract requires the
# write to fit within the existing value, so we fail consistently with
# MemoryStore instead.
existing_length = f.seek(0, os.SEEK_END)
_check_set_range_bounds(existing_length, start, len(value))
f.seek(start)
f.write(view)


def _put(path: Path, value: Buffer, exclusive: bool = False) -> int:
path.parent.mkdir(parents=True, exist_ok=True)
# write takes any object supporting the buffer protocol
Expand Down Expand Up @@ -109,6 +138,8 @@ class LocalStore(Store):
supports_listing: bool = True

root: Path
_key_locks: dict[str, asyncio.Lock]
_key_locks_sync: dict[str, threading.Lock]

def __init__(self, root: Path | str, *, read_only: bool = False) -> None:
super().__init__(read_only=read_only)
Expand All @@ -119,6 +150,8 @@ def __init__(self, root: Path | str, *, read_only: bool = False) -> None:
f"'root' must be a string or Path instance. Got an instance of {type(root)} instead."
)
self.root = root
self._key_locks = {}
self._key_locks_sync = {}

def with_read_only(self, read_only: bool = False) -> Self:
# docstring inherited
Expand Down Expand Up @@ -292,6 +325,82 @@ async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
path = self.root / key
await asyncio.to_thread(_put, path, value, exclusive=exclusive)

async def set_range(self, key: str, value: Buffer, start: int) -> None:
if not self._is_open:
await self._open()
self._check_writable()
path = self.root / key
lock_path = path.with_name(path.name + ".__lock__")
in_process_lock = self._key_locks.setdefault(key, asyncio.Lock())

# Acquire the file lock (steps 1-5 from the concurrency plan).
while True:
# Step 1: spin-wait until no lock file is present (or it is stale).
while await asyncio.to_thread(lock_path.exists):
if await asyncio.to_thread(_is_stale_lock, lock_path):
break
await asyncio.sleep(_LOCK_POLL_INTERVAL)

# Steps 2-5: serialise the rename under an in-process lock so that
# only one coroutine per process attempts the atomic file move at a time.
acquired = False
async with in_process_lock:
# Step 3: re-check after acquiring the in-process lock.
if not await asyncio.to_thread(lock_path.exists):
try:
# Step 4: atomic rename — raises FileExistsError if another
# process grabbed the lock between steps 3 and 4.
await asyncio.to_thread(_safe_move, path, lock_path)
acquired = True
except FileExistsError:
pass
# Step 5: in-process lock released on context exit.

if acquired:
break

# Step 6: perform the partial write on the lock file.
try:
await asyncio.to_thread(_put_range, lock_path, value, start)
finally:
# Steps 7-9: re-acquire in-process lock, rename lock file back, release.
async with in_process_lock:
await asyncio.to_thread(lock_path.replace, path)

def set_range_sync(self, key: str, value: Buffer, start: int) -> None:
self._ensure_open_sync()
self._check_writable()
path = self.root / key
lock_path = path.with_name(path.name + ".__lock__")
in_process_lock = self._key_locks_sync.setdefault(key, threading.Lock())

# Acquire the file lock (same double-checked pattern as the async path).
while True:
# Step 1: spin-wait.
while lock_path.exists():
if _is_stale_lock(lock_path):
break
time.sleep(_LOCK_POLL_INTERVAL)

acquired = False
with in_process_lock:
if not lock_path.exists():
try:
_safe_move(path, lock_path)
acquired = True
except FileExistsError:
pass

if acquired:
break

# Partial write, then release.
try:
_put_range(lock_path, value, start)
finally:
with in_process_lock:
lock_path.replace(path)

async def delete(self, key: str) -> None:
"""
Remove a key from the store.
Expand Down
Loading
Loading