class ArtifactStore:
"""Host-owned binary artifact storage with atomic temp-file cleanup."""
def __init__(
    self,
    root: str | Path,
    *,
    host_url: str,
    max_artifact_bytes: int | None = DEFAULT_ARTIFACT_MAX_BYTES,
    quota_bytes: int | None = DEFAULT_ARTIFACT_STORAGE_QUOTA_BYTES,
    spool_ttl_seconds: float = DEFAULT_ARTIFACT_SPOOL_TTL_SECONDS,
):
    """Configure the store and make sure its directories exist.

    Args:
        root: Base directory of the store. ``~`` is expanded and the path
            is resolved without requiring it to exist yet.
        host_url: URL recorded on artifact refs; trailing ``/`` characters
            are removed.
        max_artifact_bytes: Per-artifact size limit. ``None`` is stored as
            ``0`` and negative values are clamped to ``0``.
        quota_bytes: Total storage quota. ``None`` is kept as ``None``;
            negative values are clamped to ``0``.
        spool_ttl_seconds: Age threshold for temporary spool files; values
            below one second are raised to ``1.0``.
    """
    base_dir = Path(root).expanduser().resolve(strict=False)
    self.root = base_dir
    self.host_url = host_url.rstrip("/")

    if max_artifact_bytes is None:
        self.max_artifact_bytes = 0
    else:
        self.max_artifact_bytes = max(0, int(max_artifact_bytes))

    if quota_bytes is None:
        self.quota_bytes = None
    else:
        self.quota_bytes = max(0, int(quota_bytes))

    self.spool_ttl_seconds = max(1.0, float(spool_ttl_seconds))

    # Fixed sub-directory layout beneath the resolved root.
    self._blobs = base_dir / "blobs"
    self._meta = base_dir / "metadata"
    self._tmp = base_dir / "tmp"
    self._ensure_dirs()
def set_host_url(self, host_url: str) -> None:
    """Replace the stored host URL, dropping any trailing ``/`` characters."""
    normalized = host_url.rstrip("/")
    self.host_url = normalized
def cleanup_temporary(self, *, now: float | None = None) -> None:
current = time.time() if now is None else now
self._ensure_dirs()
for path in self._tmp.glob("*.part"):
with contextlib.suppress(OSError):
if current - path.stat().st_mtime >= self.spool_ttl_seconds:
path.unlink()
for ref in self.list():
if ref.expires_at > 0 and ref.expires_at <= current:
self.delete(ref.artifact_id)
def create_from_path(
    self,
    source: str | Path,
    *,
    owner_agent_id: str = "",
    name: str | None = None,
    compression: str = "",
    expires_at: float = 0.0,
    expected_sha256: str | None = None,
) -> ArtifactWriteResult:
    """Store the contents of a local file as a new artifact.

    Args:
        source: Path of the file to ingest; must be a regular file.
        owner_agent_id: Forwarded unchanged to ``create_from_stream``.
        name: Artifact name; the file's own name is used when falsy.
        compression: Forwarded unchanged to ``create_from_stream``.
        expires_at: Forwarded unchanged to ``create_from_stream``.
        expected_sha256: Forwarded unchanged to ``create_from_stream``.

    Returns:
        Whatever ``create_from_stream`` returns for the opened file.

    Raises:
        TransferError: If ``source`` is not an existing regular file.
    """
    source_path = Path(source)
    if not source_path.is_file():
        raise TransferError(f"artifact source is not a file: {source_path}")

    # The on-disk size is passed along as the declared size of the stream.
    stream_options = {
        "size_bytes": source_path.stat().st_size,
        "owner_agent_id": owner_agent_id,
        "name": name or source_path.name,
        "compression": compression,
        "expires_at": expires_at,
        "expected_sha256": expected_sha256,
    }
    with source_path.open("rb") as stream:
        return self.create_from_stream(stream, **stream_options)
def create_from_http_request(
    self,
    headers: Any,
    source: Any,
    *,
    owner_agent_id: str = "",
    name: str = "",
    compression: str = "",
    expires_at: float = 0.0,
    expected_sha256: str | None = None,
) -> ArtifactWriteResult:
    """Store the body of an HTTP request as a new artifact.

    Args:
        headers: Object exposing ``get(name)`` for request headers.
        source: Raw request body stream handed to the request reader.
        owner_agent_id: Forwarded unchanged to ``create_from_stream``.
        name: Forwarded unchanged to ``create_from_stream``.
        compression: Forwarded unchanged to ``create_from_stream``.
        expires_at: Forwarded unchanged to ``create_from_stream``.
        expected_sha256: Forwarded unchanged to ``create_from_stream``.

    Returns:
        The result of ``create_from_stream`` for the wrapped body.

    Raises:
        TransferError: If the request is not chunked and its
            ``Content-Length`` header is not a non-negative integer.
    """
    transfer_encoding = str(headers.get("Transfer-Encoding") or "").casefold()
    if "chunked" in transfer_encoding:
        # Chunked framing overrides Content-Length, so the header is not
        # even parsed here; the size is reported as unknown (-1).
        stream = ChunkedRequestReader(source)
        size = -1
    else:
        raw_length = headers.get("Content-Length")
        try:
            size = int(raw_length or 0)
        except (TypeError, ValueError):
            raise TransferError(f"invalid Content-Length header: {raw_length!r}") from None
        # A negative length must not be mistaken for the "unknown size"
        # sentinel or passed on as a read limit.
        if size < 0:
            raise TransferError(f"invalid Content-Length header: {raw_length!r}")
        stream = LimitedRequestReader(source, size)
    return self.create_from_stream(
        stream,
        size_bytes=size,
        owner_agent_id=owner_agent_id,
        name=name,
        compression=compression,
        expires_at=expires_at,
        expected_sha256=expected_sha256,
    )
def create_from_stream(
    self,
    source: Any,
    *,
    size_bytes: int = -1,
    owner_agent_id: str = "",
    name: str = "",
    compression: str = "",
    expires_at: float = 0.0,
    expected_sha256: str | None = None,
) -> ArtifactWriteResult:
    """Spool a readable stream to disk and register it as a new artifact.

    The stream is written to a ``.part`` file in the temp directory while a
    SHA-256 digest is computed. It is then moved into the blob directory
    with ``os.replace`` and its metadata JSON is published the same way, so
    readers never observe a half-written metadata file.

    Args:
        source: Object with ``read(n)`` returning bytes-like chunks; a
            falsy chunk ends the stream.
        size_bytes: Declared size, checked against the per-artifact limit
            before any data is read.
            NOTE(review): the declared size is not compared with the number
            of bytes actually received - confirm whether truncated streams
            are rejected by the caller or the reader.
        owner_agent_id: Stored on the resulting ref.
        name: Stored on the resulting ref.
        compression: Stored on the resulting ref.
        expires_at: Expiry timestamp; falsy values are stored as ``0.0``.
        expected_sha256: Optional hex digest compared case-insensitively
            with the digest of the received bytes.

    Returns:
        An ``ArtifactWriteResult`` carrying the new ref and the blob path.

    Raises:
        TransferError: On checksum mismatch, plus whatever the size and
            quota checks raise.
    """
    self._ensure_dirs()
    artifact_id = uuid.uuid4().hex
    tmp_path = self._tmp / f"{artifact_id}.part"
    # Also ends in ".part" so the temp-directory sweep picks it up if the
    # process dies before the cleanup below can run.
    meta_tmp_path = self._tmp / f"{artifact_id}.json.part"
    blob_path = self._blob_path(artifact_id)
    meta_path = self._metadata_path(artifact_id)
    digest = hashlib.sha256()
    written = 0
    committed = False
    try:
        self._check_declared_size(size_bytes)
        with tmp_path.open("wb") as target:
            while chunk := source.read(STREAM_CHUNK_BYTES):
                data = bytes(chunk)
                written += len(data)
                # Enforce the limit before the offending chunk hits disk.
                self._check_declared_size(written)
                digest.update(data)
                target.write(data)
        sha256 = digest.hexdigest()
        if expected_sha256 and sha256.casefold() != expected_sha256.casefold():
            raise TransferError(f"artifact checksum mismatch: expected {expected_sha256}, got {sha256}")
        self._reserve_quota(written)
        blob_path.parent.mkdir(parents=True, exist_ok=True)
        os.replace(tmp_path, blob_path)
        created_at = time.time()
        ref = ArtifactRef(
            host_url=self.host_url,
            artifact_id=artifact_id,
            name=name,
            size_bytes=written,
            sha256=sha256,
            compression=compression,
            created_at=created_at,
            expires_at=float(expires_at or 0.0),
            owner_agent_id=owner_agent_id,
        )
        # Write metadata beside the spool file, then rename it into place.
        meta_tmp_path.write_text(_metadata_json(ref), encoding="utf-8")
        os.replace(meta_tmp_path, meta_path)
        result = ArtifactWriteResult(ref=ref, path=blob_path)
        committed = True
        return result
    finally:
        if not committed:
            # Runs for every failure, including KeyboardInterrupt, so no
            # blob is left without metadata. Cleanup errors are suppressed
            # so they cannot replace the exception being propagated.
            for leftover in (tmp_path, meta_tmp_path, blob_path, meta_path):
                with contextlib.suppress(OSError):
                    leftover.unlink()
def ref(self, artifact_id: str) -> ArtifactRef:
    """Load the stored reference for ``artifact_id``.

    The metadata JSON is parsed with ``ArtifactRef.from_wire``. If the
    stored ``host_url`` differs from this store's current one, a copy
    carrying the current ``host_url`` is returned instead.

    Args:
        artifact_id: Identifier of the artifact to look up.

    Returns:
        The artifact's ``ArtifactRef``.

    Raises:
        TransferError: If no metadata file exists for ``artifact_id``.
    """
    import json

    path = self._metadata_path(artifact_id)
    # Read directly instead of checking exists() first: the file may be
    # deleted between the check and the read, which would otherwise
    # surface as FileNotFoundError rather than TransferError.
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise TransferError(f"No artifact {artifact_id!r}") from None
    ref = ArtifactRef.from_wire(json.loads(raw))
    if ref.host_url == self.host_url:
        return ref
    return ArtifactRef(
        host_url=self.host_url,
        artifact_id=ref.artifact_id,
        name=ref.name,
        size_bytes=ref.size_bytes,
        sha256=ref.sha256,
        compression=ref.compression,
        created_at=ref.created_at,
        expires_at=ref.expires_at,
        owner_agent_id=ref.owner_agent_id,
    )
def list(self, *, owner_agent_id: str | None = None) -> list[ArtifactRef]:
    """Return refs for all readable artifacts, ordered by metadata path.

    Args:
        owner_agent_id: When not ``None``, only refs whose
            ``owner_agent_id`` equals this value are returned.

    Metadata files that fail to load are skipped silently.
    """
    self._ensure_dirs()
    collected = []
    for meta_file in sorted(self._meta.glob("*.json")):
        try:
            candidate = self.ref(meta_file.stem)
        except Exception:
            # Best effort: unreadable or corrupt metadata is ignored.
            continue
        if owner_agent_id is None or candidate.owner_agent_id == owner_agent_id:
            collected.append(candidate)
    return collected
def open_reader(self, artifact_id: str):
    """Open the artifact's blob for binary reading.

    The ref lookup happens first and its result is discarded; it is there
    so that an unknown id fails the same way ``ref`` does.
    """
    self.ref(artifact_id)
    blob_file = self._blob_path(artifact_id)
    return blob_file.open("rb")
def blob_path(self, artifact_id: str) -> Path:
    """Return the blob location for an artifact after a ref lookup.

    The ref lookup's result is discarded; it is there so that an unknown
    id fails the same way ``ref`` does.
    """
    self.ref(artifact_id)
    location = self._blob_path(artifact_id)
    return location
def export_to_path(
    self,
    artifact_id: str,
    target: str | Path,
    *,
    expected_sha256: str | None = None,
) -> ArtifactRef:
    """Copy an artifact's blob to ``target`` after verifying size and digest.

    Data is written to a hidden ``.part`` sibling of the target and moved
    into place with ``os.replace`` only after both checks pass.

    Args:
        artifact_id: Identifier of the artifact to export.
        target: Destination file path; missing parent directories are
            created.
        expected_sha256: Digest to verify against; the digest recorded on
            the ref is used when this is falsy.

    Returns:
        The artifact's ref.

    Raises:
        TransferError: If the byte count differs from the ref's
            ``size_bytes`` or the digest does not match.
    """
    ref = self.ref(artifact_id)
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target_path.with_name(f".{target_path.name}.{uuid.uuid4().hex}.part")
    digest = hashlib.sha256()
    written = 0
    exported = False
    try:
        with self.open_reader(artifact_id) as source, tmp_path.open("wb") as output:
            while chunk := source.read(STREAM_CHUNK_BYTES):
                data = bytes(chunk)
                digest.update(data)
                written += len(data)
                output.write(data)
        sha256 = digest.hexdigest()
        if written != ref.size_bytes:
            raise TransferError(f"artifact size mismatch: expected {ref.size_bytes}, got {written}")
        expected = expected_sha256 or ref.sha256
        if expected and sha256.casefold() != expected.casefold():
            raise TransferError(f"artifact checksum mismatch: expected {expected}, got {sha256}")
        os.replace(tmp_path, target_path)
        exported = True
        return ref
    finally:
        if not exported:
            # The temp file sits in the caller's directory, so it is removed
            # on every failure, including KeyboardInterrupt. Cleanup errors
            # are suppressed so they cannot hide the original exception.
            with contextlib.suppress(OSError):
                tmp_path.unlink()
def delete(self, artifact_id: str) -> None:
    """Remove the artifact's blob and then its metadata file.

    A file that is already missing is ignored, so repeated calls are safe.
    """
    for locate in (self._blob_path, self._metadata_path):
        try:
            locate(artifact_id).unlink()
        except FileNotFoundError:
            pass
def delete_owner(self, owner_agent_id: str) -> None:
    """Delete every artifact that ``list`` reports for ``owner_agent_id``."""
    owned = self.list(owner_agent_id=owner_agent_id)
    for artifact_id in (entry.artifact_id for entry in owned):
        self.delete(artifact_id)
def _ensure_dirs(self) -> None:
    """Create the blob, metadata and temp directories if they are missing."""
    for directory in (self._blobs, self._meta, self._tmp):
        directory.mkdir(parents=True, exist_ok=True)
def _check_declared_size(self, size_bytes: int) -> None:
    """Raise ``TransferError`` when ``size_bytes`` exceeds the artifact limit.

    A falsy ``max_artifact_bytes`` (``0``) disables the check entirely.
    """
    limit = self.max_artifact_bytes
    if not limit or size_bytes <= limit:
        return
    raise TransferError(
        f"artifact exceeds maximum size: {size_bytes} bytes would exceed {self.max_artifact_bytes} bytes"
    )
def _reserve_quota(self, incoming_size: int) -> None:
    """Raise ``TransferError`` if adding ``incoming_size`` bytes would exceed the quota.

    Usage is the summed size of the ``*.bin`` files in the blob directory;
    files that cannot be stat'ed are left out. A ``quota_bytes`` of
    ``None`` disables the check, and a negative ``incoming_size`` counts
    as zero.
    """
    quota = self.quota_bytes
    if quota is None:
        return

    used = 0
    for blob in self._blobs.glob("*.bin"):
        try:
            used += blob.stat().st_size
        except OSError:
            continue

    projected = used + max(0, incoming_size)
    if projected <= quota:
        return
    raise TransferError(f"artifact storage quota exceeded: {projected} bytes would exceed {self.quota_bytes}")
def _blob_path(self, artifact_id: str) -> Path:
    """Build the ``.bin`` path in the blob directory for ``artifact_id``.

    The id is first passed through ``_safe_artifact_id``.
    """
    filename = f"{_safe_artifact_id(artifact_id)}.bin"
    return self._blobs / filename
def _metadata_path(self, artifact_id: str) -> Path:
    """Build the ``.json`` path in the metadata directory for ``artifact_id``.

    The id is first passed through ``_safe_artifact_id``.
    """
    filename = f"{_safe_artifact_id(artifact_id)}.json"
    return self._meta / filename