Skip to content

Persistence Package

paglets.persistence contains durable inactive paglet records and per-paglet managed storage.

Responsibilities

  • Represent deactivation policy and deactivation requests.
  • Store inactive paglet envelopes, queued messages, and restore metadata.
  • Provide managed storage paths scoped to a host and paglet.
  • Enforce persistent storage quotas and expose storage status.

Main Modules

paglets.persistence.persistency
Defines DeactivationPolicy, DeactivationRequest, queued inactive messages, and inactive record dataclasses used by the host.
paglets.persistence.storage
Defines ManagedStorage, StorageStatus, quota errors, and the default storage quota constant.

Implementation Notes

Inactive records are host-owned. A deactivated paglet is not running, but the host can activate it before delivery unless the caller requests a fast failure.

Managed storage is separate from mobile dataclass state. It is useful for local artifacts that should stay on a host, while mobile workflow state belongs in the paglet state dataclass.

Storage sizes use binary-scaled units in user-facing output: KB, MB, and GB scale by 1024.

API Reference

paglets.persistence.persistency

DeactivationPolicy dataclass

Policy chosen by a paglet for its inactive lifecycle.

Source code in src/paglets/persistence/persistency.py
@dataclass(slots=True)
class DeactivationPolicy:
    """Policy chosen by a paglet for its inactive lifecycle."""

    activate_on_message: bool = True
    queue_messages_when_inactive: bool = True
    activate_on_startup: bool = False
    activate_at: float | None = None

    @classmethod
    def after(cls, seconds: float, **kwargs: Any) -> DeactivationPolicy:
        return cls(activate_at=time.time() + seconds, **kwargs)

    def to_wire(self) -> dict[str, Any]:
        return {
            "activate_on_message": self.activate_on_message,
            "queue_messages_when_inactive": self.queue_messages_when_inactive,
            "activate_on_startup": self.activate_on_startup,
            "activate_at": self.activate_at,
        }

    @classmethod
    def from_wire(cls, payload: dict[str, Any] | None) -> DeactivationPolicy:
        payload = payload or {}
        activate_at = payload.get("activate_at")
        return cls(
            activate_on_message=bool(payload.get("activate_on_message", True)),
            queue_messages_when_inactive=bool(payload.get("queue_messages_when_inactive", True)),
            activate_on_startup=bool(payload.get("activate_on_startup", False)),
            activate_at=float(activate_at) if activate_at is not None else None,
        )

DeactivationRequest dataclass

Context for a deactivation request before the paglet chooses policy.

Source code in src/paglets/persistence/persistency.py
@dataclass(slots=True)
class DeactivationRequest:
    """Context for a deactivation request before the paglet chooses policy."""

    reason: str = "deactivate"
    source: str = "external"
    policy: DeactivationPolicy | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_wire(self) -> dict[str, Any]:
        return {
            "reason": self.reason,
            "source": self.source,
            "policy": self.policy.to_wire() if self.policy is not None else None,
            "metadata": self.metadata,
        }

    @classmethod
    def from_wire(cls, payload: dict[str, Any] | None) -> DeactivationRequest:
        payload = payload or {}
        policy_payload = payload.get("policy")
        return cls(
            reason=str(payload.get("reason") or "deactivate"),
            source=str(payload.get("source") or "external"),
            policy=DeactivationPolicy.from_wire(policy_payload) if policy_payload is not None else None,
            metadata=dict(payload.get("metadata") or {}),
        )

InactiveRecord dataclass

Durable representation of a deactivated paglet.

Source code in src/paglets/persistence/persistency.py
@dataclass(slots=True)
class InactiveRecord:
    """Durable representation of a deactivated paglet."""

    envelope: PagletEnvelope
    policy: DeactivationPolicy
    request: DeactivationRequest
    deactivated_at: float = field(default_factory=time.time)
    queued_messages: list[QueuedMessage] = field(default_factory=list)

    @property
    def agent_id(self) -> str:
        return self.envelope.agent_id

    def to_wire(self) -> dict[str, Any]:
        return {
            "schema_version": 1,
            "envelope": self.envelope.to_wire(),
            "policy": self.policy.to_wire(),
            "request": self.request.to_wire(),
            "deactivated_at": self.deactivated_at,
            "queued_messages": [message.to_wire() for message in self.queued_messages],
        }

    @classmethod
    def from_wire(cls, payload: dict[str, Any]) -> InactiveRecord:
        return cls(
            envelope=PagletEnvelope.from_wire(payload["envelope"]),
            policy=DeactivationPolicy.from_wire(payload.get("policy")),
            request=DeactivationRequest.from_wire(payload.get("request")),
            deactivated_at=float(payload.get("deactivated_at", time.time())),
            queued_messages=[QueuedMessage.from_wire(item) for item in payload.get("queued_messages", [])],
        )

QueuedMessage dataclass

Message persisted while a paglet is inactive.

Source code in src/paglets/persistence/persistency.py
@dataclass(slots=True)
class QueuedMessage:
    """Message persisted while a paglet is inactive."""

    message: Message
    oneway: bool = False
    queued_at: float = field(default_factory=time.time)

    def to_wire(self) -> dict[str, Any]:
        return {
            "message": self.message.to_wire(),
            "oneway": self.oneway,
            "queued_at": self.queued_at,
        }

    @classmethod
    def from_wire(cls, payload: dict[str, Any]) -> QueuedMessage:
        return cls(
            message=Message.from_wire(payload["message"]),
            oneway=bool(payload.get("oneway", False)),
            queued_at=float(payload.get("queued_at", time.time())),
        )

paglets.persistence.storage

ManagedStorage

Path-safe, quota-accounted storage rooted at one directory.

Source code in src/paglets/persistence/storage.py
class ManagedStorage:
    """Path-safe, quota-accounted storage rooted at one directory."""

    def __init__(self, root: Path | str, *, quota_bytes: int | None = DEFAULT_PERSISTENT_STORAGE_QUOTA_BYTES):
        self.root = Path(root).expanduser().resolve(strict=False)
        self.quota_bytes = None if quota_bytes is None else max(0, int(quota_bytes))

    def read_bytes(self, path: Path | str) -> bytes:
        return self._resolve(path).read_bytes()

    def write_bytes(self, path: Path | str, data: bytes) -> Path:
        payload = bytes(data)
        target = self._resolve(path)
        existing_size = target.stat().st_size if target.exists() and target.is_file() else 0
        projected = self._used_bytes() - existing_size + len(payload)
        if self.quota_bytes is not None and projected > self.quota_bytes:
            raise StorageQuotaError(
                f"managed storage quota exceeded: {projected} bytes would exceed {self.quota_bytes} bytes"
            )
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)
        return target

    def write_text(self, path: Path | str, text: str, *, encoding: str = "utf-8") -> Path:
        return self.write_bytes(path, text.encode(encoding))

    def delete(self, path: Path | str) -> None:
        target = self._resolve(path)
        if target.is_dir() and not target.is_symlink():
            shutil.rmtree(target)
            return
        try:
            target.unlink()
        except FileNotFoundError:
            return

    def clear(self) -> None:
        if self.root.exists():
            shutil.rmtree(self.root)
        self.root.mkdir(parents=True, exist_ok=True)

    def status(self) -> StorageStatus:
        used = self._used_bytes()
        return StorageStatus(
            root=str(self.root),
            used_bytes=used,
            quota_bytes=self.quota_bytes,
            available_bytes=None if self.quota_bytes is None else max(0, self.quota_bytes - used),
        )

    def _resolve(self, path: Path | str) -> Path:
        candidate = (self.root / Path(path)).resolve(strict=False)
        if candidate != self.root and self.root not in candidate.parents:
            raise ValueError(f"managed storage path escapes root: {path!r}")
        return candidate

    def _used_bytes(self) -> int:
        if not self.root.exists():
            return 0
        total = 0
        for path in self.root.rglob("*"):
            try:
                if path.is_file():
                    total += path.stat().st_size
            except OSError:
                continue
        return total

StorageQuotaError

Bases: PagletError

Raised when a managed storage write would exceed its quota.

Source code in src/paglets/persistence/storage.py
class StorageQuotaError(PagletError):
    """Raised when a managed storage write would exceed its quota."""
  • Runtime covers activation/deactivation orchestration.
  • Core covers paglet state and lifecycle hooks.