Skip to content

Services Package

paglets.services defines service contracts, service registry records, service handles, and resident service metadata.

Responsibilities

  • Describe service operations with request/reply payload dataclasses.
  • Validate service requests and replies against a contract.
  • Encode service records for discovery and lookup.
  • Manage resident service leases and lifecycle metadata.

Main Modules

paglets.services.contracts
Defines ServiceContract, ServiceOperation, ServiceHandle, ServiceRecord, ServiceRegistry, and service-specific errors.
paglets.services.resident
Defines ResidentServiceSpec, ServiceLease, resident lifecycle defaults, and metadata keys used by host-managed services.

Implementation Notes

Service operations are message-backed. A service handle builds typed messages from a contract and sends them to the paglet that owns the service.

ServiceScope controls whether a service is local to one host or advertised through the mesh. Resident services can be started lazily or eagerly from the launch configuration.

API Reference

paglets.services.contracts

EmptyPayload dataclass

Dataclass payload for operations with no request or reply body.

Source code in src/paglets/services/contracts.py
@dataclass(frozen=True, slots=True)
class EmptyPayload:
    """Dataclass payload for operations with no request or reply body."""

ServiceContract dataclass

Typed service interface advertised through the service registry.

Source code in src/paglets/services/contracts.py
@dataclass(frozen=True, slots=True)
class ServiceContract:
    """Typed service interface advertised through the service registry.

    A contract is a named, versioned, ordered collection of
    ``ServiceOperation`` objects. During construction ``name`` and ``version``
    are stripped of surrounding whitespace and ``operations`` is stored as a
    tuple.

    Raises:
        ServiceContractError: If the name or version is blank, no operations
            are given, an entry is not a ``ServiceOperation``, or two
            operations share a name.
    """

    name: str
    operations: tuple[ServiceOperation[Any, Any], ...]
    version: str = "1"

    def __post_init__(self) -> None:
        """Validate the fields and store their normalised forms."""
        name = self.name.strip()
        version = self.version.strip()
        if not name:
            raise ServiceContractError("Service contract name cannot be empty")
        if not version:
            raise ServiceContractError("Service contract version cannot be empty")

        # Materialise before testing for emptiness: a one-shot iterable such
        # as a generator is truthy even when it yields nothing, so checking
        # the raw argument would let a contract with zero operations through.
        # ``or ()`` keeps ``None`` (and other falsy values) on the
        # ServiceContractError path instead of failing inside ``tuple()``.
        operations = tuple(self.operations or ())
        if not operations:
            raise ServiceContractError("Service contract must define at least one operation")

        names: set[str] = set()
        for operation in operations:
            if not isinstance(operation, ServiceOperation):
                raise ServiceContractError("Service contract operations must be ServiceOperation instances")
            if operation.name in names:
                raise ServiceContractError(f"Duplicate service operation {operation.name!r}")
            names.add(operation.name)

        # The dataclass is frozen, so normalised values have to bypass the
        # generated __setattr__.
        object.__setattr__(self, "name", name)
        object.__setattr__(self, "version", version)
        object.__setattr__(self, "operations", operations)

    @property
    def capabilities(self) -> tuple[str, ...]:
        """Operation names, in the order the operations were declared."""
        return tuple(operation.name for operation in self.operations)

    def operation_for(self, name: str) -> ServiceOperation[Any, Any] | None:
        """Return the operation called ``name``, or ``None`` if there is none."""
        for operation in self.operations:
            if operation.name == name:
                return operation
        return None

    def require_operation(self, operation: ServiceOperation[Any, Any]) -> ServiceOperation[Any, Any]:
        """Return this contract's own copy of ``operation``.

        Raises:
            ServiceContractError: If the contract has no operation with that
                name, or the one it has is not equal to ``operation``.
        """
        known = self.operation_for(operation.name)
        if known is None or known != operation:
            raise ServiceContractError(f"{operation.name!r} is not part of service contract {self.name!r}")
        return known

    def metadata(self) -> dict[str, Any]:
        """Describe the contract as a plain dict.

        The result holds the contract name, its version, and one entry per
        operation giving the operation name and its request/reply schema
        names.
        """
        return {
            "name": self.name,
            "version": self.version,
            "operations": [
                {
                    "name": operation.name,
                    "request_type": operation.request_schema,
                    "reply_type": operation.reply_schema,
                }
                for operation in self.operations
            ],
        }

    def advertise_metadata(self, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
        """Return a copy of ``metadata`` with the contract description added.

        The description is stored under ``CONTRACT_METADATA_KEY``; the
        caller's dict is not modified.

        Raises:
            ServiceContractError: If ``metadata`` already uses the reserved
                key.
        """
        merged = dict(metadata or {})
        if CONTRACT_METADATA_KEY in merged:
            raise ServiceContractError(f"Metadata key {CONTRACT_METADATA_KEY!r} is reserved")
        merged[CONTRACT_METADATA_KEY] = self.metadata()
        return merged

    def matches_record(self, record: ServiceRecord) -> bool:
        """Report whether ``record`` advertises exactly this contract.

        The record must have the same name, the same capabilities in the same
        order, and this contract's ``metadata()`` stored under
        ``CONTRACT_METADATA_KEY``.
        """
        return (
            record.name == self.name
            and tuple(record.capabilities) == self.capabilities
            and record.metadata.get(CONTRACT_METADATA_KEY) == self.metadata()
        )

    def route(
        self,
        message: Message,
        handlers: Mapping[ServiceOperation[Any, Any], Callable[[Any], Any]],
        *,
        default: Any = _ROUTE_DEFAULT_UNSET,
    ) -> Any:
        """Dispatch ``message`` to the handler registered for its operation.

        The operation is looked up by ``message.kind``. The request is decoded
        with that operation, passed to the handler, and the handler's result
        is encoded as the reply.

        Args:
            message: Incoming message to dispatch.
            handlers: Mapping from operation to the callable that serves it.
            default: Value to return when ``message.kind`` is not an operation
                of this contract. When omitted, that case raises instead.

        Returns:
            The encoded reply, or ``default`` for an unknown message kind.

        Raises:
            ServiceContractError: If the message kind is unknown and no
                ``default`` was given, or if ``handlers`` has no entry for the
                matched operation.
        """
        operation = self.operation_for(message.kind)
        if operation is None:
            # A sentinel is used so that callers may pass ``None`` as a real default.
            if default is _ROUTE_DEFAULT_UNSET:
                raise ServiceContractError(f"{message.kind!r} is not part of service contract {self.name!r}")
            return default
        handler = handlers.get(operation)
        if handler is None:
            raise ServiceContractError(f"No handler registered for operation {operation.name!r}")
        request = operation.decode_request(message)
        reply = handler(request)
        return operation.encode_reply(reply)

ServiceHandle dataclass

Resolved typed service client for one advertised service record.

Source code in src/paglets/services/contracts.py
@dataclass(frozen=True, slots=True)
class ServiceHandle:
    """Typed client bound to one advertised service record.

    The handle pairs a contract with a record that matches it. Each call
    checks the operation against the contract, builds the message from it and
    sends that message through the record's proxy, resolved with
    ``context_or_client``.
    """

    contract: ServiceContract
    record: ServiceRecord
    context_or_client: Any = None

    def __post_init__(self) -> None:
        """Refuse a record that the contract does not recognise as its own."""
        if self.contract.matches_record(self.record):
            return
        raise ServiceContractError(
            f"Service record {self.record.name!r} does not match contract {self.contract.name!r}"
        )

    def call(
        self,
        operation: ServiceOperation[ReqT, RepT],
        request: ReqT | None = None,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> RepT:
        """Send ``request`` for ``operation`` and return the decoded reply.

        The three keyword options are forwarded unchanged to the resolved
        proxy's ``send``.

        Raises:
            ServiceContractError: If ``operation`` is not part of the
                handle's contract.
        """
        checked = self.contract.require_operation(operation)
        outgoing = checked.to_message(request)
        target = self.record.proxy.resolve(self.context_or_client)
        raw_reply = target.send(
            outgoing,
            activate_if_inactive=activate_if_inactive,
            no_delay=no_delay,
            timeout=timeout,
        )
        return checked.decode_reply(raw_reply)

    def send_oneway(
        self,
        operation: ServiceOperation[ReqT, Any],
        request: ReqT | None = None,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> None:
        """Send ``request`` for ``operation`` without reading a reply.

        The three keyword options are forwarded unchanged to the resolved
        proxy's ``send_oneway``.

        Raises:
            ServiceContractError: If ``operation`` is not part of the
                handle's contract.
        """
        checked = self.contract.require_operation(operation)
        outgoing = checked.to_message(request)
        target = self.record.proxy.resolve(self.context_or_client)
        target.send_oneway(
            outgoing,
            activate_if_inactive=activate_if_inactive,
            no_delay=no_delay,
            timeout=timeout,
        )

ServiceOperation dataclass

Bases: Generic[ReqT, RepT]

Typed operation exposed by a service contract.

Source code in src/paglets/services/contracts.py
@dataclass(frozen=True, slots=True)
class ServiceOperation(Generic[ReqT, RepT]):
    """One named request/reply operation of a service contract.

    ``request_type`` and ``reply_type`` are the payload classes for each
    direction; both default to ``EmptyPayload``. The name is stored with
    surrounding whitespace removed.

    Raises:
        ServiceContractError: If the name is blank.
    """

    name: str
    request_type: type[ReqT] = EmptyPayload  # type: ignore[assignment]
    reply_type: type[RepT] = EmptyPayload  # type: ignore[assignment]

    def __post_init__(self) -> None:
        """Normalise the name and resolve both schema names once."""
        stripped = self.name.strip()
        if stripped:
            # Frozen dataclass: write through object.__setattr__.
            object.__setattr__(self, "name", stripped)
        else:
            raise ServiceContractError("Service operation name cannot be empty")
        # The results are discarded; presumably _schema_name rejects
        # unsupported payload types, so a bad type fails at construction.
        for payload_type, label in (
            (self.request_type, "request_type"),
            (self.reply_type, "reply_type"),
        ):
            _schema_name(payload_type, label)

    @property
    def request_schema(self) -> str:
        """Schema name derived from ``request_type``."""
        schema = _schema_name(self.request_type, "request_type")
        return schema

    @property
    def reply_schema(self) -> str:
        """Schema name derived from ``reply_type``."""
        schema = _schema_name(self.reply_type, "reply_type")
        return schema

    def to_message(self, request: ReqT | None = None) -> Message:
        """Build a ``Message`` named after this operation carrying ``request``."""
        wire_args = self.encode_request(request)
        return Message(self.name, wire_args)

    def encode_request(self, request: ReqT | None = None) -> dict[str, Any]:
        """Convert ``request`` to its wire form.

        ``None`` stands for the default payload of ``request_type``.
        """
        if request is None:
            body = _default_payload(self.request_type)
        else:
            body = request
        _require_instance(body, self.request_type, f"{self.name} request")
        return dataclass_to_wire(body)

    def decode_request(self, message: Message | dict[str, Any]) -> ReqT:
        """Decode a request from a ``Message`` (via its ``args``) or a raw payload."""
        raw = message
        if isinstance(message, Message):
            raw = message.args
        return _decode_payload(self.request_type, raw, f"{self.name} request")

    def encode_reply(self, reply: RepT | None = None) -> dict[str, Any]:
        """Convert ``reply`` to its wire form.

        ``None`` stands for the default payload of ``reply_type``.
        """
        if reply is None:
            body = _default_payload(self.reply_type)
        else:
            body = reply
        _require_instance(body, self.reply_type, f"{self.name} reply")
        return dataclass_to_wire(body)

    def decode_reply(self, payload: Any) -> RepT:
        """Decode a reply payload into ``reply_type``.

        A ``None`` payload is treated as an empty mapping when the reply type
        is ``EmptyPayload``. A payload that is already an instance of the
        reply type is returned unchanged.
        """
        no_body_expected = self.reply_type is EmptyPayload
        raw = {} if (payload is None and no_body_expected) else payload
        if isinstance(raw, self.reply_type):
            return raw
        return _decode_payload(self.reply_type, raw, f"{self.name} reply")

paglets.services.resident

ResidentServiceSpec dataclass

Class-level declaration for a managed resident service.

Source code in src/paglets/services/resident.py
@dataclass(frozen=True, slots=True)
class ResidentServiceSpec:
    """Class-level declaration for a managed resident service.

    Raises:
        ValueError: If ``idle_timeout`` is negative or NaN. Enum validation of
            ``lifecycle`` and ``scope`` is delegated to ``require_enum``.
    """

    # Contract the resident service implements.
    contract: ServiceContract
    # Defaults to ServiceScope.LOCAL.
    scope: ServiceScope = ServiceScope.LOCAL
    # Defaults to ResidentLifecycle.LAZY.
    lifecycle: ResidentLifecycle = ResidentLifecycle.LAZY
    # Optional explicit agent id; None by default.
    agent_id: str | None = None
    singleton: bool = True
    # Presumably seconds, going by the default constant's name; must be >= 0.
    idle_timeout: float = DEFAULT_RESIDENT_IDLE_TIMEOUT_SECONDS
    # A fresh dict per instance; the frozen dataclass does not freeze its contents.
    state: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Validate the enum fields and the idle timeout."""
        require_enum(self.lifecycle, ResidentLifecycle, "lifecycle")
        require_enum(self.scope, ServiceScope, "scope")
        # Written as ``not (x >= 0)`` instead of ``x < 0`` so that NaN, which
        # compares false against everything, is rejected as well.
        if not self.idle_timeout >= 0:
            raise ValueError("ResidentServiceSpec idle_timeout must be non-negative")

ServiceLease dataclass

TTL-backed lease that keeps a managed resident service active.

Source code in src/paglets/services/resident.py
@dataclass(slots=True)
class ServiceLease:
    """TTL-backed lease that keeps a managed resident service active.

    The lease is a context manager: entering it yields the service handle and
    leaving it releases the lease on the host at ``host_url``.
    """

    handle: ServiceHandle
    lease_id: str
    host_url: str
    # Compared against time.time(), so presumably epoch seconds.
    expires_at: float
    client: HostClient = field(default_factory=HostClient)
    # Set on the first release() call so that later calls do nothing.
    _released: bool = False

    def __enter__(self) -> ServiceHandle:
        """Return the leased service handle."""
        return self.handle

    def __exit__(self, exc_type, exc, tb) -> None:
        """Release the lease when the ``with`` block ends."""
        self.release()

    @property
    def expired(self) -> bool:
        """Whether the current time has reached ``expires_at``."""
        return time.time() >= self.expires_at

    def release(self) -> None:
        """Ask the host to release this lease; repeated calls are no-ops.

        The lease is marked released before the request is sent, so a later
        call does not retry even if the request raised.
        """
        if not self._released:
            self._released = True
            release_url = f"{self.host_url.rstrip('/')}/services/leases/{self.lease_id}/release"
            self.client.post_json(release_url, {})
  • Core covers messages and service scope enums.
  • Configuration covers resident services in launch config.
  • Remote covers mesh-visible service lookup.