Persistence Package
paglets.persistence contains durable inactive paglet records and per-paglet
managed storage.
Responsibilities
- Represent deactivation policy and deactivation requests.
- Store inactive paglet envelopes, queued messages, and restore metadata.
- Provide managed storage paths scoped to a host and paglet.
- Enforce persistent storage quotas and expose storage status.
Main Modules
paglets.persistence.persistency
- Defines
DeactivationPolicy, DeactivationRequest, queued inactive
messages, and inactive record dataclasses used by the host.
paglets.persistence.storage
- Defines
ManagedStorage, StorageStatus, quota errors, and the default
storage quota constant.
Implementation Notes
Inactive records are host-owned. A deactivated paglet is not running, but the
host can activate it before delivery unless the caller requests a fast failure.
Managed storage is separate from mobile dataclass state. It is useful for local
artifacts that should stay on a host, while mobile workflow state belongs in the
paglet state dataclass.
Storage sizes use binary-scaled units in user-facing output: KB, MB, and
GB scale by 1024.
API Reference
paglets.persistence.persistency
DeactivationPolicy
dataclass
Policy chosen by a paglet for its inactive lifecycle.
Source code in src/paglets/persistence/persistency.py
| @dataclass(slots=True)
class DeactivationPolicy:
"""Policy chosen by a paglet for its inactive lifecycle."""
activate_on_message: bool = True
queue_messages_when_inactive: bool = True
activate_on_startup: bool = False
activate_at: float | None = None
@classmethod
def after(cls, seconds: float, **kwargs: Any) -> DeactivationPolicy:
return cls(activate_at=time.time() + seconds, **kwargs)
def to_wire(self) -> dict[str, Any]:
return {
"activate_on_message": self.activate_on_message,
"queue_messages_when_inactive": self.queue_messages_when_inactive,
"activate_on_startup": self.activate_on_startup,
"activate_at": self.activate_at,
}
@classmethod
def from_wire(cls, payload: dict[str, Any] | None) -> DeactivationPolicy:
payload = payload or {}
activate_at = payload.get("activate_at")
return cls(
activate_on_message=bool(payload.get("activate_on_message", True)),
queue_messages_when_inactive=bool(payload.get("queue_messages_when_inactive", True)),
activate_on_startup=bool(payload.get("activate_on_startup", False)),
activate_at=float(activate_at) if activate_at is not None else None,
)
|
DeactivationRequest
dataclass
Context for a deactivation request before the paglet chooses policy.
Source code in src/paglets/persistence/persistency.py
| @dataclass(slots=True)
class DeactivationRequest:
"""Context for a deactivation request before the paglet chooses policy."""
reason: str = "deactivate"
source: str = "external"
policy: DeactivationPolicy | None = None
metadata: dict[str, Any] = field(default_factory=dict)
def to_wire(self) -> dict[str, Any]:
return {
"reason": self.reason,
"source": self.source,
"policy": self.policy.to_wire() if self.policy is not None else None,
"metadata": self.metadata,
}
@classmethod
def from_wire(cls, payload: dict[str, Any] | None) -> DeactivationRequest:
payload = payload or {}
policy_payload = payload.get("policy")
return cls(
reason=str(payload.get("reason") or "deactivate"),
source=str(payload.get("source") or "external"),
policy=DeactivationPolicy.from_wire(policy_payload) if policy_payload is not None else None,
metadata=dict(payload.get("metadata") or {}),
)
|
InactiveRecord
dataclass
Durable representation of a deactivated paglet.
Source code in src/paglets/persistence/persistency.py
| @dataclass(slots=True)
class InactiveRecord:
"""Durable representation of a deactivated paglet."""
envelope: PagletEnvelope
policy: DeactivationPolicy
request: DeactivationRequest
deactivated_at: float = field(default_factory=time.time)
queued_messages: list[QueuedMessage] = field(default_factory=list)
@property
def agent_id(self) -> str:
return self.envelope.agent_id
def to_wire(self) -> dict[str, Any]:
return {
"schema_version": 1,
"envelope": self.envelope.to_wire(),
"policy": self.policy.to_wire(),
"request": self.request.to_wire(),
"deactivated_at": self.deactivated_at,
"queued_messages": [message.to_wire() for message in self.queued_messages],
}
@classmethod
def from_wire(cls, payload: dict[str, Any]) -> InactiveRecord:
return cls(
envelope=PagletEnvelope.from_wire(payload["envelope"]),
policy=DeactivationPolicy.from_wire(payload.get("policy")),
request=DeactivationRequest.from_wire(payload.get("request")),
deactivated_at=float(payload.get("deactivated_at", time.time())),
queued_messages=[QueuedMessage.from_wire(item) for item in payload.get("queued_messages", [])],
)
|
QueuedMessage
dataclass
Message persisted while a paglet is inactive.
Source code in src/paglets/persistence/persistency.py
| @dataclass(slots=True)
class QueuedMessage:
"""Message persisted while a paglet is inactive."""
message: Message
oneway: bool = False
queued_at: float = field(default_factory=time.time)
def to_wire(self) -> dict[str, Any]:
return {
"message": self.message.to_wire(),
"oneway": self.oneway,
"queued_at": self.queued_at,
}
@classmethod
def from_wire(cls, payload: dict[str, Any]) -> QueuedMessage:
return cls(
message=Message.from_wire(payload["message"]),
oneway=bool(payload.get("oneway", False)),
queued_at=float(payload.get("queued_at", time.time())),
)
|
paglets.persistence.storage
ManagedStorage
Path-safe, quota-accounted storage rooted at one directory.
Source code in src/paglets/persistence/storage.py
| class ManagedStorage:
"""Path-safe, quota-accounted storage rooted at one directory."""
def __init__(self, root: Path | str, *, quota_bytes: int | None = DEFAULT_PERSISTENT_STORAGE_QUOTA_BYTES):
self.root = Path(root).expanduser().resolve(strict=False)
self.quota_bytes = None if quota_bytes is None else max(0, int(quota_bytes))
def read_bytes(self, path: Path | str) -> bytes:
return self._resolve(path).read_bytes()
def write_bytes(self, path: Path | str, data: bytes) -> Path:
payload = bytes(data)
target = self._resolve(path)
existing_size = target.stat().st_size if target.exists() and target.is_file() else 0
projected = self._used_bytes() - existing_size + len(payload)
if self.quota_bytes is not None and projected > self.quota_bytes:
raise StorageQuotaError(
f"managed storage quota exceeded: {projected} bytes would exceed {self.quota_bytes} bytes"
)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(payload)
return target
def write_text(self, path: Path | str, text: str, *, encoding: str = "utf-8") -> Path:
return self.write_bytes(path, text.encode(encoding))
def delete(self, path: Path | str) -> None:
target = self._resolve(path)
if target.is_dir() and not target.is_symlink():
shutil.rmtree(target)
return
try:
target.unlink()
except FileNotFoundError:
return
def clear(self) -> None:
if self.root.exists():
shutil.rmtree(self.root)
self.root.mkdir(parents=True, exist_ok=True)
def status(self) -> StorageStatus:
used = self._used_bytes()
return StorageStatus(
root=str(self.root),
used_bytes=used,
quota_bytes=self.quota_bytes,
available_bytes=None if self.quota_bytes is None else max(0, self.quota_bytes - used),
)
def _resolve(self, path: Path | str) -> Path:
candidate = (self.root / Path(path)).resolve(strict=False)
if candidate != self.root and self.root not in candidate.parents:
raise ValueError(f"managed storage path escapes root: {path!r}")
return candidate
def _used_bytes(self) -> int:
if not self.root.exists():
return 0
total = 0
for path in self.root.rglob("*"):
try:
if path.is_file():
total += path.stat().st_size
except OSError:
continue
return total
|
StorageQuotaError
Bases: PagletError
Raised when a managed storage write would exceed its quota.
Source code in src/paglets/persistence/storage.py
| class StorageQuotaError(PagletError):
"""Raised when a managed storage write would exceed its quota."""
|
Related Pages
- Runtime covers activation/deactivation orchestration.
- Core covers paglet state and lifecycle hooks.