Skip to content

Patterns Package

paglets.patterns contains small ergonomic layers on top of the raw paglet runtime. These helpers are additive: the core Message, lifecycle, dispatch, clone, and registered-file mobility APIs remain available for paglets that need custom protocols.

Responsibilities

  • Provide typed task status, request, result, and client helpers.
  • Provide typed operation routing for paglets that expose several public operations without writing a manual handle_message() switch.
  • Provide small coordinator helpers for clone fan-out, child cleanup, timeout expiry, and cursor-based drains.
  • Wrap mesh user-info notifications so notification failures stay non-fatal.
  • Provide reusable file mobility helpers and an optional one-file transfer task built on natural registered-file mobility.

API Reference

paglets.patterns.tasks

TaskPaglet

Bases: Paglet[StateT], Generic[RequestT, ResultT, StateT]

Base class for simple typed paglets with start/status/wait operations.

Source code in src/paglets/patterns/tasks.py
class TaskPaglet(Paglet[StateT], Generic[RequestT, ResultT, StateT]):
    """Base class for simple typed paglets with start/status/wait operations.

    Subclasses define ``Request`` (the class handed to ``dataclass_from_wire``
    when a task starts) and implement :meth:`run_task`. Messages that are not
    one of the three built-in task kinds are offered to
    :meth:`handle_task_message`.
    """

    Request: type[Any]
    Result: type[Any]

    def handle_message(self, message: Message):
        """Route built-in task messages, then fall back to the custom hook.

        Returns the wire reply for ``TASK_START``/``TASK_STATUS``/``TASK_WAIT``,
        otherwise the non-``None`` value of :meth:`handle_task_message`,
        otherwise ``self.not_handled()``.
        """
        if message.kind == TASK_START.name:
            request = TASK_START.decode_request(message)
            return self._start_task(request)
        if message.kind == TASK_STATUS.name:
            return dataclass_to_wire(self.task_status_reply())
        if message.kind == TASK_WAIT.name:
            request = TASK_WAIT.decode_request(message)
            # Negative timeouts are clamped to zero; the reply is sent whether
            # or not ``state.done`` became true within the timeout.
            self.wait_state(lambda state: state.done, timeout=max(0.0, float(request.wait_timeout)))
            return dataclass_to_wire(self.task_status_reply())
        custom = self.handle_task_message(message)
        if custom is not None:
            return custom
        return self.not_handled()

    def handle_task_message(self, message: Message) -> Any | None:
        """Hook for extra message kinds; the default handles nothing.

        Return ``None`` to let :meth:`handle_message` report the message as
        not handled.
        """
        _ = message
        return None

    def run_task(self, request: RequestT) -> ResultT | PagletProxy | PagletProxyRef | None:
        """Perform the task; must be overridden.

        :meth:`_start_task` interprets the return value: a proxy (or proxy
        ref) is returned to the caller in wire form, ``None`` leaves the task
        state as the implementation set it, and any other value is passed to
        :meth:`complete_task`.
        """
        _ = request
        raise NotImplementedError

    def task_status_reply(self) -> TaskStatusReply:
        """Snapshot the task state plus agent/host identity under the state lock.

        ``request`` and ``result`` are shallow-copied so the reply does not
        alias the dicts stored in state.
        """
        with self.locked_state() as state:
            return TaskStatusReply(
                status=state.status,
                done=state.done,
                request=dict(state.request),
                result=dict(state.result),
                error=state.error,
                started_at=state.started_at,
                completed_at=state.completed_at,
                agent_id=self.agent_id,
                host_name=self.context.name,
                host_url=self.context.address,
            )

    def complete_task(self, result: ResultT, *, status: TaskStatus = TaskStatus.COMPLETED) -> None:
        """Record ``result`` as the final outcome and notify state waiters.

        Raises:
            HostError: if ``result`` is not a dataclass *instance*.
        """
        if not is_dataclass(result) or isinstance(result, type):
            raise HostError("Task results must be dataclass instances")
        # Serialize before touching state: if encoding fails, the task must not
        # be left marked done with an empty result and no notification sent.
        wire_result = dataclass_to_wire(result)
        with self.locked_state() as state:
            state.status = status
            state.done = True
            state.result = wire_result
            state.error = ""
            state.completed_at = time.time()
        self.notify_all_state_changed()

    def fail_task(self, error: str | BaseException) -> None:
        """Mark the task failed with ``str(error)`` and notify state waiters."""
        message = str(error)
        with self.locked_state() as state:
            state.status = TaskStatus.FAILED
            state.done = True
            state.error = message
            state.completed_at = time.time()
        self.notify_all_state_changed()

    def set_task_status(self, status: TaskStatus, *, done: bool | None = None) -> None:
        """Set an intermediate status; ``done`` is only changed when given."""
        with self.locked_state() as state:
            state.status = status
            if done is not None:
                state.done = bool(done)
        self.notify_all_state_changed()

    def _start_task(self, request: TaskStartRequest) -> dict[str, Any]:
        """Reset task state to RUNNING, run the task, and build the wire reply.

        The request is decoded before any state is modified, so a malformed
        request leaves the previous task state untouched. Exceptions raised by
        :meth:`run_task` are recorded with :meth:`fail_task` and reported
        through the status reply rather than propagated.
        """
        typed_request = dataclass_from_wire(self.request_class(), request.request)
        with self.locked_state() as state:
            state.status = TaskStatus.RUNNING
            state.done = False
            state.request = dict(request.request)
            state.result = {}
            state.error = ""
            state.started_at = time.time()
            state.completed_at = 0.0
        self.notify_all_state_changed()
        try:
            outcome = self.run_task(typed_request)
        except Exception as exc:
            self.fail_task(exc)
            return dataclass_to_wire(self.task_status_reply())
        if isinstance(outcome, (PagletProxy, PagletProxyRef)):
            return outcome.to_wire()
        if outcome is not None:
            try:
                self.complete_task(outcome)
            except Exception as exc:
                # An unusable outcome used to leave the task RUNNING forever,
                # so waiters only returned on timeout. Record the failure, then
                # surface the same exception callers saw before.
                self.fail_task(exc)
                raise
        return dataclass_to_wire(self.task_status_reply())

    @classmethod
    def request_class(cls) -> type[RequestT]:
        """Return the ``Request`` class attribute.

        Raises:
            HostError: if the class does not define ``Request``.
        """
        request_cls = getattr(cls, "Request", None)
        if request_cls is None:
            raise HostError(f"{cls.__name__} must define Request")
        return request_cls

    @classmethod
    def result_class(cls) -> type[ResultT]:
        """Return the ``Result`` class attribute.

        Raises:
            HostError: if the class does not define ``Result``.
        """
        result_cls = getattr(cls, "Result", None)
        if result_cls is None:
            raise HostError(f"{cls.__name__} must define Result")
        return result_cls

paglets.patterns.operations

OperationClient dataclass

Small proxy wrapper for typed operation calls.

Source code in src/paglets/patterns/operations.py
@dataclass(slots=True)
class OperationClient:
    """Small proxy wrapper for typed operation calls."""

    proxy: PagletProxy

    def call(
        self,
        operation: ServiceOperation[ReqT, RepT],
        request: ReqT | None = None,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> RepT:
        payload = self.proxy.send(
            operation.to_message(request),
            activate_if_inactive=activate_if_inactive,
            no_delay=no_delay,
            timeout=timeout,
        )
        return operation.decode_reply(payload)

    def send_oneway(
        self,
        operation: ServiceOperation[ReqT, Any],
        request: ReqT | None = None,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> None:
        self.proxy.send_oneway(
            operation.to_message(request),
            activate_if_inactive=activate_if_inactive,
            no_delay=no_delay,
            timeout=timeout,
        )

OperationPaglet

Bases: Paglet[StateT], Generic[StateT]

Paglet base class for typed multi-operation protocols.

Source code in src/paglets/patterns/operations.py
class OperationPaglet(Paglet[StateT], Generic[StateT]):
    """Paglet base that routes incoming messages to typed operation handlers.

    An incoming message is matched by name against ``Operations`` and the keys
    of :meth:`operation_handlers`. Unmatched messages are offered to
    :meth:`handle_operation_message`.
    """

    Operations: ClassVar[tuple[ServiceOperation[Any, Any], ...]] = ()

    def handle_message(self, message: Message):
        """Dispatch a matching operation, else try the fallback hook.

        Returns ``self.not_handled()`` when no operation matches and the
        fallback hook returns ``None``.
        """
        matched = self.operation_for_message(message)
        if matched is None:
            reply = self.handle_operation_message(message)
            return self.not_handled() if reply is None else reply
        return self.dispatch_operation(matched, message)

    def operation_handlers(self) -> Mapping[ServiceOperation[Any, Any], Callable[[Any], Any]]:
        """Return the operation-to-handler mapping; the default is empty."""
        return {}

    def handle_operation_message(self, message: Message) -> Any | None:
        """Hook for messages that match no operation; the default handles nothing."""
        _ = message
        return None

    def operation_for_message(self, message: Message) -> ServiceOperation[Any, Any] | None:
        """Return the first known operation whose name equals ``message.kind``.

        Operations listed in ``Operations`` are checked before handler keys.
        Returns ``None`` when nothing matches.
        """
        registered = self.operation_handlers()
        candidates = (*self.Operations, *registered)
        return next((known for known in candidates if known.name == message.kind), None)

    def dispatch_operation(self, operation: ServiceOperation[Any, Any], message: Message) -> dict[str, Any]:
        """Decode the request, run the registered handler, and encode its reply.

        Raises:
            ServiceContractError: if ``operation`` has no registered handler.
        """
        callback = self.operation_handlers().get(operation)
        if callback is None:
            raise ServiceContractError(f"No handler registered for operation {operation.name!r}")
        decoded = operation.decode_request(message)
        outcome = callback(decoded)
        return operation.encode_reply(outcome)

paglets.patterns.coordination

CursorDrainMixin

Helpers for cursor-based event drains.

Source code in src/paglets/patterns/coordination.py
class CursorDrainMixin:
    """Cursor-stamped event log helpers.

    The host class supplies ``locked_state()`` and
    ``notify_all_state_changed()``; the state object carries ``next_cursor``
    and ``events``.
    """

    def cursor_append_events(self, events: list[dict[str, Any]]) -> int:
        """Append copies of ``events``, stamping each with the next cursor.

        Returns the cursor given to the last appended event, or
        ``next_cursor - 1`` when ``events`` is empty. State waiters are
        notified in either case.
        """
        owner = cast(Any, self)
        with owner.locked_state() as state:
            newest = int(state.next_cursor) - 1
            for payload in events:
                newest = int(state.next_cursor)
                # Copy so the caller's dict is never mutated.
                stamped = dict(payload)
                stamped["cursor"] = newest
                state.next_cursor += 1
                state.events.append(stamped)
        owner.notify_all_state_changed()
        return newest

    def cursor_drain_events(self, *, after_cursor: int, limit: int) -> tuple[list[dict[str, Any]], int, bool]:
        """Return up to ``limit`` events whose cursor is greater than ``after_cursor``.

        ``limit`` is clamped to at least 1. The result is
        ``(events, cursor, more)``: ``cursor`` is the highest cursor in the
        returned page (or ``after_cursor`` if the page is empty) and ``more``
        tells whether further matching events were left behind.
        """
        page_size = max(1, int(limit))
        with cast(Any, self).locked_state() as state:
            newer = [entry for entry in state.events if int(entry.get("cursor", 0)) > after_cursor]
            page = newer[:page_size]
            high_water = max((int(entry.get("cursor", 0)) for entry in page), default=after_cursor)
            has_more = len(newer) > len(page)
        return page, high_water, has_more

MeshFanoutMixin

Common parent/child clone fanout helpers.

Source code in src/paglets/patterns/coordination.py
class MeshFanoutMixin:
    """Bookkeeping helpers for a parent paglet that fans clones out to hosts.

    The host class supplies ``locked_state()``, ``wait_state()``,
    ``notify_all_state_changed()``, ``context`` and ``agent_id``. All
    bookkeeping lives on the locked state object.
    """

    def fanout_reset(self, *, timeout: float) -> None:
        """Reset fan-out state for a new round with this paglet as parent.

        ``deadline`` is set on the monotonic clock, ``timeout`` seconds from
        now (negative timeouts count as zero).
        """
        owner = cast(Any, self)
        with owner.locked_state() as state:
            state.role = "parent"
            state.parent_host_url = owner.context.address
            state.parent_agent_id = owner.agent_id
            state.target_host_name = state.target_host_url = ""
            # Fresh containers per field so nothing is shared between them.
            for list_field in ("pending_hosts", "done_hosts"):
                setattr(state, list_field, [])
            for dict_field in ("child_proxies", "errors", "cleanup_errors"):
                setattr(state, dict_field, {})
            state.deadline = time.monotonic() + max(0.0, float(timeout))
        owner.notify_all_state_changed()

    def fanout_available_hosts(self, *, include_self: bool = True) -> list[HostRef]:
        """Return the hosts ``context.available_hosts`` reports as online."""
        mesh = cast(Any, self).context
        return [*mesh.available_hosts(online_only=True, include_self=include_self)]

    def fanout_select_hosts(self, targets: list[str] | tuple[str, ...], *, include_self: bool = True) -> list[HostRef]:
        """Resolve ``targets`` to online hosts, recording an error for the rest.

        With no targets, every available host is returned instead.
        """
        if not targets:
            return self.fanout_available_hosts(include_self=include_self)
        mesh = cast(Any, self).context
        chosen: list[HostRef] = []
        for wanted in targets:
            status = mesh.host_status(wanted)
            if status is not None and status.online:
                chosen.append(status)
            else:
                self.fanout_record_error(wanted, "target host is not online or not visible in the mesh")
        return chosen

    def fanout_prepare_clone(self, host: HostRef) -> None:
        """Mark ``host`` pending and switch state to the child role targeting it."""
        with cast(Any, self).locked_state() as state:
            state.pending_hosts.append(host.name)
            state.role = "child"
            state.target_host_name, state.target_host_url = host.name, host.url

    def fanout_finish_clone_prepare(self) -> None:
        """Switch state back to the parent role and clear the clone target."""
        with cast(Any, self).locked_state() as state:
            state.role = "parent"
            state.target_host_name = state.target_host_url = ""

    def fanout_record_child_proxy(self, host_name: str, proxy: PagletProxy) -> None:
        """Store the wire form of the child proxy created for ``host_name``."""
        wire_form_of = proxy.to_wire
        with cast(Any, self).locked_state() as state:
            state.child_proxies[host_name] = wire_form_of()

    def fanout_record_error(self, host_name: str, error: str) -> None:
        """Drop ``host_name`` from pending, record its error, and notify waiters."""
        owner = cast(Any, self)
        with owner.locked_state() as state:
            state.pending_hosts = [pending for pending in state.pending_hosts if pending != host_name]
            state.errors[host_name] = error
        owner.notify_all_state_changed()

    def fanout_record_done(self, host_name: str) -> None:
        """Move ``host_name`` from pending to done (once) and notify waiters.

        An empty ``host_name`` is ignored entirely.
        """
        if host_name:
            owner = cast(Any, self)
            with owner.locked_state() as state:
                state.pending_hosts = [pending for pending in state.pending_hosts if pending != host_name]
                if host_name not in state.done_hosts:
                    state.done_hosts.append(host_name)
            owner.notify_all_state_changed()

    def fanout_expire_pending(self, error: str) -> None:
        """Once the deadline has passed, fail every still-pending host with ``error``.

        Does nothing (and sends no notification) when nothing is pending, no
        positive deadline is set, or the deadline has not been reached yet.
        """
        owner = cast(Any, self)
        with owner.locked_state() as state:
            nothing_to_expire = (
                not state.pending_hosts or state.deadline <= 0 or time.monotonic() < state.deadline
            )
            if not nothing_to_expire:
                for overdue in tuple(state.pending_hosts):
                    state.errors[overdue] = error
                state.pending_hosts = []
        if nothing_to_expire:
            return
        owner.notify_all_state_changed()

    def fanout_wait_for(self, ready, *, wait_timeout: float) -> None:
        """Call ``wait_state(ready)`` for at most ``wait_timeout`` seconds.

        When a positive deadline is set, the wait is also capped at the time
        remaining until that deadline.
        """
        owner = cast(Any, self)
        budget = max(0.0, float(wait_timeout))
        with owner.locked_state() as state:
            if state.deadline > 0:
                remaining = max(0.0, state.deadline - time.monotonic())
                budget = min(budget, remaining)
        owner.wait_state(ready, timeout=budget)

    def fanout_cleanup_children(self) -> None:
        """Dispose every recorded child proxy, collecting failures per host.

        A failure to dispose one child is stored in ``cleanup_errors`` and
        does not stop the remaining children from being disposed.
        """
        owner = cast(Any, self)
        with owner.locked_state() as state:
            # Copy under the lock; disposal below runs without holding it.
            snapshot = {name: dict(wire) for name, wire in state.child_proxies.items()}
        for name, wire in snapshot.items():
            try:
                PagletProxy.from_wire(wire, owner.context.host.client).dispose()
            except Exception as exc:
                with owner.locked_state() as state:
                    state.cleanup_errors[name] = str(exc)
        owner.notify_all_state_changed()

paglets.patterns.notifications

NotificationMixin

Non-fatal user-info notifications for paglets.

Source code in src/paglets/patterns/notifications.py
class NotificationMixin:
    """Best-effort user-info notifications that never raise into the paglet."""

    def notify_user_info(
        self,
        severity: NotificationSeverity,
        title: str,
        message: str,
        *,
        job_id: str = "",
        timeout: float = 2.0,
        scope: ServiceScope = ServiceScope.MESH,
        metadata: dict[str, str] | None = None,
    ) -> bool:
        """Send one user-info notification through the ``USER_INFO`` contract.

        Returns ``True`` when the ``NOTIFY_USER`` call completed and ``False``
        when resolving the contract, building the request, or making the call
        raised any ``Exception``. ``timeout`` is clamped to be non-negative
        and ``metadata`` is copied before sending.
        """
        try:
            handle = self.require_contract(USER_INFO, operation=NOTIFY_USER, scope=scope)
            notice = UserInfoRequest(
                severity=severity.value,
                title=title,
                message=message,
                source_agent_id=getattr(self, "agent_id", ""),
                job_id=job_id,
                timestamp=time.time(),
                metadata=dict(metadata or {}),
            )
            handle.call(NOTIFY_USER, notice, timeout=max(0.0, float(timeout)))
        except Exception:
            # Deliberately broad: a failed notification must stay non-fatal.
            return False
        return True

paglets.patterns.file_mobility

FileMobilityMixin

Bases: NotificationMixin

Reusable helpers for paglets that move one registered file naturally.

Source code in src/paglets/patterns/file_mobility.py
class FileMobilityMixin(NotificationMixin):
    """Building blocks for paglets that carry one registered file between hosts.

    The host class supplies ``locked_state()``, ``context``, ``state``,
    ``register_file()``, ``file_path()`` and ``set_task_status()``.
    """

    registered_file_name: ClassVar[str] = "file"
    notification_title: ClassVar[str] = "File transfer"

    def require_transfer_target(self, request: FileTransferRequest) -> str:
        """Return ``request.target_host``.

        Raises:
            ValueError: if the target host is empty.
        """
        if request.target_host:
            return request.target_host
        raise ValueError("target_host is required")

    def stat_transfer_source(self, source_path: str | Path) -> FileTransferSource:
        """Describe the source file (path, name, size, timestamps).

        ``~`` is expanded first. ``created_at`` uses ``st_birthtime`` when the
        stat result provides it and ``st_ctime`` otherwise.

        Raises:
            ValueError: if the expanded path is not a regular file.
        """
        candidate = Path(source_path).expanduser()
        if not candidate.is_file():
            raise ValueError(f"source is not a file: {candidate}")
        info = candidate.stat()
        created = getattr(info, "st_birthtime", info.st_ctime)
        return FileTransferSource(
            path=candidate,
            file_name=candidate.name,
            size_bytes=int(info.st_size),
            created_at=float(created),
            modified_at=float(info.st_mtime),
        )

    def prepare_file_transfer(self, request: FileTransferRequest) -> FileTransferPlan:
        """Validate the request, stat the source, record it in state, and return the plan."""
        host = self.require_transfer_target(request)
        described = self.stat_transfer_source(request.source_path)
        planned_path = self.remember_transfer_request(request, described)
        return FileTransferPlan(
            request=request,
            source=described,
            destination_path=planned_path,
            target_host=host,
        )

    def remember_transfer_request(self, request: FileTransferRequest, source: FileTransferSource) -> str:
        """Record the request and source details in state; return the planned destination.

        A transfer id of the form ``file-transfer-<hex>`` is generated when
        the request does not carry one.
        """
        self.require_transfer_target(request)
        owner = cast(Any, self)
        assigned_id = request.transfer_id or f"file-transfer-{uuid.uuid4().hex}"
        planned_path = self.plan_destination_path(request.destination_path, source.file_name)
        with owner.locked_state() as state:
            # Transfer identity and options.
            state.transfer_id = assigned_id
            state.mode = request.mode
            state.dry_run = bool(request.dry_run)
            state.overwrite = bool(request.overwrite)
            # Where the file is coming from and where it is meant to go.
            state.source_host_name = owner.context.name
            state.source_host_url = owner.context.address
            state.destination_host_name = request.destination_label
            state.destination_host_url = request.target_host
            state.source_path = str(source.path.resolve(strict=False))
            # Keep both the requested destination and the planned final path.
            state.destination_path = request.destination_path
            state.final_path = planned_path
            # Source file metadata.
            state.file_name = source.file_name
            state.size_bytes = source.size_bytes
            state.source_created_at = source.created_at
            state.source_modified_at = source.modified_at
        return planned_path

    def remember_transfer_arrival(self, destination: Path) -> str:
        """Record the current host as destination and store the resolved final path."""
        owner = cast(Any, self)
        resolved = str(destination.resolve(strict=False))
        with owner.locked_state() as state:
            state.destination_host_name = owner.context.name
            state.destination_host_url = owner.context.address
            state.final_path = resolved
        return resolved

    def register_transfer_file(self, source: FileTransferSource, mode: FileTransferMode) -> None:
        """Register the source file under ``registered_file_name`` with the given mode."""
        owner = cast(Any, self)
        owner.register_file(source.path, name=self.registered_file_name, mode=mode.value)

    def register_planned_file(self, plan: FileTransferPlan) -> None:
        """Register the plan's source file using the mode from the plan's request."""
        self.register_transfer_file(plan.source, plan.request.mode)

    def mark_waiting_for_arrival(self) -> None:
        """Set the task status to ``WAITING_FOR_ARRIVAL`` with ``done=False``."""
        owner = cast(Any, self)
        owner.set_task_status(TaskStatus.WAITING_FOR_ARRIVAL, done=False)

    def build_transfer_result(self, *, destination_path: str, dry_run: bool) -> FileTransferResult:
        """Assemble a ``FileTransferResult`` from the recorded transfer state.

        ``destination_path`` and ``dry_run`` come from the caller; every other
        field is read from state under the lock.
        """
        with cast(Any, self).locked_state() as state:
            origin = FileTransferEndpoint(
                host_name=state.source_host_name,
                host_url=state.source_host_url,
                path=state.source_path,
                created_at=state.source_created_at,
                modified_at=state.source_modified_at,
            )
            target = FileTransferEndpoint(
                host_name=state.destination_host_name,
                host_url=state.destination_host_url,
                path=destination_path,
            )
            return FileTransferResult(
                transfer_id=state.transfer_id,
                mode=state.mode,
                dry_run=dry_run,
                source=origin,
                destination=target,
                file_name=state.file_name,
                size_bytes=state.size_bytes,
                size=format_bytes(state.size_bytes),
            )

    def plan_destination_path(self, destination: str, source_name: str) -> str:
        """Return the planned destination text; delegates to ``_planned_destination_text``."""
        return _planned_destination_text(destination, source_name)

    def resolve_destination_path(self, destination: str, source_name: str, *, check_existing_dir: bool = True) -> Path:
        """Return the concrete destination path; delegates to ``_destination_path``."""
        return _destination_path(destination, source_name, check_existing_dir=check_existing_dir)

    def atomic_copy_file(self, source: Path, destination: Path, *, overwrite: bool) -> Path:
        """Copy ``source`` to ``destination`` through a sibling temporary file.

        The data is copied to a hidden ``.part`` file next to the destination
        and then moved into place with ``os.replace``. The temporary file is
        removed if it is still present afterwards.

        Raises:
            FileExistsError: if the destination exists and ``overwrite`` is false.
        """
        already_there = destination.exists()
        if already_there and not overwrite:
            raise FileExistsError(f"destination already exists: {destination}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        staging = destination.with_name(f".{destination.name}.{uuid.uuid4().hex}.part")
        try:
            shutil.copy2(source, staging)
            os.replace(staging, destination)
        finally:
            # After a successful replace the staging file is already gone.
            try:
                staging.unlink()
            except FileNotFoundError:
                pass
        return destination.resolve(strict=False)

    def save_registered_file_to_destination(self, *, destination: str, source_name: str, overwrite: bool) -> Path:
        """Copy the registered file from its scratch path to the resolved destination."""
        scratch = cast(Any, self).file_path(self.registered_file_name)
        target = self.resolve_destination_path(destination, source_name, check_existing_dir=True)
        return self.atomic_copy_file(scratch, target, overwrite=overwrite)

    def current_transfer_arrival(self) -> FileTransferArrival | None:
        """Describe the pending arrival, or return ``None`` if none is expected.

        An arrival is only reported while the task status is
        ``WAITING_FOR_ARRIVAL``.
        """
        owner = cast(Any, self)
        with owner.locked_state() as state:
            if state.status is not TaskStatus.WAITING_FOR_ARRIVAL:
                return None
            recorded_name = state.file_name
            recorded_destination = state.destination_path
            may_overwrite = bool(state.overwrite)
            recorded_mode = state.mode
        # The scratch path is looked up after the lock is released.
        return FileTransferArrival(
            scratch_path=owner.file_path(self.registered_file_name),
            destination_path=recorded_destination,
            source_name=recorded_name,
            overwrite=may_overwrite,
            mode=recorded_mode,
        )

    def save_arrived_file(self, arrival: FileTransferArrival) -> Path:
        """Copy the arrived scratch file to the destination named by ``arrival``."""
        target = self.resolve_destination_path(arrival.destination_path, arrival.source_name)
        return self.atomic_copy_file(arrival.scratch_path, target, overwrite=arrival.overwrite)

    def notify_transfer_info(self, title: str, message: str) -> bool:
        """Send an INFO notification titled ``"<notification_title> <title>"``.

        The current ``state.transfer_id`` is passed as the job id.
        """
        level = NotificationSeverity.INFO
        heading = f"{self.notification_title} {title}"
        return self.notify_user_info(level, heading, message, job_id=cast(Any, self).state.transfer_id)

    def notify_transfer_error(self, title: str, message: str) -> bool:
        """Send an ERROR notification titled ``"<notification_title> <title>"``.

        The current ``state.transfer_id`` is passed as the job id.
        """
        level = NotificationSeverity.ERROR
        heading = f"{self.notification_title} {title}"
        return self.notify_user_info(level, heading, message, job_id=cast(Any, self).state.transfer_id)

SingleFileTransferPaglet

Bases: FileMobilityMixin, TaskPaglet[FileTransferRequest, FileTransferResult, FileTransferState]

Convenience task paglet that moves one registered file to a destination host.

Source code in src/paglets/patterns/file_mobility.py
class SingleFileTransferPaglet(
    FileMobilityMixin,
    TaskPaglet[FileTransferRequest, FileTransferResult, FileTransferState],
):
    """Task paglet that carries one registered file to a destination host.

    ``run_task`` plans the transfer and dispatches the paglet; ``on_arrival``
    saves the file and completes the task.
    """

    Request = FileTransferRequest
    Result = FileTransferResult
    State = FileTransferState

    def run_task(self, request: FileTransferRequest) -> FileTransferResult | PagletProxy | None:
        """Plan the transfer, then either finish a dry run or dispatch.

        A dry run completes the task immediately and returns ``None``.
        Otherwise the file is registered, the task is marked as waiting for
        arrival, and the result of ``dispatch`` is returned.
        """
        plan = self.prepare_file_transfer(request)
        source = plan.source
        target_label = request.destination_label or request.target_host
        target_location = f"{target_label}:{plan.destination_path}"
        self._notify_info(
            "found file",
            f"Found {source.path} ({format_bytes(source.size_bytes)}); destination {target_location}",
        )

        if request.dry_run:
            preview = self.build_transfer_result(destination_path=plan.destination_path, dry_run=True)
            self.complete_task(preview)
            self._notify_info("dry run", f"Would {request.mode.value} {source.path} to {target_location}")
            return None

        self.register_planned_file(plan)
        self.mark_waiting_for_arrival()
        self._notify_info(
            "dispatching",
            f"Dispatching to {target_label} with registered file {source.file_name}",
        )
        return self.dispatch(plan.target_host)

    def on_arrival(self, event) -> None:
        """Save the carried file at the destination and complete the task.

        Does nothing unless an arrival is currently expected. A failure while
        saving marks the task failed, sends an error notification, and
        re-raises the exception.
        """
        _ = event
        arrival = self.current_transfer_arrival()
        if arrival is None:
            return
        try:
            saved_to = self.save_arrived_file(arrival)
        except Exception as exc:
            self.fail_task(exc)
            self._notify_error("failed", str(exc))
            raise
        recorded_path = self.remember_transfer_arrival(saved_to)
        outcome = self.build_transfer_result(destination_path=recorded_path, dry_run=False)
        self.complete_task(outcome)
        summary = (
            f"Saved {format_bytes(self.state.size_bytes)} to "
            f"{self.context.name}:{self.state.final_path} ({arrival.mode.value})"
        )
        self._notify_info("saved file", summary)

    def _notify_info(self, title: str, message: str) -> bool:
        """Forward to :meth:`notify_transfer_info`."""
        return self.notify_transfer_info(title, message)

    def _notify_error(self, title: str, message: str) -> bool:
        """Forward to :meth:`notify_transfer_error`."""
        return self.notify_transfer_error(title, message)
  • Implementing Paglets introduces the pattern selection guide and the lower-level raw message API.
  • File Grabber shows a file transfer paglet that keeps the workflow visible while reusing FileMobilityMixin.
  • Performance Benchmark and Mesh Search use typed operations together with the fan-out and drain helpers, while keeping the benchmark- and search-specific work in the examples themselves.