Skip to content

Artifacts Package

paglets.artifacts contains the value types and host-owned store used for binary artifact transport and registered paglet file mobility.

Responsibilities

  • Represent hosted artifact blobs with ArtifactRef.
  • Represent paglet-owned registered files with PagletFileRef.
  • Stream artifact data through temporary .part files and checksum validation.
  • Remove failed temporary receives immediately and clean stale temporary files during host sweeps.
  • Keep low-level artifact storage separate from per-paglet scratch/work directories.

API Reference

paglets.artifacts

ArtifactStore

Host-owned binary artifact storage with atomic temp-file cleanup.

Source code in src/paglets/artifacts.py
class ArtifactStore:
    """Host-owned binary artifact storage with atomic temp-file cleanup."""

    def __init__(
        self,
        root: str | Path,
        *,
        host_url: str,
        max_artifact_bytes: int | None = DEFAULT_ARTIFACT_MAX_BYTES,
        quota_bytes: int | None = DEFAULT_ARTIFACT_STORAGE_QUOTA_BYTES,
        spool_ttl_seconds: float = DEFAULT_ARTIFACT_SPOOL_TTL_SECONDS,
    ):
        self.root = Path(root).expanduser().resolve(strict=False)
        self.host_url = host_url.rstrip("/")
        self.max_artifact_bytes = 0 if max_artifact_bytes is None else max(0, int(max_artifact_bytes))
        self.quota_bytes = None if quota_bytes is None else max(0, int(quota_bytes))
        self.spool_ttl_seconds = max(1.0, float(spool_ttl_seconds))
        self._blobs = self.root / "blobs"
        self._meta = self.root / "metadata"
        self._tmp = self.root / "tmp"
        self._ensure_dirs()

    def set_host_url(self, host_url: str) -> None:
        self.host_url = host_url.rstrip("/")

    def cleanup_temporary(self, *, now: float | None = None) -> None:
        current = time.time() if now is None else now
        self._ensure_dirs()
        for path in self._tmp.glob("*.part"):
            with contextlib.suppress(OSError):
                if current - path.stat().st_mtime >= self.spool_ttl_seconds:
                    path.unlink()
        for ref in self.list():
            if ref.expires_at > 0 and ref.expires_at <= current:
                self.delete(ref.artifact_id)

    def create_from_path(
        self,
        source: str | Path,
        *,
        owner_agent_id: str = "",
        name: str | None = None,
        compression: str = "",
        expires_at: float = 0.0,
        expected_sha256: str | None = None,
    ) -> ArtifactWriteResult:
        path = Path(source)
        if not path.is_file():
            raise TransferError(f"artifact source is not a file: {path}")
        size = path.stat().st_size
        with path.open("rb") as handle:
            return self.create_from_stream(
                handle,
                size_bytes=size,
                owner_agent_id=owner_agent_id,
                name=name or path.name,
                compression=compression,
                expires_at=expires_at,
                expected_sha256=expected_sha256,
            )

    def create_from_http_request(
        self,
        headers: Any,
        source: Any,
        *,
        owner_agent_id: str = "",
        name: str = "",
        compression: str = "",
        expires_at: float = 0.0,
        expected_sha256: str | None = None,
    ) -> ArtifactWriteResult:
        """Store the body of an incoming HTTP request as a new artifact.

        Args:
            headers: Mapping-like object exposing ``get`` for request headers.
            source: Raw request body stream handed to the request reader.
            owner_agent_id: Owner recorded in the artifact reference.
            name: Artifact name recorded in the reference.
            compression: Compression label recorded in the reference.
            expires_at: Expiry timestamp recorded in the reference.
            expected_sha256: Optional checksum the stored data must match.

        Returns:
            The write result produced by ``create_from_stream``.

        Raises:
            ValueError: If a non-chunked request carries a Content-Length
                that is not an integer.
            TransferError: Propagated from ``create_from_stream``.
        """
        transfer_encoding = str(headers.get("Transfer-Encoding") or "").casefold()
        if "chunked" in transfer_encoding:
            # Transfer-Encoding overrides Content-Length, so the length header
            # is deliberately not parsed here: a stray or malformed value must
            # not abort an otherwise valid chunked upload.
            stream = ChunkedRequestReader(source)
            size = -1  # total size is unknown until the stream ends
        else:
            length = int(headers.get("Content-Length") or 0)
            stream = LimitedRequestReader(source, length)
            size = length
        return self.create_from_stream(
            stream,
            size_bytes=size,
            owner_agent_id=owner_agent_id,
            name=name,
            compression=compression,
            expires_at=expires_at,
            expected_sha256=expected_sha256,
        )

    def create_from_stream(
        self,
        source: Any,
        *,
        size_bytes: int = -1,
        owner_agent_id: str = "",
        name: str = "",
        compression: str = "",
        expires_at: float = 0.0,
        expected_sha256: str | None = None,
    ) -> ArtifactWriteResult:
        """Spool *source* into the store and publish it as a new artifact.

        Data is read in ``STREAM_CHUNK_BYTES`` chunks into a ``.part`` file in
        the temp directory while the byte count and SHA-256 are accumulated.
        Once the stream ends, the checksum (when given) and the storage quota
        are checked, the spool file is moved to its blob location, and the
        metadata is staged in the temp directory and moved into place so that
        readers never observe a partially written metadata file.

        Args:
            source: Object with a ``read(n)`` method returning bytes-like
                chunks and a falsy value at end of stream.
            size_bytes: Declared size used for the up-front maximum-size
                check; ``-1`` when unknown.
            owner_agent_id: Owner recorded in the artifact reference.
            name: Artifact name recorded in the reference.
            compression: Compression label recorded in the reference.
            expires_at: Expiry timestamp; falsy values are stored as ``0.0``.
            expected_sha256: Optional hex digest, compared case-insensitively.

        Returns:
            The new artifact reference together with its blob path.

        Raises:
            TransferError: If the declared or received size exceeds the
                per-artifact limit, the checksum does not match, or the
                storage quota would be exceeded.
        """
        self._ensure_dirs()
        artifact_id = uuid.uuid4().hex
        tmp_path = self._tmp / f"{artifact_id}.part"
        # Metadata is staged beside the blob spool; the ".part" suffix lets
        # cleanup_temporary() sweep it if the process dies mid-publish.
        meta_tmp_path = self._tmp / f"{artifact_id}.json.part"
        blob_path = self._blob_path(artifact_id)
        meta_path = self._metadata_path(artifact_id)
        digest = hashlib.sha256()
        written = 0
        try:
            self._check_declared_size(size_bytes)
            with tmp_path.open("wb") as target:
                while chunk := source.read(STREAM_CHUNK_BYTES):
                    data = bytes(chunk)
                    written += len(data)
                    # Enforce the limit on bytes actually received, since the
                    # declared size may be unknown or understated.
                    self._check_declared_size(written)
                    digest.update(data)
                    target.write(data)
            # NOTE(review): a known declared size is never compared with the
            # number of bytes received, so a short stream is accepted unless
            # expected_sha256 is supplied — confirm this is intended.
            sha256 = digest.hexdigest()
            if expected_sha256 and sha256.casefold() != expected_sha256.casefold():
                raise TransferError(f"artifact checksum mismatch: expected {expected_sha256}, got {sha256}")
            self._reserve_quota(written)
            blob_path.parent.mkdir(parents=True, exist_ok=True)
            os.replace(tmp_path, blob_path)
            created_at = time.time()
            ref = ArtifactRef(
                host_url=self.host_url,
                artifact_id=artifact_id,
                name=name,
                size_bytes=written,
                sha256=sha256,
                compression=compression,
                created_at=created_at,
                expires_at=float(expires_at or 0.0),
                owner_agent_id=owner_agent_id,
            )
            # Write-then-rename: the metadata file only appears under its
            # final name once it is complete.
            meta_tmp_path.write_text(_metadata_json(ref), encoding="utf-8")
            os.replace(meta_tmp_path, meta_path)
            return ArtifactWriteResult(ref=ref, path=blob_path)
        except Exception:
            # Remove every file this call may have created, then re-raise.
            for leftover in (tmp_path, meta_tmp_path, blob_path, meta_path):
                with contextlib.suppress(FileNotFoundError):
                    leftover.unlink()
            raise

    def ref(self, artifact_id: str) -> ArtifactRef:
        """Load the stored reference for *artifact_id*.

        The metadata file is read directly rather than probed with
        ``exists()`` first, so an artifact deleted concurrently is still
        reported as a ``TransferError`` instead of a raw
        ``FileNotFoundError``.

        Returns:
            The reference decoded from the metadata file. When the stored
            ``host_url`` differs from this store's current ``host_url``, a
            copy carrying the current ``host_url`` is returned instead.

        Raises:
            TransferError: If no metadata file exists for *artifact_id*.
        """
        import json

        path = self._metadata_path(artifact_id)
        try:
            raw = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            raise TransferError(f"No artifact {artifact_id!r}") from None

        payload = json.loads(raw)
        ref = ArtifactRef.from_wire(payload)
        if ref.host_url != self.host_url:
            # The metadata was written under a different host URL; rebuild
            # the reference so callers see the current one.
            ref = ArtifactRef(
                host_url=self.host_url,
                artifact_id=ref.artifact_id,
                name=ref.name,
                size_bytes=ref.size_bytes,
                sha256=ref.sha256,
                compression=ref.compression,
                created_at=ref.created_at,
                expires_at=ref.expires_at,
                owner_agent_id=ref.owner_agent_id,
            )
        return ref

    def list(self, *, owner_agent_id: str | None = None) -> list[ArtifactRef]:
        self._ensure_dirs()
        refs: list[ArtifactRef] = []
        for path in sorted(self._meta.glob("*.json")):
            try:
                ref = self.ref(path.stem)
            except Exception:
                continue
            if owner_agent_id is not None and ref.owner_agent_id != owner_agent_id:
                continue
            refs.append(ref)
        return refs

    def open_reader(self, artifact_id: str):
        """Open the artifact's blob for binary reading.

        The reference is looked up first so that an unknown artifact raises
        from ``ref`` before any file is opened.
        """
        self.ref(artifact_id)
        blob = self._blob_path(artifact_id)
        return blob.open("rb")

    def blob_path(self, artifact_id: str) -> Path:
        """Return the blob path of a known artifact.

        The reference is looked up first, so an unknown artifact raises from
        ``ref`` rather than yielding a path.
        """
        self.ref(artifact_id)
        location = self._blob_path(artifact_id)
        return location

    def export_to_path(
        self,
        artifact_id: str,
        target: str | Path,
        *,
        expected_sha256: str | None = None,
    ) -> ArtifactRef:
        """Copy an artifact's blob to *target*, verifying size and checksum.

        The data is first written to a hidden ``.part`` sibling of *target*
        and moved into place only after the byte count matches the reference
        and the SHA-256 matches ``expected_sha256`` (or, when that is not
        given, the checksum stored in the reference).

        Returns:
            The artifact's reference.

        Raises:
            TransferError: On a size or checksum mismatch.
        """
        ref = self.ref(artifact_id)
        destination = Path(target)
        destination.parent.mkdir(parents=True, exist_ok=True)
        staging = destination.with_name(f".{destination.name}.{uuid.uuid4().hex}.part")
        hasher = hashlib.sha256()
        written = 0
        try:
            with self.open_reader(artifact_id) as reader, staging.open("wb") as writer:
                while block := reader.read(STREAM_CHUNK_BYTES):
                    payload = bytes(block)
                    hasher.update(payload)
                    written += len(payload)
                    writer.write(payload)

            sha256 = hasher.hexdigest()
            if written != ref.size_bytes:
                raise TransferError(f"artifact size mismatch: expected {ref.size_bytes}, got {written}")

            # A caller-supplied checksum takes precedence over the stored one.
            expected = expected_sha256 or ref.sha256
            if expected and sha256.casefold() != expected.casefold():
                raise TransferError(f"artifact checksum mismatch: expected {expected}, got {sha256}")

            os.replace(staging, destination)
            return ref
        except Exception:
            # Never leave the partial copy behind on failure.
            with contextlib.suppress(FileNotFoundError):
                staging.unlink()
            raise

    def delete(self, artifact_id: str) -> None:
        """Remove the artifact's blob and metadata; missing files are ignored."""
        # Blob first, then metadata.
        for locate in (self._blob_path, self._metadata_path):
            with contextlib.suppress(FileNotFoundError):
                locate(artifact_id).unlink()

    def delete_owner(self, owner_agent_id: str) -> None:
        for ref in self.list(owner_agent_id=owner_agent_id):
            self.delete(ref.artifact_id)

    def _ensure_dirs(self) -> None:
        self._blobs.mkdir(parents=True, exist_ok=True)
        self._meta.mkdir(parents=True, exist_ok=True)
        self._tmp.mkdir(parents=True, exist_ok=True)

    def _check_declared_size(self, size_bytes: int) -> None:
        if self.max_artifact_bytes and size_bytes > self.max_artifact_bytes:
            raise TransferError(
                f"artifact exceeds maximum size: {size_bytes} bytes would exceed {self.max_artifact_bytes} bytes"
            )

    def _reserve_quota(self, incoming_size: int) -> None:
        if self.quota_bytes is None:
            return
        used = 0
        for path in self._blobs.glob("*.bin"):
            with contextlib.suppress(OSError):
                used += path.stat().st_size
        projected = used + max(0, incoming_size)
        if projected > self.quota_bytes:
            raise TransferError(f"artifact storage quota exceeded: {projected} bytes would exceed {self.quota_bytes}")

    def _blob_path(self, artifact_id: str) -> Path:
        """Return the ``.bin`` path in the blob directory for *artifact_id*."""
        filename = f"{_safe_artifact_id(artifact_id)}.bin"
        return self._blobs / filename

    def _metadata_path(self, artifact_id: str) -> Path:
        """Return the ``.json`` path in the metadata directory for *artifact_id*."""
        filename = f"{_safe_artifact_id(artifact_id)}.json"
        return self._meta / filename
  • Artifact Transport covers user-facing file mobility and low-level artifact workflows.
  • Remote covers client and proxy transfer helpers.