Core Package

paglets.core contains the user-facing programming model for paglet classes and the shared value types used across the runtime.

Responsibilities

  • Define Paglet, PagletState, PagletContext, state locking helpers, and lifecycle conveniences.
  • Define Message, FutureReply, ReplySet, and message priority constants.
  • Define lifecycle event dataclasses used by creation, mobility, cloning, and persistence hooks.
  • Define itinerary abstractions for repeatable movement workflows.
  • Centralize enums and exceptions that other packages share.

Main Modules

paglets.core.agent
Implements the base paglet class, context object, state locking, lifecycle hook defaults, messaging helpers, movement helpers, service lookup helpers, and storage access helpers.
paglets.core.messages
Defines the message object and reply coordination helpers used by proxies, mailboxes, and service calls.
paglets.core.events and paglets.core.context_events
Define lifecycle event payloads and the host-side event log/listener protocol.
paglets.core.itinerary
Provides reusable itinerary plans for paglets that should move through a sequence of hosts and execute named tasks at specific movement phases.
paglets.core.runtime_values
Provides shared enums such as ServiceScope, ResidentLifecycle, ArrivalMode, EnvelopeKind, and LaunchConfigSyncAction.
paglets.core.wire
Defines shared aliases for JSON/pickle-safe wire payloads used by messages, serialization, and runtime protocol boundaries.
paglets.core.errors
Defines the exception hierarchy used across runtime, remote, persistence, and service code.

Implementation Notes

Paglet state must be declared explicitly as a dataclass. An active paglet process does not carry its call stack, threads, sockets, or arbitrary instance attributes across movement. The runtime snapshots state through the child-process protocol and reconstructs the paglet from importable class names on arrival or activation.

PagletContext is the paglet's capability boundary. It exposes host operations through a facade inside child processes and through the real host object in local host-side tests.

Message handling is actor-style per paglet process. Handlers are serialized by the runtime mailbox path, while background work inside a paglet must protect shared dataclass state with the paglet lock.
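
A minimal sketch of the locking discipline for background work follows. `TallySketch` and `TallyState` are hypothetical; only the `locked_state` pattern mirrors the helper documented in the API reference, and the real paglet would get its lock from the base class rather than create it by hand.

```python
import threading
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class TallyState:
    total: int = 0


class TallySketch:
    """Stand-in with the same lock shape as Paglet (an RLock around state)."""

    def __init__(self):
        self.state = TallyState()
        self._state_lock = threading.RLock()

    @contextmanager
    def locked_state(self):
        with self._state_lock:
            yield self.state

    def add_in_background(self, amount: int, repeats: int) -> threading.Thread:
        def work():
            for _ in range(repeats):
                # Each read-modify-write happens under the paglet lock.
                with self.locked_state() as state:
                    state.total += amount

        thread = threading.Thread(target=work)
        thread.start()
        return thread


agent = TallySketch()
threads = [agent.add_in_background(1, 1000) for _ in range(4)]
for thread in threads:
    thread.join()

with agent.locked_state() as state:
    final_total = state.total
print(final_total)
```

Holding the lock only around the state mutation, not around slow work, keeps message handlers from stalling behind a background task.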

API Reference

paglets.core.agent

Paglet

Bases: Generic[StateT]

Base class for mobile Python objects.

Subclasses set State to a dataclass type and override lifecycle hooks. The runtime instantiates paglets on each host from class path + dataclass state, mirroring Aglets' mobile object plus event system without moving a call stack.

Source code in src/paglets/core/agent.py
class Paglet(Generic[StateT]):
    """Base class for mobile Python objects.

    Subclasses set ``State`` to a dataclass type and override lifecycle hooks.
    The runtime instantiates paglets on each host from class path + dataclass
    state, mirroring Aglets' mobile object plus event system without moving a
    call stack.
    """

    State: ClassVar[type[StateT]]
    ACTIVE: ClassVar[int] = ACTIVE
    INACTIVE: ClassVar[int] = INACTIVE
    MAILBOX_WORKERS: ClassVar[int] = 4

    def __init__(self, state: StateT | None = None, *, agent_id: str | None = None):
        state_cls = self.state_class()
        if state is None:
            state = state_cls()  # type: ignore[call-arg]
        if not is_dataclass(state):
            raise HostError(f"{self.__class__.__name__}.State must be a dataclass state object")
        self.agent_id = agent_id or uuid.uuid4().hex
        self.state: StateT = state
        self._state_lock = threading.RLock()
        self._state_condition = threading.Condition(self._state_lock)
        self._context: PagletContext | None = None
        self._last_proxy: PagletProxy | None = None
        self.resources = ResourceRegistry()

    @classmethod
    def state_class(cls) -> type[StateT]:
        state_cls = getattr(cls, "State", None)
        if state_cls is None:
            raise HostError(f"{cls.__name__} must define a dataclass State class")
        if not is_dataclass(state_cls):
            raise HostError(f"{cls.__name__}.State must be decorated with @dataclass")
        return state_cls

    @property
    def context(self) -> PagletContext:
        if self._context is None:
            raise HostError("Paglet is not attached to a host context")
        return self._context

    def _attach(self, context: PagletContext) -> None:
        self._context = context

    @contextmanager
    def locked(self) -> Iterator[None]:
        """Enter the paglet's reentrant lock for agent-local critical sections."""

        with self._state_lock:
            yield

    @contextmanager
    def locked_state(self) -> Iterator[StateT]:
        """Yield this paglet's dataclass state under the paglet lock."""

        with self._state_lock:
            yield self.state

    def wait_state(self, predicate: Callable[[StateT], bool], timeout: float | None = None) -> bool:
        """Wait until ``predicate(state)`` is true.

        This is for coordination between handlers/background work that mutate
        paglet state and another handler waiting for that state to change. It
        does not replace normal message delivery; incoming messages still call
        ``handle_message`` through the paglet mailbox.
        """

        deadline = None if timeout is None else time.monotonic() + max(0.0, timeout)

        with self._state_condition:
            if predicate(self.state):
                return True
            while True:
                if deadline is None:
                    remaining = None
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return bool(predicate(self.state))
                self._state_condition.wait(remaining)
                if predicate(self.state):
                    return True

    def notify_state_changed(self) -> None:
        """Wake one waiter blocked in :meth:`wait_state`."""

        with self._state_condition:
            self._state_condition.notify(1)

    def notify_all_state_changed(self) -> None:
        """Wake all waiters blocked in :meth:`wait_state`."""

        with self._state_condition:
            self._state_condition.notify_all()

    # Convenience operations available from inside lifecycle/message handlers.
    def dispatch(self, target: str | TransferTicket) -> PagletProxy:
        proxy = self.context.dispatch(self.agent_id, target)
        self._last_proxy = proxy
        return proxy

    def clone(self, target: str | TransferTicket | None = None) -> PagletProxy:
        proxy = self.context.clone(self.agent_id, target)
        self._last_proxy = proxy
        return proxy

    def dispatch_to(self, name_or_url: str) -> PagletProxy:
        proxy = self.context.dispatch_to(self.agent_id, name_or_url)
        self._last_proxy = proxy
        return proxy

    def clone_to(self, name_or_url: str) -> PagletProxy:
        proxy = self.context.clone_to(self.agent_id, name_or_url)
        self._last_proxy = proxy
        return proxy

    def deactivate(
        self,
        *,
        reason: str = "deactivate",
        policy: DeactivationPolicy | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> PagletProxy:
        proxy = self.context.deactivate(
            self.agent_id,
            DeactivationRequest(
                reason=reason,
                source="self",
                policy=policy,
                metadata=metadata or {},
            ),
        )
        self._last_proxy = proxy
        return proxy

    def send(self, target_agent_id: str, message: Message, *, host_url: str | None = None) -> Any:
        return self.context.send(target_agent_id, message, host_url=host_url)

    def multicast(
        self, kind: str | Message, args: dict[str, Any] | None = None, *, include_self: bool = True
    ) -> ReplySet:
        exclude = None if include_self else {self.agent_id}
        return self.context.multicast(kind, args, exclude=exclude)

    def wait_message(self, timeout: float | None = None) -> bool:
        return self.context.host.wait_message(self.agent_id, timeout=timeout)

    def notify_message(self) -> None:
        self.context.host.notify_message(self.agent_id)

    def notify_all_messages(self) -> None:
        self.context.host.notify_all_messages(self.agent_id)

    def advertise_service(
        self,
        name: str,
        *,
        capabilities: list[str] | tuple[str, ...] | None = None,
        metadata: dict[str, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float | None = None,
    ) -> ServiceRecord:
        return self.context.advertise_service(
            name,
            capabilities=capabilities,
            metadata=metadata,
            scope=scope,
            ttl=ttl,
            agent_id=self.agent_id,
        )

    def unadvertise_service(self, name: str) -> list[ServiceRecord]:
        return self.context.unadvertise_service(name, agent_id=self.agent_id)

    def lookup_service(
        self,
        name: str,
        *,
        capability: str | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> PagletProxyRef | None:
        return self.context.lookup_service(name, capability=capability, scope=scope)

    def lookup_services(
        self,
        name: str | None = None,
        *,
        capability: str | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> list[ServiceRecord]:
        return self.context.lookup_services(name, capability=capability, scope=scope)

    def advertise_contract(
        self,
        contract: ServiceContract,
        *,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> ServiceRecord:
        return self.context.advertise_contract(
            contract,
            scope=scope,
            ttl=ttl,
            metadata=metadata,
            agent_id=self.agent_id,
        )

    def lookup_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> ServiceHandle | None:
        return self.context.lookup_contract(contract, operation=operation, scope=scope)

    def lookup_contracts(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> list[ServiceHandle]:
        return self.context.lookup_contracts(contract, operation=operation, scope=scope)

    def require_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> ServiceHandle:
        return self.context.require_contract(contract, operation=operation, scope=scope)

    def lease_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float = 60.0,
    ) -> ServiceLease:
        return self.context.lease_contract(contract, operation=operation, scope=scope, ttl=ttl)

    def work_dir(self, *, create: bool = True) -> Path:
        return self.context.work_dir(create=create, agent_id=self.agent_id)

    def persistent_storage(self, *, quota_bytes: int | None = None) -> ManagedStorage:
        return self.context.persistent_storage(quota_bytes=quota_bytes, agent_id=self.agent_id)

    def register_file(self, path: Path | str, *, name: str | None = None, mode: str = "copy") -> PagletFileRef:
        return self.context.register_file(path, name=name, mode=mode, agent_id=self.agent_id)

    def registered_files(self) -> list[PagletFileRef]:
        return self.context.registered_files(agent_id=self.agent_id)

    def unregister_file(self, name_or_ref: str | PagletFileRef) -> None:
        self.context.unregister_file(name_or_ref, agent_id=self.agent_id)

    def file_path(self, name_or_ref: str | PagletFileRef) -> Path:
        return self.context.file_path(name_or_ref, agent_id=self.agent_id)

    def upload_artifact(
        self,
        path: Path | str,
        *,
        host_url: str | None = None,
        owner_agent_id: str | None = None,
        name: str | None = None,
        compression: str = "",
        expires_at: float = 0.0,
    ) -> ArtifactRef:
        return self.context.upload_artifact(
            path,
            host_url=host_url,
            owner_agent_id=self.agent_id if owner_agent_id is None else owner_agent_id,
            name=name,
            compression=compression,
            expires_at=expires_at,
        )

    def download_artifact(self, artifact: ArtifactRef, target: Path | str, *, move: bool = False) -> ArtifactRef:
        return self.context.download_artifact(artifact, target, move=move)

    def delete_artifact(self, artifact: ArtifactRef) -> None:
        self.context.delete_artifact(artifact)

    @staticmethod
    def not_handled() -> _NotHandled:
        return NOT_HANDLED

    # Lifecycle/event hooks. Override these in subclasses.
    def on_creation(self, event: CreationEvent) -> None:
        pass

    def on_dispatching(self, event: MobilityEvent) -> None:
        pass

    def on_arrival(self, event: MobilityEvent) -> None:
        pass

    def on_reverting(self, event: MobilityEvent) -> None:
        pass

    def on_cloning(self, event: CloneEvent) -> None:
        pass

    def on_clone(self, event: CloneEvent) -> None:
        pass

    def on_cloned(self, event: CloneEvent) -> None:
        pass

    def on_deactivating(self, event: PersistencyEvent) -> None:
        pass

    def on_activation(self, event: PersistencyEvent) -> None:
        pass

    def on_disposing(self, event: PersistencyEvent) -> None:
        pass

    def deactivation_policy(self, request: DeactivationRequest) -> DeactivationPolicy:
        return request.policy or DeactivationPolicy()

    def run(self) -> None:
        pass

    def handle_message(self, message: Message) -> Any:
        raise NotHandledError(f"{self.__class__.__name__} did not handle {message.kind!r}")

locked() -> Iterator[None]

Enter the paglet's reentrant lock for agent-local critical sections.

Source code in src/paglets/core/agent.py
@contextmanager
def locked(self) -> Iterator[None]:
    """Enter the paglet's reentrant lock for agent-local critical sections."""

    with self._state_lock:
        yield

locked_state() -> Iterator[StateT]

Yield this paglet's dataclass state under the paglet lock.

Source code in src/paglets/core/agent.py
@contextmanager
def locked_state(self) -> Iterator[StateT]:
    """Yield this paglet's dataclass state under the paglet lock."""

    with self._state_lock:
        yield self.state

notify_all_state_changed() -> None

Wake all waiters blocked in wait_state.

Source code in src/paglets/core/agent.py
def notify_all_state_changed(self) -> None:
    """Wake all waiters blocked in :meth:`wait_state`."""

    with self._state_condition:
        self._state_condition.notify_all()

notify_state_changed() -> None

Wake one waiter blocked in wait_state.

Source code in src/paglets/core/agent.py
def notify_state_changed(self) -> None:
    """Wake one waiter blocked in :meth:`wait_state`."""

    with self._state_condition:
        self._state_condition.notify(1)

wait_state(predicate: Callable[[StateT], bool], timeout: float | None = None) -> bool

Wait until predicate(state) is true.

This is for coordination between handlers/background work that mutate paglet state and another handler waiting for that state to change. It does not replace normal message delivery; incoming messages still call handle_message through the paglet mailbox.

Source code in src/paglets/core/agent.py
def wait_state(self, predicate: Callable[[StateT], bool], timeout: float | None = None) -> bool:
    """Wait until ``predicate(state)`` is true.

    This is for coordination between handlers/background work that mutate
    paglet state and another handler waiting for that state to change. It
    does not replace normal message delivery; incoming messages still call
    ``handle_message`` through the paglet mailbox.
    """

    deadline = None if timeout is None else time.monotonic() + max(0.0, timeout)

    with self._state_condition:
        if predicate(self.state):
            return True
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return bool(predicate(self.state))
            self._state_condition.wait(remaining)
            if predicate(self.state):
                return True
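
The wait/notify pairing can be tried in isolation. The sketch below assumes a hypothetical `WaiterSketch` class that reproduces the same condition-variable pattern with a simplified loop; it is not the library class. The point it illustrates is that the code changing the state must also notify under the same lock, otherwise a waiter only sees the change when its timeout expires.

```python
import threading
import time
from dataclasses import dataclass


@dataclass
class JobState:
    done: bool = False


class WaiterSketch:
    """Stand-in showing the predicate wait used by wait_state."""

    def __init__(self):
        self.state = JobState()
        self._cond = threading.Condition(threading.RLock())

    def wait_state(self, predicate, timeout=None) -> bool:
        deadline = None if timeout is None else time.monotonic() + max(0.0, timeout)
        with self._cond:
            while not predicate(self.state):
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return False  # deadline passed and predicate still false
                self._cond.wait(remaining)
            return True

    def finish(self) -> None:
        # Mutate and notify while holding the lock so waiters re-check promptly.
        with self._cond:
            self.state.done = True
            self._cond.notify_all()


agent = WaiterSketch()

# Nobody sets `done` yet, so this wait runs into its timeout.
timed_out = agent.wait_state(lambda s: s.done, timeout=0.05)

# A timer thread flips the flag shortly afterwards; the second wait succeeds.
threading.Timer(0.05, agent.finish).start()
completed = agent.wait_state(lambda s: s.done, timeout=5.0)
print(timed_out, completed)
```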

PagletContext

Host-provided environment visible to a running paglet.

Source code in src/paglets/core/agent.py
class PagletContext:
    """Host-provided environment visible to a running paglet."""

    def __init__(self, host: Host, agent_id: str | None = None):
        self._host = host
        self._agent_id = agent_id

    @property
    def name(self) -> str:
        return self._host.name

    @property
    def address(self) -> str:
        return self._host.address

    @property
    def host(self) -> Host:
        return self._host

    @property
    def agent_id(self) -> str | None:
        return self._agent_id

    def get_proxy(self, agent_id: str, host_url: str | None = None) -> PagletProxy | None:
        if host_url is None or host_url.rstrip("/") == self.address.rstrip("/"):
            return self._host.get_proxy(agent_id, include_inactive=True)
        from paglets.remote.proxy import PagletProxy

        return PagletProxy(host_url=host_url, agent_id=agent_id, client=self._host.client)

    def get_proxies(self, state: int = ACTIVE) -> list[PagletProxy]:
        return self._host.get_proxies(state)

    def get_property(self, key: str, default: Any = None) -> Any:
        return self._host.get_property(key, default)

    def set_property(self, key: str, value: Any) -> None:
        self._host.set_property(key, value)

    def create_paglet(
        self,
        agent_cls: type[Paglet],
        state: PagletState | None = None,
        *,
        init: Any = None,
        host_url: str | None = None,
    ) -> PagletProxy:
        if host_url is not None and host_url.rstrip("/") != self.address.rstrip("/"):
            return self._host.create_remote(host_url, agent_cls, state, init=init)
        return self._host.create(agent_cls, state, init=init)

    def dispatch(self, agent_id: str, target: str | TransferTicket) -> PagletProxy:
        return self._host.dispatch(agent_id, target)

    def clone(self, agent_id: str, target: str | TransferTicket | None = None) -> PagletProxy:
        return self._host.clone(agent_id, target=target)

    def deactivate(
        self,
        agent_id: str,
        request: DeactivationRequest | None = None,
    ) -> PagletProxy:
        return self._host.deactivate(agent_id, request=request)

    def available_hosts(self, *, online_only: bool = True, include_self: bool = True) -> list[HostRef]:
        return self._host.mesh.hosts(online_only=online_only, include_self=include_self)

    def host_status(self, name_or_url: str) -> HostRef | None:
        return self._host.mesh.lookup(name_or_url)

    def is_host_online(self, name_or_url: str) -> bool:
        return self._host.mesh.is_online(name_or_url)

    def wait_for_host(self, name_or_url: str, *, timeout: float = 10.0, interval: float = 0.25) -> HostRef:
        return self._host.mesh.wait_for_host(name_or_url, timeout=timeout, interval=interval)

    def dispatch_to(self, agent_id: str, name_or_url: str) -> PagletProxy:
        return self.dispatch(agent_id, self._host.mesh.resolve_url(name_or_url))

    def clone_to(self, agent_id: str, name_or_url: str) -> PagletProxy:
        return self.clone(agent_id, self._host.mesh.resolve_url(name_or_url))

    def send(self, target_agent_id: str, message: Message, *, host_url: str | None = None) -> Any:
        proxy = self.get_proxy(target_agent_id, host_url)
        if proxy is None:
            raise HostError(f"No such local paglet: {target_agent_id}")
        if message.sender is None:
            message.sender = self.address
        return proxy.send(message)

    def multicast(
        self, kind: str | Message, args: dict[str, Any] | None = None, *, exclude: set[str] | None = None
    ) -> ReplySet:
        return self._host.multicast_message(kind, args, exclude=exclude)

    def advertise_service(
        self,
        name: str,
        *,
        capabilities: list[str] | tuple[str, ...] | None = None,
        metadata: dict[str, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float | None = None,
        agent_id: str | None = None,
    ) -> ServiceRecord:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("advertise_service requires an attached paglet or explicit agent_id")
        return self._host.advertise_service(
            owner_id,
            name,
            capabilities=capabilities,
            metadata=metadata,
            scope=scope,
            ttl=ttl,
        )

    def unadvertise_service(self, name: str, *, agent_id: str | None = None) -> list[ServiceRecord]:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("unadvertise_service requires an attached paglet or explicit agent_id")
        return self._host.unadvertise_service(name, agent_id=owner_id)

    def lookup_service(
        self,
        name: str,
        *,
        capability: str | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> PagletProxyRef | None:
        record = self._host.lookup_service(name, capability=capability, scope=scope)
        return record.proxy if record is not None else None

    def lookup_services(
        self,
        name: str | None = None,
        *,
        capability: str | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> list[ServiceRecord]:
        return self._host.lookup_services(name, capability=capability, scope=scope)

    def advertise_contract(
        self,
        contract: ServiceContract,
        *,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float | None = None,
        metadata: dict[str, Any] | None = None,
        agent_id: str | None = None,
    ) -> ServiceRecord:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("advertise_contract requires an attached paglet or explicit agent_id")
        return self.advertise_service(
            contract.name,
            capabilities=contract.capabilities,
            metadata=contract.advertise_metadata(metadata),
            scope=scope,
            ttl=ttl,
            agent_id=owner_id,
        )

    def lookup_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> ServiceHandle | None:
        handles = self.lookup_contracts(contract, operation=operation, scope=scope)
        return handles[0] if handles else None

    def lookup_contracts(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> list[ServiceHandle]:
        from paglets.services.contracts import ServiceHandle

        if operation is not None:
            operation = contract.require_operation(operation)
        capability = operation.name if operation is not None else None
        return [
            ServiceHandle(contract, record, self)
            for record in self.lookup_services(contract.name, capability=capability, scope=scope)
            if contract.matches_record(record)
        ]

    def require_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
    ) -> ServiceHandle:
        from paglets.core.errors import ServiceNotFoundError

        handle = self.lookup_contract(contract, operation=operation, scope=scope)
        if handle is None:
            operation_text = f" operation {operation.name!r}" if operation is not None else ""
            contract_text = f"contract {contract.name!r} version {contract.version!r}{operation_text}"
            raise ServiceNotFoundError(f"No service {contract_text} found in {scope} scope")
        return handle

    def lease_contract(
        self,
        contract: ServiceContract,
        *,
        operation: ServiceOperation[Any, Any] | None = None,
        scope: ServiceScope = ServiceScope.LOCAL,
        ttl: float = 60.0,
    ) -> ServiceLease:
        handle = self.require_contract(contract, operation=operation, scope=scope)
        lease = self._host.lease_service_handle(handle, ttl=ttl)
        if self._agent_id is not None:
            self._host.resources_for(self._agent_id).register(
                f"service-lease:{lease.lease_id}",
                lease.release,
                suppress=True,
            )
        return lease

    def resources(self, agent_id: str | None = None) -> ResourceRegistry:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("resources requires an attached paglet or explicit agent_id")
        return self._host.resources_for(owner_id)

    def work_dir(self, *, create: bool = True, agent_id: str | None = None) -> Path:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("work_dir requires an attached paglet or explicit agent_id")
        return self._host.work_dir_for(owner_id, create=create)

    def persistent_storage(
        self,
        *,
        quota_bytes: int | None = None,
        agent_id: str | None = None,
    ) -> ManagedStorage:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("persistent_storage requires an attached paglet or explicit agent_id")
        return self._host.persistent_storage_for(owner_id, quota_bytes=quota_bytes)

    def register_file(
        self,
        path: Path | str,
        *,
        name: str | None = None,
        mode: str = "copy",
        agent_id: str | None = None,
    ) -> PagletFileRef:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("register_file requires an attached paglet or explicit agent_id")
        return self._host.register_file_for(owner_id, path, name=name, mode=mode)

    def registered_files(self, *, agent_id: str | None = None) -> list[PagletFileRef]:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("registered_files requires an attached paglet or explicit agent_id")
        return self._host.registered_files_for(owner_id)

    def unregister_file(self, name_or_ref: str | PagletFileRef, *, agent_id: str | None = None) -> None:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("unregister_file requires an attached paglet or explicit agent_id")
        self._host.unregister_file_for(owner_id, name_or_ref)

    def file_path(self, name_or_ref: str | PagletFileRef, *, agent_id: str | None = None) -> Path:
        owner_id = agent_id or self._agent_id
        if owner_id is None:
            raise HostError("file_path requires an attached paglet or explicit agent_id")
        return self._host.registered_file_path_for(owner_id, name_or_ref)

    def upload_artifact(
        self,
        path: Path | str,
        *,
        host_url: str | None = None,
        owner_agent_id: str | None = None,
        name: str | None = None,
        compression: str = "",
        expires_at: float = 0.0,
    ) -> ArtifactRef:
        owner_id = owner_agent_id if owner_agent_id is not None else self._agent_id or ""
        target = (host_url or self.address).rstrip("/")
        if target == self.address.rstrip("/") and hasattr(self._host, "artifacts"):
            return self._host.artifacts.create_from_path(
                path,
                owner_agent_id=owner_id,
                name=name,
                compression=compression,
                expires_at=expires_at,
            ).ref
        return self._host.client.upload_artifact(
            target,
            path,
            owner_agent_id=owner_id,
            name=name,
            compression=compression,
            expires_at=expires_at,
        )

    def download_artifact(self, artifact: ArtifactRef, target: Path | str, *, move: bool = False) -> ArtifactRef:
        return self._host.client.download_artifact(artifact, target, move=move)

    def delete_artifact(self, artifact: ArtifactRef) -> None:
        self._host.client.delete_artifact(artifact)

PagletState

Marker base class for dataclass state objects.

Subclass this with @dataclass. Only this state object moves. Everything stored directly on the paglet instance is transient runtime state.

Source code in src/paglets/core/agent.py
class PagletState:
    """Marker base class for dataclass state objects.

    Subclass this with ``@dataclass``. Only this state object moves. Everything
    stored directly on the paglet instance is transient runtime state.
    """

state_locked(method: Callable[Concatenate[PagletT, P], ReturnT]) -> Callable[Concatenate[PagletT, P], ReturnT]

Run a paglet method under the paglet's reentrant state lock.

Source code in src/paglets/core/agent.py
def state_locked(method: Callable[Concatenate[PagletT, P], ReturnT]) -> Callable[Concatenate[PagletT, P], ReturnT]:
    """Run a paglet method under the paglet's reentrant state lock."""

    @wraps(method)
    def wrapper(self: PagletT, *args: P.args, **kwargs: P.kwargs) -> ReturnT:
        with self.locked():
            return method(self, *args, **kwargs)

    return wrapper
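
A short usage sketch for the decorator is given below. The decorator body is copied in untyped form so the example runs on its own, and `InboxSketch` is a hypothetical stand-in for a paglet subclass. It also shows why the lock is reentrant: one locked method can call another without deadlocking.

```python
import threading
from contextlib import contextmanager
from dataclasses import dataclass, field
from functools import wraps


def state_locked(method):
    """Untyped copy of the decorator, for illustration only."""

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.locked():
            return method(self, *args, **kwargs)

    return wrapper


@dataclass
class InboxState:
    items: list[str] = field(default_factory=list)


class InboxSketch:
    """Stand-in exposing the same locked() helper as Paglet."""

    def __init__(self):
        self.state = InboxState()
        self._state_lock = threading.RLock()

    @contextmanager
    def locked(self):
        with self._state_lock:
            yield

    @state_locked
    def add(self, item: str) -> int:
        self.state.items.append(item)
        return self.count()  # nested locked call; safe because the lock is an RLock

    @state_locked
    def count(self) -> int:
        return len(self.state.items)


inbox = InboxSketch()
first = inbox.add("hello")
second = inbox.add("world")
print(first, second, inbox.add.__name__)
```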

paglets.core.messages

FutureReply

Result handle for an asynchronous paglet message.

This is the Python analogue of Aglets' FutureReply. It wraps a concurrent.futures.Future and exposes Aglets-style names while still preserving the original exception behavior of Future.result().

Source code in src/paglets/core/messages.py
class FutureReply:
    """Result handle for an asynchronous paglet message.

    This is the Python analogue of Aglets' ``FutureReply``. It wraps a
    ``concurrent.futures.Future`` and exposes Aglets-style names while still
    preserving the original exception behavior of ``Future.result()``.
    """

    def __init__(self, future: Future[Any]):
        self._future = future
        self._reply_sets: list[ReplySet] = []
        self._future.add_done_callback(lambda _: self._notify_reply_sets())

    def added_to(self, reply_set: ReplySet) -> None:
        self._reply_sets.append(reply_set)
        if self.is_available():
            reply_set.done(self)

    def is_available(self) -> bool:
        return self._future.done()

    def wait_for_reply(self, timeout: float | None = None) -> bool:
        try:
            self._future.result(timeout=timeout)
            return True
        except TimeoutError:
            return False

    def get_reply(self, timeout: float | None = None) -> Any:
        return self._future.result(timeout=timeout)

    def get_boolean_reply(self, timeout: float | None = None) -> bool:
        return bool(self.get_reply(timeout))

    def get_int_reply(self, timeout: float | None = None) -> int:
        return int(self.get_reply(timeout))

    def get_float_reply(self, timeout: float | None = None) -> float:
        return float(self.get_reply(timeout))

    def get_string_reply(self, timeout: float | None = None) -> str:
        return str(self.get_reply(timeout))

    def _notify_reply_sets(self) -> None:
        for reply_set in list(self._reply_sets):
            reply_set.done(self)
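
Because wait_for_reply calls Future.result() and catches only the timeout, an exception raised by the message handler propagates out of wait_for_reply instead of being reported as True or False. The sketch below shows this with the standard library alone; the free function wait_for_reply here is an illustrative copy of the method's logic, not the paglets API. It imports concurrent.futures.TimeoutError explicitly, which is an alias of the built-in TimeoutError only from Python 3.11 onward; how the listed module imports that name is not shown above.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def wait_for_reply(future, timeout=None):
    """Return True once the future has a result, False if the wait timed out."""
    try:
        future.result(timeout=timeout)
        return True
    except FutureTimeoutError:
        return False


pending = Future()
timed_out = wait_for_reply(pending, timeout=0.01)  # no result has been set yet

pending.set_result("42")
ready = wait_for_reply(pending, timeout=0.01)
as_int = int(pending.result())  # same coercion that get_int_reply applies

failed = Future()
failed.set_exception(ValueError("handler failed"))
try:
    wait_for_reply(failed, timeout=0.01)
    propagated = False
except ValueError:
    propagated = True  # the handler's exception is re-raised, not swallowed
```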

Message dataclass

A message delivered to a paglet.

Mirrors the useful part of Aglets' Message: a kind, named arguments, optional single argument, priority, sender metadata, and a timestamp. Replies are represented by the return value of Paglet.handle_message.

Source code in src/paglets/core/messages.py
@dataclass(slots=True)
class Message:
    """A message delivered to a paglet.

    Mirrors the useful part of Aglets' ``Message``: a kind, named arguments,
    optional single argument, priority, sender metadata, and a timestamp.
    Replies are represented by the return value of ``Paglet.handle_message``.
    """

    kind: str
    args: WirePayload = field(default_factory=dict)
    arg: Any = None
    sender: str | None = None
    reply_to: str | None = None
    priority: int = NORMAL_PRIORITY
    message_type: int = SYNCHRONOUS
    timestamp: float = field(default_factory=time.time)
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)

    def get_arg(self, name: str | None = None, default: Any = None) -> Any:
        if name is None:
            return self.arg
        return self.args.get(name, default)

    def set_arg(self, name: str, value: Any) -> Message:
        self.args[name] = value
        return self

    def same_kind(self, kind: str | Message) -> bool:
        return self.kind == (kind.kind if isinstance(kind, Message) else kind)

    def increase_priority(self) -> None:
        if self.priority < MAX_PRIORITY:
            self.priority += 1

    def decrease_priority(self) -> None:
        if self.priority > MIN_PRIORITY:
            self.priority -= 1

    def to_wire(self) -> WirePayload:
        return {
            "kind": self.kind,
            "args": self.args,
            "arg": self.arg,
            "sender": self.sender,
            "reply_to": self.reply_to,
            "priority": self.priority,
            "message_type": self.message_type,
            "timestamp": self.timestamp,
            "message_id": self.message_id,
        }

    @classmethod
    def from_wire(cls, payload: WirePayload) -> Message:
        return cls(
            kind=payload["kind"],
            args=dict(payload.get("args") or {}),
            arg=payload.get("arg"),
            sender=payload.get("sender"),
            reply_to=payload.get("reply_to"),
            priority=int(payload.get("priority", NORMAL_PRIORITY)),
            message_type=int(payload.get("message_type", SYNCHRONOUS)),
            timestamp=float(payload.get("timestamp", time.time())),
            message_id=str(payload.get("message_id") or uuid.uuid4().hex),
        )

ReplySet

Container that yields FutureReply objects as replies arrive.

Source code in src/paglets/core/messages.py
class ReplySet:
    """Container that yields ``FutureReply`` objects as replies arrive."""

    def __init__(self, replies: list[FutureReply] | None = None):
        self._done: list[FutureReply] = []
        self._unavailable: list[FutureReply] = []
        self._condition = threading.Condition()
        for reply in replies or []:
            self.add_future_reply(reply)

    def add_future_reply(self, reply: FutureReply) -> None:
        with self._condition:
            if reply.is_available():
                self._done.append(reply)
            else:
                self._unavailable.append(reply)
            reply.added_to(self)
            self._condition.notify_all()

    def done(self, reply: FutureReply) -> None:
        with self._condition:
            if reply in self._done:
                return
            if reply in self._unavailable:
                self._unavailable.remove(reply)
            self._done.append(reply)
            self._condition.notify_all()

    def are_all_available(self) -> bool:
        with self._condition:
            return not self._unavailable

    def count_available(self) -> int:
        with self._condition:
            return len(self._done)

    def count_unavailable(self) -> int:
        with self._condition:
            return len(self._unavailable)

    def is_any_available(self) -> bool:
        with self._condition:
            return bool(self._done)

    def has_more_future_replies(self) -> bool:
        with self._condition:
            return bool(self._done or self._unavailable)

    def wait_for_all_replies(self, timeout: float | None = None) -> bool:
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._condition:
            while self._unavailable:
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return False
                self._condition.wait(remaining)
            return True

    def wait_for_next_future_reply(self, timeout: float | None = None) -> bool:
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._condition:
            while not self._done:
                if not self._unavailable:
                    return False
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return False
                self._condition.wait(remaining)
            return True

    def get_next_future_reply(self, timeout: float | None = None) -> FutureReply | None:
        if not self.wait_for_next_future_reply(timeout):
            return None
        with self._condition:
            return self._done.pop(0)

    def __iter__(self) -> Iterator[FutureReply]:
        while self.has_more_future_replies():
            reply = self.get_next_future_reply()
            if reply is not None:
                yield reply
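
The two wait methods share one pattern: compute a monotonic deadline once, then loop on the condition variable, recomputing the remaining time after every wake-up. The sketch below isolates that pattern. CompletionQueue and its method names are hypothetical and much smaller than ReplySet; they exist only to make the loop runnable on its own.

```python
import threading
import time


class CompletionQueue:
    """Hypothetical miniature of ReplySet's wait loop; not the paglets class."""

    def __init__(self, pending):
        self._pending = set(pending)
        self._done = []
        self._condition = threading.Condition()

    def mark_done(self, item):
        with self._condition:
            self._pending.discard(item)
            self._done.append(item)
            self._condition.notify_all()

    def next_done(self, timeout=None):
        # Fix the deadline up front so repeated wake-ups cannot extend the wait.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._condition:
            while not self._done:
                if not self._pending:
                    return None  # nothing left that could ever complete
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return None  # timed out
                self._condition.wait(remaining)
            return self._done.pop(0)


queue = CompletionQueue(["a", "b"])
early = queue.next_done(timeout=0.01)  # nothing has finished yet

worker = threading.Timer(0.05, queue.mark_done, args=("b",))
worker.start()
first = queue.next_done(timeout=2.0)  # wakes when "b" completes
worker.join()

queue.mark_done("a")
second = queue.next_done(timeout=0.01)
drained = queue.next_done(timeout=0.01)  # both collections are now empty
```

As in get_next_future_reply, a None result does not distinguish a timeout from an exhausted set; callers that need the difference can check the pending count separately.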

paglets.core.events

paglets.core.context_events

ContextEvent dataclass

Host-level event emitted by the paglet context.

Source code in src/paglets/core/context_events.py
@dataclass(frozen=True, slots=True)
class ContextEvent:
    """Host-level event emitted by the paglet context."""

    event_id: int
    kind: str
    host_name: str
    host_address: str
    timestamp: float = field(default_factory=time.time)
    agent_id: str | None = None
    class_name: str | None = None
    message_id: str | None = None
    service_name: str | None = None
    data: dict[str, Any] = field(default_factory=dict)
    error: str | None = None

    def to_wire(self) -> dict[str, Any]:
        return {
            "event_id": self.event_id,
            "kind": self.kind,
            "host_name": self.host_name,
            "host_address": self.host_address,
            "timestamp": self.timestamp,
            "agent_id": self.agent_id,
            "class_name": self.class_name,
            "message_id": self.message_id,
            "service_name": self.service_name,
            "data": self.data,
            "error": self.error,
        }

    @classmethod
    def from_wire(cls, payload: dict[str, Any]) -> ContextEvent:
        return cls(
            event_id=int(payload["event_id"]),
            kind=str(payload["kind"]),
            host_name=str(payload["host_name"]),
            host_address=str(payload["host_address"]),
            timestamp=float(payload.get("timestamp", time.time())),
            agent_id=str(payload["agent_id"]) if payload.get("agent_id") is not None else None,
            class_name=str(payload["class_name"]) if payload.get("class_name") is not None else None,
            message_id=str(payload["message_id"]) if payload.get("message_id") is not None else None,
            service_name=str(payload["service_name"]) if payload.get("service_name") is not None else None,
            data=dict(payload.get("data") or {}),
            error=str(payload["error"]) if payload.get("error") is not None else None,
        )

ContextEventLog

Bounded in-memory context event log with best-effort listeners.

Source code in src/paglets/core/context_events.py
class ContextEventLog:
    """Bounded in-memory context event log with best-effort listeners."""

    def __init__(self, *, capacity: int = 1000):
        self.capacity = max(1, int(capacity))
        self._events: deque[ContextEvent] = deque(maxlen=self.capacity)
        self._listeners: list[ContextListener] = []
        self._next_id = 1
        self._lock = threading.RLock()

    def add_listener(self, listener: ContextListener) -> None:
        with self._lock:
            if listener not in self._listeners:
                self._listeners.append(listener)

    def remove_listener(self, listener: ContextListener) -> None:
        with self._lock:
            if listener in self._listeners:
                self._listeners.remove(listener)

    def emit(
        self,
        *,
        kind: str,
        host_name: str,
        host_address: str,
        agent_id: str | None = None,
        class_name: str | None = None,
        message_id: str | None = None,
        service_name: str | None = None,
        data: dict[str, Any] | None = None,
        error: str | None = None,
    ) -> ContextEvent:
        event = self._append(
            kind=kind,
            host_name=host_name,
            host_address=host_address,
            agent_id=agent_id,
            class_name=class_name,
            message_id=message_id,
            service_name=service_name,
            data=data or {},
            error=error,
        )
        self._notify(event)
        return event

    def events_since(self, since: int = 0, *, limit: int = 100) -> list[ContextEvent]:
        limit = max(0, int(limit))
        with self._lock:
            events = [event for event in self._events if event.event_id > since]
        return events[:limit] if limit else []

    def _append(self, **kwargs: Any) -> ContextEvent:
        with self._lock:
            event = ContextEvent(event_id=self._next_id, **kwargs)
            self._next_id += 1
            self._events.append(event)
            return event

    def _notify(self, event: ContextEvent) -> None:
        with self._lock:
            listeners = list(self._listeners)
        for listener in listeners:
            try:
                listener(event)
            except Exception as exc:
                failure = self._append(
                    kind="event-listener-failed",
                    host_name=event.host_name,
                    host_address=event.host_address,
                    agent_id=event.agent_id,
                    class_name=event.class_name,
                    message_id=event.message_id,
                    service_name=event.service_name,
                    data={"event_id": event.event_id, "listener": repr(listener)},
                    error=str(exc),
                )
                with self._lock:
                    remaining = [item for item in self._listeners if item is not listener]
                for other in remaining:
                    try:
                        other(failure)
                    except Exception:
                        continue
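
Because the log is a bounded deque, a consumer that polls events_since with its last seen event_id can miss events that were evicted between polls. The sketch below illustrates the cursor pattern and the eviction effect. MiniEventLog is a hypothetical reduction that stores plain dictionaries and omits locking and listeners, and the event kind strings are invented for the example.

```python
from collections import deque


class MiniEventLog:
    """Hypothetical reduction of ContextEventLog: no lock, no listeners."""

    def __init__(self, capacity=1000):
        self._events = deque(maxlen=max(1, int(capacity)))  # oldest entries drop off
        self._next_id = 1

    def emit(self, kind):
        event = {"event_id": self._next_id, "kind": kind}
        self._next_id += 1
        self._events.append(event)
        return event

    def events_since(self, since=0, limit=100):
        newer = [event for event in self._events if event["event_id"] > since]
        return newer[:limit] if limit > 0 else []


log = MiniEventLog(capacity=3)
for kind in ["created", "dispatched", "arrived", "message", "disposed"]:
    log.emit(kind)

# Events 1 and 2 were evicted, so the first page starts at id 3.
first_page = log.events_since(0, limit=2)
cursor = first_page[-1]["event_id"]
second_page = log.events_since(cursor)
```

A gap between the cursor and the first returned event_id is the signal that events were lost to eviction.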

paglets.core.itinerary

ItineraryAgentMixin

Mixin for paglets whose state stores an itinerary field.

Source code in src/paglets/core/itinerary.py
class ItineraryAgentMixin:
    """Mixin for paglets whose state stores an ``itinerary`` field."""

    itinerary_attr = "itinerary"

    def get_itinerary(self) -> ItineraryPlan:
        return getattr(self.state, self.itinerary_attr)

    def go_to_next_destination(self) -> Any | None:
        return self.get_itinerary().dispatch_next(self)

    def execute_itinerary_tasks(self, phase: str, event: Any = None) -> list[Any]:
        itinerary = self.get_itinerary()
        if not isinstance(itinerary, TaskItineraryPlan):
            return []
        tasks = itinerary.tasks_for_phase(itinerary.get_current_location(), phase)
        return [self.execute_itinerary_task(task, phase, event) for task in tasks]

    def execute_itinerary_task(self, task: ItineraryTask, phase: str, event: Any = None) -> Any:
        raise NotImplementedError(f"{self.__class__.__name__} must handle itinerary task {task.name!r}")

ItineraryPlan dataclass

Serializable itinerary state for a mobile paglet.

The Java helpers increment their internal index after calling dispatch. In paglets the serializable state is captured during dispatch, so this Python helper advances before dispatching to ensure the arrived copy sees the updated route position.

Source code in src/paglets/core/itinerary.py
@dataclass(slots=True)
class ItineraryPlan:
    """Serializable itinerary state for a mobile paglet.

    The Java helpers increment their internal index after calling dispatch.
    In paglets the serializable state is captured during dispatch, so this
    Python helper advances before dispatching to ensure the arrived copy sees
    the updated route position.
    """

    destinations: list[str] = field(default_factory=list)
    current_index: int = 0
    current_location: str | None = None
    visited_destinations: list[str] = field(default_factory=list)
    mutable: bool = True
    circular: bool = False
    loop_count: int = 0
    completed: bool = False

    def add_next_destination(
        self,
        destination: str,
        *,
        index: int | None = None,
        after: str | None = None,
        allow_duplicate: bool = True,
    ) -> bool:
        if not self.mutable or not destination:
            return False
        if not allow_duplicate and destination in self.destinations:
            return False
        if after is not None:
            try:
                index = self.destinations.index(after) + 1
            except ValueError:
                self.destinations.append(after)
                index = len(self.destinations)
        if index is None:
            self.destinations.append(destination)
            return True
        if index < 0 or index > len(self.destinations):
            return False
        self.destinations.insert(index, destination)
        return True

    def add_next_destination_if_not_duplicated(self, destination: str) -> bool:
        return self.add_next_destination(destination, allow_duplicate=False)

    def remove_destination(self, destination: str) -> bool:
        if not self.mutable or destination not in self.destinations:
            return False
        self.destinations.remove(destination)
        self.current_index = min(self.current_index, len(self.destinations))
        return True

    def remove_destination_at(self, index: int) -> bool:
        if not self.mutable or index < 0 or index >= len(self.destinations):
            return False
        self.destinations.pop(index)
        self.current_index = min(self.current_index, len(self.destinations))
        return True

    def set_immutable(self) -> None:
        self.mutable = False

    def skip_next(self) -> None:
        if self.current_index < len(self.destinations):
            self.current_index += 1
        elif self.circular and self.destinations:
            self.current_index = 0
            self.loop_count += 1

    def get_destination_count(self) -> int:
        return len(self.destinations)

    def get_remaining_destination_count(self) -> int:
        return max(0, len(self.destinations) - self.current_index)

    def get_destinations(self) -> list[str]:
        return list(self.destinations)

    def get_first_destination(self) -> str | None:
        return self.destinations[0] if self.destinations else None

    def get_last_destination(self) -> str | None:
        return self.destinations[-1] if self.destinations else None

    def get_current_location(self) -> str | None:
        return self.current_location

    def get_loop_count(self) -> int:
        return self.loop_count

    def next_destination(self) -> str | None:
        if self.current_index < len(self.destinations):
            return self.destinations[self.current_index]
        if self.circular and self.destinations:
            return self.destinations[0]
        return None

    def dispatch_next(self, agent: Any) -> Any | None:
        target = self.next_destination()
        if target is None:
            self.completed = True
            return None
        if self.current_index >= len(self.destinations):
            self.current_index = 0
            self.loop_count += 1
        self.current_index += 1
        self.current_location = target
        self.visited_destinations.append(target)
        return agent.dispatch(target)
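
The advance-before-dispatch ordering can be checked with a stand-in agent that snapshots the plan at the moment dispatch is called, as a real transfer would. MiniPlan keeps only the fields that dispatch_next touches for routing, and RecordingAgent is invented for the example; neither is part of paglets.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class MiniPlan:
    """Hypothetical reduction of ItineraryPlan to its routing fields."""

    destinations: list[str] = field(default_factory=list)
    current_index: int = 0
    circular: bool = False
    loop_count: int = 0
    completed: bool = False

    def next_destination(self):
        if self.current_index < len(self.destinations):
            return self.destinations[self.current_index]
        if self.circular and self.destinations:
            return self.destinations[0]
        return None

    def dispatch_next(self, agent):
        target = self.next_destination()
        if target is None:
            self.completed = True
            return None
        if self.current_index >= len(self.destinations):
            self.current_index = 0  # wrap around for circular routes
            self.loop_count += 1
        self.current_index += 1  # advance first, so the snapshot has the new index
        return agent.dispatch(target)


class RecordingAgent:
    """Stand-in agent whose dispatch() snapshots the plan, as a transfer would."""

    def __init__(self, plan):
        self.plan = plan
        self.snapshots = []

    def dispatch(self, target):
        self.snapshots.append((target, copy.deepcopy(self.plan).current_index))
        return target


plan = MiniPlan(destinations=["host-a", "host-b"], circular=True)
agent = RecordingAgent(plan)
route = [plan.dispatch_next(agent) for _ in range(3)]

linear = MiniPlan(destinations=["host-a"])
linear_agent = RecordingAgent(linear)
linear_route = [linear.dispatch_next(linear_agent) for _ in range(2)]
```

Each snapshot already holds the index of the following hop, which is what lets the arrived copy continue the route without repeating the destination it just reached.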

ItineraryTask dataclass

Serializable task descriptor for a task itinerary.

Source code in src/paglets/core/itinerary.py
@dataclass(slots=True)
class ItineraryTask:
    """Serializable task descriptor for a task itinerary."""

    name: str
    execution: str = EXECUTE_ON_ARRIVAL
    args: dict[str, Any] = field(default_factory=dict)

paglets.core.runtime_values

paglets.core.wire

paglets.core.errors
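
Every exception in this module derives from PagletError, and the service errors nest under HostError, so the order of except clauses determines which handler runs. The sketch below re-declares a few of the classes locally with the same bases so that it runs without paglets installed; the classify helper and its return labels are hypothetical.

```python
class PagletError(Exception):
    """Base exception for paglets."""


class HostError(PagletError):
    """Raised for local host/runtime errors."""


class ServiceContractError(HostError):
    """Raised when a typed service contract is invalid or misused."""


class ServiceNotFoundError(ServiceContractError):
    """Raised when a required typed service contract cannot be found."""


class TransferError(PagletError):
    """Raised when a paglet transfer cannot complete."""


def classify(exc):
    """Hypothetical helper: map an exception to a coarse label."""
    try:
        raise exc
    except ServiceNotFoundError:
        return "service-missing"  # most specific handler must come first
    except HostError:
        return "host"  # also catches ServiceContractError
    except PagletError:
        return "paglet"


labels = [
    classify(ServiceNotFoundError("no such service")),
    classify(ServiceContractError("bad contract")),
    classify(TransferError("transfer failed")),
]
```

Placing the HostError clause before ServiceNotFoundError would make the more specific handler unreachable.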

AuthenticationError

Bases: PagletError

Raised when a host API request is missing valid credentials.

Source code in src/paglets/core/errors.py
class AuthenticationError(PagletError):
    """Raised when a host API request is missing valid credentials."""

ForbiddenError

Bases: PagletError

Raised when a valid request is not allowed by host policy.

Source code in src/paglets/core/errors.py
class ForbiddenError(PagletError):
    """Raised when a valid request is not allowed by host policy."""

HostError

Bases: PagletError

Raised for local host/runtime errors.

Source code in src/paglets/core/errors.py
class HostError(PagletError):
    """Raised for local host/runtime errors."""

InvalidAgentError

Bases: PagletError

Raised when an agent id no longer refers to an active/deactivated paglet.

Source code in src/paglets/core/errors.py
class InvalidAgentError(PagletError):
    """Raised when an agent id no longer refers to an active/deactivated paglet."""

LifecycleError

Bases: PagletError

Raised when a lifecycle operation fails.

Source code in src/paglets/core/errors.py
class LifecycleError(PagletError):
    """Raised when a lifecycle operation fails."""

NotHandledError

Bases: PagletError

Raised when a paglet did not handle a message.

Source code in src/paglets/core/errors.py
class NotHandledError(PagletError):
    """Raised when a paglet did not handle a message."""

PagletCrashedError

Bases: PagletError

Raised when an isolated paglet process exits unexpectedly.

Source code in src/paglets/core/errors.py
class PagletCrashedError(PagletError):
    """Raised when an isolated paglet process exits unexpectedly."""

PagletError

Bases: Exception

Base exception for paglets.

Source code in src/paglets/core/errors.py
class PagletError(Exception):
    """Base exception for paglets."""

PagletInactiveError

Bases: PagletError

Raised when an inactive paglet cannot be activated for an operation.

Source code in src/paglets/core/errors.py
class PagletInactiveError(PagletError):
    """Raised when an inactive paglet cannot be activated for an operation."""

RemoteHostError

Bases: PagletError

Raised when a remote host returns an error response.

Source code in src/paglets/core/errors.py
class RemoteHostError(PagletError):
    """Raised when a remote host returns an error response."""

SerializationError

Bases: PagletError

Raised when paglet state cannot be serialized or restored.

Source code in src/paglets/core/errors.py
class SerializationError(PagletError):
    """Raised when paglet state cannot be serialized or restored."""

ServiceContractError

Bases: HostError

Raised when a typed service contract is invalid or misused.

Source code in src/paglets/core/errors.py
class ServiceContractError(HostError):
    """Raised when a typed service contract is invalid or misused."""

ServiceNotFoundError

Bases: ServiceContractError

Raised when a required typed service contract cannot be found.

Source code in src/paglets/core/errors.py
class ServiceNotFoundError(ServiceContractError):
    """Raised when a required typed service contract cannot be found."""

TransferError

Bases: PagletError

Raised when a paglet transfer cannot complete.

Source code in src/paglets/core/errors.py
class TransferError(PagletError):
    """Raised when a paglet transfer cannot complete."""
  • Runtime covers host supervision and child processes.
  • Remote covers proxy calls and message transport.
  • Serialization covers dataclass wire conversion.