Implementing Paglets
This guide focuses on the code you write when creating paglets.
Define State Explicitly
A paglet moves with one dataclass state object. Keep durable workflow state in that dataclass. Treat ordinary instance attributes as transient runtime data.
```python
from dataclasses import dataclass, field

from paglets.core.agent import Paglet, PagletState


@dataclass
class TravellerState(PagletState):
    visits: list[str] = field(default_factory=list)
    home_url: str = ""


class Traveller(Paglet[TravellerState]):
    State = TravellerState
```
State values should be ordinary dataclass-serializable Python values: nested
dataclasses, primitives, collections, enums, paths, bytes, and bytearray.
Movement keeps binary values native in the streamed pickle envelope. JSON state
inspection projects binary values as tagged base64 objects for raw HTTP
clients; HostClient restores those tagged values automatically.
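A tagged base64 projection for JSON might look like the minimal sketch below, which uses only the standard library. The tag key "__bytes__" and the helpers to_json_safe/from_json_safe are hypothetical; the real paglets tag format and HostClient code may differ.

```python
import base64
import json
from typing import Any

TAG = "__bytes__"  # hypothetical tag key, not the paglets wire format


def to_json_safe(value: Any) -> Any:
    """Recursively replace bytes/bytearray with tagged base64 objects."""
    if isinstance(value, (bytes, bytearray)):
        return {TAG: base64.b64encode(bytes(value)).decode("ascii")}
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    return value


def from_json_safe(value: Any) -> Any:
    """Recursively turn tagged base64 objects back into bytes."""
    if isinstance(value, dict):
        if set(value) == {TAG}:
            return base64.b64decode(value[TAG])
        return {key: from_json_safe(item) for key, item in value.items()}
    if isinstance(value, list):
        return [from_json_safe(item) for item in value]
    return value


state = {"home_url": "http://alpha", "blob": b"\x00\xffpaglet"}
wire = json.dumps(to_json_safe(state))  # what a raw HTTP client would see
restored = from_json_safe(json.loads(wire))
```

In this simplified version, bytearray values come back as bytes and tuples come back as lists; a real client would need type information to restore them exactly.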
Active paglets run in child Python processes started with the spawn method.
Both the paglet class and the state class must be importable by module path, for
example myapp.agents:Traveller. Classes defined in __main__, a REPL, stdin,
or a throwaway script cannot be started as paglets because the child process
cannot re-import them.
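The importability rule follows from how a spawned child finds code: it only has a string such as "module:Class" and must import it afresh. The sketch below resolves such strings with importlib; resolve_class_path and class_path are hypothetical helper names for illustration, not the paglets loader.

```python
import importlib
import json


def resolve_class_path(path: str) -> type:
    """Resolve a 'package.module:QualifiedName' string to a class object."""
    module_name, _, qualname = path.partition(":")
    if not module_name or not qualname:
        raise ValueError(f"expected 'module:Class', got {path!r}")
    if module_name == "__main__":
        # A spawned child has its own __main__, so this name is not portable.
        raise ValueError(f"{path!r} lives in __main__ and cannot be re-imported")
    obj = importlib.import_module(module_name)
    for part in qualname.split("."):  # supports nested classes
        obj = getattr(obj, part)
    return obj


def class_path(cls: type) -> str:
    """Build the 'module:QualifiedName' string for a class."""
    return f"{cls.__module__}:{cls.__qualname__}"


decoder_path = class_path(json.JSONDecoder)
decoder_cls = resolve_class_path(decoder_path)
try:
    resolve_class_path("__main__:Traveller")
except ValueError as exc:
    main_error = str(exc)
```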
Process isolation means a crash or sys.exit() in one paglet does not directly
kill the host or other paglets, and multiple worker paglets can use multiple CPU
cores. It also means paglet startup is heavier than constructing an in-memory
object, so prefer coarser batches for very small units of work.
Implement Lifecycle Hooks
Override only the lifecycle hooks you need:
```python
class Traveller(Paglet[TravellerState]):
    State = TravellerState

    def on_creation(self, event):
        self.state.visits.append(f"created:{event.host_name}")

    def on_dispatching(self, event):
        self.state.visits.append(f"leaving:{event.source_host_name}")

    def on_arrival(self, event):
        self.state.visits.append(f"arrived:{event.host_name}")

    def on_clone(self, event):
        self.state.visits.append(f"clone:{event.host_name}")

    def run(self):
        self.state.visits.append(f"run:{self.context.name}")
```
run() is invoked after creation, arrival, clone arrival, and activation. Do not
expect call stacks, threads, sockets, or open files to move with the paglet.
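The split between durable state and transient attributes can be modelled with plain pickle. In this minimal sketch, ToyTraveller is a hypothetical stand-in (not the paglets Paglet class) that serializes only its dataclass state, so the lock and cache are rebuilt empty on "arrival", just as threads, sockets, and open files would be.

```python
import pickle
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TravellerState:
    # Durable workflow state: the only part that crosses hosts.
    visits: list[str] = field(default_factory=list)


class ToyTraveller:
    """Hypothetical stand-in for a paglet; not the paglets Paglet class."""

    def __init__(self, state: Optional[TravellerState] = None) -> None:
        self.state = state if state is not None else TravellerState()
        # Transient runtime data: rebuilt on every host, never serialized.
        self.lock = threading.Lock()
        self.cache: dict[str, str] = {}

    def pack(self) -> bytes:
        # Only the dataclass state is pickled; the lock and cache stay behind.
        return pickle.dumps(self.state)

    @classmethod
    def unpack(cls, payload: bytes) -> "ToyTraveller":
        return cls(pickle.loads(payload))


traveller = ToyTraveller()
traveller.state.visits.append("created:alpha")
traveller.cache["scratch"] = "not durable"
arrived = ToyTraveller.unpack(traveller.pack())
```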
Move Files Naturally
Files do not move just because a Python file handle exists. Register files that belong to the paglet instance:
```python
# Inside a paglet method; result_bytes is produced by your own workflow.
path = self.work_dir() / "result.db"
path.write_bytes(result_bytes)
self.register_file(path, name="result.db", mode="move")
```
Registered files are copied to the target host before on_arrival() or
on_clone() runs. On the target, use the logical name to find the scratch copy:
mode="copy" leaves the source in place. mode="move" deletes the source only
after a successful dispatch. Clone always copies and never deletes the source.
Target copies live under the arriving paglet's scratch/work directory and are
removed when that paglet is disposed.
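The copy/move/clone rules can be summarized in a small local model. This is a sketch under stated assumptions: transfer_registered_file is a hypothetical helper that copies within one filesystem, and it treats a failed dispatch as discarding the target copy. It does not show how paglets actually ships files between hosts.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional

MODES = {"copy", "move", "clone"}


def transfer_registered_file(source: Path, target_dir: Path, name: str,
                             mode: str, dispatch_ok: bool = True) -> Optional[Path]:
    """Hypothetical model of the copy/move/clone rules for one registered file."""
    if mode not in MODES:
        raise ValueError(f"unknown mode {mode!r}")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / name  # found on the target by its logical name
    shutil.copy2(source, target)  # always copy first, never move in place
    if not dispatch_ok:
        target.unlink()  # failed dispatch: drop the copy, keep the source
        return None
    if mode == "move":
        source.unlink()  # only "move" deletes the source, and only after success
    return target


# Exercise each mode in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    outcome = {}
    for mode, ok in (("copy", True), ("move", True), ("clone", True), ("move", False)):
        src = root / f"{mode}-{ok}.db"
        src.write_bytes(b"payload")
        copied = transfer_registered_file(src, root / "target", f"{mode}-{ok}-result.db", mode, ok)
        outcome[(mode, ok)] = (src.exists(), copied is not None and copied.read_bytes() == b"payload")
```

Each outcome records whether the source still exists and whether the target copy arrived intact.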
Choose The Right Pattern
For simple request/result paglets, prefer the typed task pattern over writing a
custom message protocol. TaskPaglet owns the low-level handle_message()
routing for start, status, and wait, while TaskClient gives callers a
small typed proxy wrapper:
```python
from dataclasses import dataclass

from paglets.patterns.tasks import TaskPaglet, TaskState


@dataclass(frozen=True, slots=True)
class AddRequest:
    value: int


@dataclass(frozen=True, slots=True)
class AddResult:
    value: int


@dataclass
class AddState(TaskState):
    pass


class AddPaglet(TaskPaglet[AddRequest, AddResult, AddState]):
    State = AddState
    Request = AddRequest
    Result = AddResult

    def run_task(self, request: AddRequest) -> AddResult:
        return AddResult(value=request.value + 1)
```
Callers do not need raw Message("start") calls:
```python
from paglets.patterns.tasks import TaskClient

# proxy is a proxy to a running AddPaglet instance.
task = TaskClient.for_paglet(proxy, AddPaglet)
summary = task.start_and_wait(AddRequest(value=41))
```
For file-moving paglets, choose the smallest helper layer that fits:
- TaskPaglet for typed request/result workflows with no raw message protocol.
- OperationPaglet and OperationClient for paglets with several named typed operations, such as start, drain, summary, and cleanup.
- MeshFanoutMixin for parent paglets that clone children across visible mesh hosts and collect child replies.
- CursorDrainMixin for streaming event drains where clients poll by cursor.
- FileMobilityMixin for readable custom workflows that still share source stat, destination planning, registered-file, result, notification, and atomic write helpers.
- SingleFileTransferPaglet for the fully standard "stat a file, dispatch with it, and place it on arrival" workflow.
The pattern helpers remove repeated protocol and bookkeeping code, but the
domain action should still be visible in the example paglet. For example,
paglets-file-grabber still shows "prepare file, dry-run or register,
dispatch, save arrived file"; the helper only names the reusable steps.
Handle Messages
Paglets can still talk directly through messages delivered to handle_message.
Use this lower-level API when you need a custom protocol, mailbox priority, or
advanced fan-in behavior that does not fit TaskPaglet.
```python
from paglets.core.messages import Message


def handle_message(self, message: Message):
    if message.kind == "status":
        return {"host": self.context.name, "visits": list(self.state.visits)}
    return self.not_handled()
```
Callers use a proxy:
Supported communication patterns:
- synchronous replies with send(Message(...));
- fire-and-forget delivery with send_oneway(Message(...));
- future-style replies with send_future(Message(...));
- local broadcast through context.multicast.
bytes and bytearray in message args, message arg, and typed service
dataclasses are converted to tagged JSON automatically and restored on the
receiver. Keep ordinary messages small; large files should use application
storage, natural file mobility, or low-level artifact transport.
Normal messages are delivered through the target paglet's mailbox. The mailbox
selects higher-priority queued messages before lower-priority queued messages
and keeps FIFO order within one priority. Use UNQUEUED_PRIORITY only for the
explicit immediate bypass.
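The selection rule described above (higher priority first, FIFO within one priority) is the classic heapq pattern with an arrival counter as tie-breaker. This sketch assumes that a larger number means higher priority; PriorityMailbox is a hypothetical name, and the UNQUEUED_PRIORITY bypass is not modelled.

```python
import heapq
import itertools
from typing import Any


class PriorityMailbox:
    """Highest priority first; FIFO among messages with equal priority."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, Any]] = []
        self._arrivals = itertools.count()  # arrival order breaks ties

    def put(self, message: Any, priority: int = 0) -> None:
        # heapq is a min-heap, so negate priority to pop the largest first.
        # The unique arrival number means messages themselves are never compared.
        heapq.heappush(self._heap, (-priority, next(self._arrivals), message))

    def get(self) -> Any:
        _, _, message = heapq.heappop(self._heap)
        return message

    def __len__(self) -> int:
        return len(self._heap)


mailbox = PriorityMailbox()
mailbox.put("status-1", priority=0)
mailbox.put("urgent", priority=10)
mailbox.put("status-2", priority=0)
mailbox.put("also-urgent", priority=10)
drained = [mailbox.get() for _ in range(len(mailbox))]
```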
Paglets can coordinate mailbox handlers:
Use wait_message() when you need a low-level mailbox notification. When one
handler or background thread is waiting for another handler to change paglet
state, prefer the predicate-based state wait:
```python
# Inside a message handler or background thread.
if self.wait_state(lambda state: not state.pending, timeout=5.0):
    with self.locked_state() as state:
        return state.result
```
After mutating state that may satisfy a waiter, wake it explicitly:
```python
with self.locked_state() as state:
    state.result = reply
    state.pending = False
self.notify_all_state_changed()
```
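The predicate wait maps naturally onto threading.Condition.wait_for from the standard library. The sketch below is a hypothetical StateBox that mirrors the method names used above; it illustrates the technique and is not the paglets implementation.

```python
import threading
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, Iterator, Optional


@dataclass
class PendingState:
    pending: bool = True
    result: Optional[str] = None


class StateBox:
    """Hypothetical wait_state/notify_all_state_changed built on one Condition."""

    def __init__(self, state: PendingState) -> None:
        self.state = state
        self._cond = threading.Condition(threading.RLock())

    @contextmanager
    def locked_state(self) -> Iterator[PendingState]:
        with self._cond:  # holds the same lock the waiters use
            yield self.state

    def wait_state(self, predicate: Callable[[PendingState], bool],
                   timeout: Optional[float] = None) -> bool:
        with self._cond:
            # Re-checks the predicate on every wake-up; False means timeout.
            return bool(self._cond.wait_for(lambda: predicate(self.state), timeout=timeout))

    def notify_all_state_changed(self) -> None:
        with self._cond:
            self._cond.notify_all()


box = StateBox(PendingState())


def deliver_reply() -> None:
    with box.locked_state() as state:
        state.result = "reply"
        state.pending = False
    box.notify_all_state_changed()


threading.Timer(0.05, deliver_reply).start()
satisfied = box.wait_state(lambda state: not state.pending, timeout=5.0)
with box.locked_state() as state:
    final = state.result
```

Because wait_for checks the predicate before sleeping, the waiter also returns immediately if the reply arrived before it started waiting.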
Inactive paglets can still receive messages. By default, the host activates the
paglet and delivers the message. Use no_delay=True when the caller wants a
fast failure instead of activation or queueing:
Protect Shared State
A paglet child handles one lifecycle or message command at a time. run() may
also start background threads. When two code paths read or write the dataclass
state, protect that short critical section with the paglet lock:
Use locked() for transient instance attributes that must be updated together
with other agent-local data:
For small helper methods, @state_locked keeps the handler readable:
```python
from paglets.core.agent import state_locked


@state_locked
def remember_result(self, result):
    self.state.results.append(result)
```
Keep locks around short state reads and writes only. Do not hold them while waiting for another message, sleeping, calling a remote proxy, doing disk I/O, or running a long computation.
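A decorator like @state_locked is usually a thin wrapper that holds a lock for the duration of one short method. This sketch uses a hypothetical state_locked_sketch decorator and a hypothetical _lock attribute; the real paglets decorator may use a different lock.

```python
import functools
import threading
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def state_locked_sketch(method: F) -> F:
    """Hypothetical decorator: run the method while holding self._lock."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return wrapper  # type: ignore[return-value]


class Collector:
    def __init__(self) -> None:
        self._lock = threading.RLock()  # reentrant, so locked helpers can call each other
        self.results: list[int] = []

    @state_locked_sketch
    def remember_result(self, result: int) -> None:
        # Short critical section: append only, no I/O, sleeping, or waiting.
        self.results.append(result)


collector = Collector()
threads = [
    threading.Thread(target=lambda: [collector.remember_result(i) for i in range(1000)])
    for _ in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```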
Paglet.MAILBOX_WORKERS is ignored by the process runtime. Queued
handle_message calls are actor-style serial inside one paglet process.
Parallel CPU work should be split into multiple paglet instances, not multiple
message workers in the same instance.
Do not make a parent message handler block while waiting for child paglets to send result messages back to that same parent. The parent cannot process those messages until the current handler returns. For custom protocols, use this pattern instead:
- A start or collect message stores request state, creates/clones workers, and returns quickly.
- Workers run in their own paglet processes and send child_result messages.
- The parent records results, calls notify_all_state_changed(), and exposes a typed status/wait path or another explicit polling message.
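The pattern above can be simulated in one process with a serial mailbox. In this sketch, ToyCoordinator is hypothetical and a deque of jobs stands in for creating or cloning worker paglets; it shows that the coordinator only sees child_result messages between handlers, which is why start must return quickly.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CoordinatorState:
    expected: int = 0
    results: dict[int, int] = field(default_factory=dict)


class ToyCoordinator:
    """Hypothetical coordinator that handles one message at a time, actor-style."""

    def __init__(self, worker_jobs: deque) -> None:
        self.state = CoordinatorState()
        self.worker_jobs = worker_jobs  # stands in for creating/cloning workers

    def handle_message(self, kind: str, arg: Any = None) -> Any:
        if kind == "start":
            # Record the request, launch workers, and return without waiting.
            self.state.expected = len(arg)
            for index, value in enumerate(arg):
                self.worker_jobs.append((index, value))
            return {"accepted": len(arg)}
        if kind == "child_result":
            index, value = arg
            self.state.results[index] = value
            return None
        if kind == "status":
            done = len(self.state.results)
            return {"done": done, "expected": self.state.expected,
                    "complete": done == self.state.expected}
        raise ValueError(f"unhandled message kind {kind!r}")


mailbox: deque = deque([("start", [3, 4, 5])])
worker_jobs: deque = deque()
coordinator = ToyCoordinator(worker_jobs)
replies = []
while mailbox or worker_jobs:
    if mailbox:
        kind, arg = mailbox.popleft()
        replies.append(coordinator.handle_message(kind, arg))
    else:
        # A "worker" squares its input and reports back by message.
        index, value = worker_jobs.popleft()
        mailbox.append(("child_result", (index, value * value)))
status = coordinator.handle_message("status")
```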
Move Between Hosts
Use dispatch when the current paglet should move away:
Use clone when the current paglet should keep running and send a copy:
When hosts are in the mesh, prefer name-based helpers:
Use a TransferTicket for preflight checks, retries, or inactive arrival:
```python
from paglets.core.runtime_values import ArrivalMode
from paglets.remote.transfer import TransferTicket

self.dispatch(
    TransferTicket(
        "beta",
        required_capabilities=("agents:create",),
        expected_code_version=self.context.host.mesh.code_version,
        arrival_mode=ArrivalMode.INACTIVE,
    )
)
```
Discover Hosts
Paglets can inspect the local host's mesh registry:
The registry is version-gated. Hosts with different code versions are ignored by the mesh so paglets do not move into a host that probably cannot import the same classes.
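Version gating amounts to filtering peer records by code version before anything else looks at them. The PeerRecord type and visible_peers function below are hypothetical, and exact string equality is an assumption about how versions are compared.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PeerRecord:
    name: str
    url: str
    code_version: str


def visible_peers(local_version: str, records: list[PeerRecord]) -> list[PeerRecord]:
    """Keep only peers whose code version matches the local host's version."""
    return [record for record in records if record.code_version == local_version]


registry = [
    PeerRecord("alpha", "http://127.0.0.1:8765", "dev"),
    PeerRecord("beta", "http://127.0.0.1:8766", "dev"),
    PeerRecord("gamma", "http://127.0.0.1:8767", "v2"),  # ignored: different code
]
names = [peer.name for peer in visible_peers("dev", registry)]
```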
Deactivate And Activate
Use deactivate when a paglet should stop its active child process but keep
durable state:
A paglet can deactivate itself and choose its own inactive policy:
```python
import time

from paglets.core.messages import Message
from paglets.persistence.persistency import DeactivationPolicy


def handle_message(self, message: Message):
    if message.kind == "pause":
        # Deactivate now and ask to be reactivated one hour from now.
        return self.deactivate(
            policy=DeactivationPolicy(activate_at=time.time() + 3600)
        ).to_wire()
    return self.not_handled()
```
Override deactivation_policy when the paglet should decide how external
deactivation requests behave:
```python
def deactivation_policy(self, request):
    return DeactivationPolicy(
        activate_on_message=False,
        queue_messages_when_inactive=True,
        activate_on_startup=request.reason == "shutdown",
    )
```
If activate_on_message is false and queueing is enabled, normal messages are
stored with the inactive record and delivered after activation. A no_delay
message fails immediately instead.
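The delivery rules for an inactive paglet fit in one decision function. InactivePolicy below is a hypothetical mirror of two DeactivationPolicy fields, and the final branch (neither activation nor queueing allowed, so the message fails) is an assumption this guide does not spell out.

```python
from dataclasses import dataclass
from enum import Enum


class Delivery(Enum):
    ACTIVATE_AND_DELIVER = "activate_and_deliver"
    QUEUE_UNTIL_ACTIVE = "queue_until_active"
    FAIL = "fail"


@dataclass(frozen=True)
class InactivePolicy:
    """Hypothetical mirror of two DeactivationPolicy fields."""
    activate_on_message: bool = True
    queue_messages_when_inactive: bool = False


def route_inactive_message(policy: InactivePolicy, no_delay: bool) -> Delivery:
    if no_delay:
        # The caller wants a fast failure instead of activation or queueing.
        return Delivery.FAIL
    if policy.activate_on_message:
        return Delivery.ACTIVATE_AND_DELIVER  # the default behaviour
    if policy.queue_messages_when_inactive:
        return Delivery.QUEUE_UNTIL_ACTIVE  # stored with the inactive record
    return Delivery.FAIL  # assumption: nothing else can accept the message


queueing = InactivePolicy(activate_on_message=False, queue_messages_when_inactive=True)
```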
Talk To Resident Services
Prefer typed service contracts for resident services. The built-in
server-info service is a ready-made example: each host declares it from launch
config, callers can discover the SERVER_INFO contract immediately, and the
provider agent starts lazily on first use:
```python
from paglets.core.runtime_values import ServiceScope
from paglets.system.server_info import GET_DISK, SERVER_INFO, DiskRequest

service = self.require_contract(SERVER_INFO, operation=GET_DISK, scope=ServiceScope.MESH)
reply = service.call(GET_DISK, DiskRequest(paths=["/"], all_volumes=False))
```
For custom services, put the ServiceContract, ServiceOperation, and payload
dataclasses in an importable module shared by provider and caller. The provider
uses advertise_contract, routes with contract.route(...), and the caller
uses require_contract or lookup_contract.
Managed resident services are declared in launch config:
```toml
[[resident_services]]
class = "myapp.services.ticket_agent:TicketServiceAgent"
agent_id = "service.ticket"
lifecycle = "lazy"
scope = "mesh"
idle_timeout = 30.0
```
Use lifecycle = "lazy" when the service only needs to run while requests are
active. Use lifecycle = "eager" for continuous monitors or services that must
keep live local resources open. Lazy services deactivate after their idle
timeout, but their service record stays discoverable and a later call activates
them again.
TOML and JSON store these closed values as strings because those formats do not
have enums. Python code uses enum values such as ServiceScope.MESH;
configuration loading converts strings like scope = "mesh" at the boundary.
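Converting closed strings to enums once, at load time, means invalid values fail early and the rest of the code only sees enum members. The sketch uses the JSON form of the TOML table above (Python's tomllib would parse the TOML into the same dicts). Scope and Lifecycle are hypothetical mirrors; the real ServiceScope member values may differ.

```python
import json
from enum import Enum


class Scope(Enum):
    # Hypothetical mirror of ServiceScope; real member values may differ.
    LOCAL = "local"
    MESH = "mesh"


class Lifecycle(Enum):
    LAZY = "lazy"
    EAGER = "eager"


RAW = """{"resident_services": [{
  "class": "myapp.services.ticket_agent:TicketServiceAgent",
  "agent_id": "service.ticket", "lifecycle": "lazy",
  "scope": "mesh", "idle_timeout": 30.0}]}"""


def load_services(text: str) -> list[dict]:
    """Parse config text and convert closed string values to enums at the boundary."""
    services = json.loads(text)["resident_services"]
    for service in services:
        service["scope"] = Scope(service["scope"])  # "mesh" -> Scope.MESH
        service["lifecycle"] = Lifecycle(service["lifecycle"])  # unknown strings raise ValueError
    return services


services = load_services(RAW)
```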
Use a lease when several calls should keep a lazy provider active:
```python
with self.lease_contract(SERVER_INFO, operation=GET_DISK, scope=ServiceScope.MESH) as service:
    first = service.call(GET_DISK, DiskRequest(paths=["/"]))
    second = service.call(GET_DISK, DiskRequest(paths=["/data"]))
```
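The lazy lifecycle and leases interact in a simple way: the idle timer only runs while no lease is held. LazyServiceModel below is a hypothetical model with an injectable clock so the timing is deterministic; it is not how paglets schedules deactivation.

```python
from typing import Callable


class LazyServiceModel:
    """Hypothetical model: leases keep the provider active; idle time deactivates it."""

    def __init__(self, idle_timeout: float, clock: Callable[[], float]) -> None:
        self.idle_timeout = idle_timeout
        self.clock = clock
        self.active = False
        self.leases = 0
        self.last_used = clock()

    def call(self) -> None:
        self.active = True  # a call re-activates an inactive provider
        self.last_used = self.clock()

    def acquire_lease(self) -> None:
        self.call()
        self.leases += 1

    def release_lease(self) -> None:
        self.leases -= 1
        self.last_used = self.clock()  # the idle timer restarts on release

    def tick(self) -> None:
        """Deactivate after idle_timeout seconds with no calls and no leases."""
        idle_for = self.clock() - self.last_used
        if self.active and self.leases == 0 and idle_for >= self.idle_timeout:
            self.active = False


now = [0.0]
service = LazyServiceModel(idle_timeout=30.0, clock=lambda: now[0])
service.acquire_lease()
now[0] = 45.0
service.tick()
held_active = service.active  # a lease is held, so no deactivation
service.release_lease()
now[0] = 60.0
service.tick()
after_short_idle = service.active  # idle for 15 s only
now[0] = 80.0
service.tick()
after_timeout = service.active  # idle for 35 s, past the 30 s timeout
```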
The lower-level string API remains available when a fully typed contract is not
needed. lookup_service returns a serializable PagletProxyRef, which can be
stored in dataclass state or resolved to a proxy:
```python
service_ref = self.lookup_service("flight-ticket", capability="quote", scope=ServiceScope.MESH)
if service_ref is not None:
    reply = service_ref.resolve(self.context).send(Message("quote", {"from": "FRA", "to": "SFO"}))
```
Observe Context Events
Hosts keep a bounded in-memory context event log and deliver events to listeners. Events cover create, arrival, dispatch, clone, retract, deactivate, activate, dispose, message delivery/failure, service changes, and transfer failures.
```python
host.add_listener(lambda event: print(event.event_id, event.kind))
events = host.list_events(since=0, limit=100)
```
The HTTP API exposes the same log at GET /events?since=<id>&limit=<n>.
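A bounded log with cursor-style reads can be built from collections.deque(maxlen=...). EventLog and ContextEvent below are hypothetical, and treating since as exclusive (return ids strictly greater than since) is an assumption; with that rule a poller simply passes back the last id it saw.

```python
import itertools
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ContextEvent:
    event_id: int
    kind: str


class EventLog:
    """Bounded in-memory log with listeners and since/limit queries."""

    def __init__(self, capacity: int = 1000) -> None:
        self._events: deque = deque(maxlen=capacity)  # oldest events drop first
        self._ids = itertools.count(1)
        self._listeners: list[Callable[[ContextEvent], None]] = []

    def add_listener(self, listener: Callable[[ContextEvent], None]) -> None:
        self._listeners.append(listener)

    def emit(self, kind: str) -> ContextEvent:
        event = ContextEvent(next(self._ids), kind)
        self._events.append(event)
        for listener in self._listeners:
            listener(event)
        return event

    def list_events(self, since: int = 0, limit: int = 100) -> list[ContextEvent]:
        """Return up to `limit` retained events with event_id greater than `since`."""
        newer = (event for event in self._events if event.event_id > since)
        return list(itertools.islice(newer, limit))


log = EventLog(capacity=3)
seen: list[str] = []
log.add_listener(lambda event: seen.append(event.kind))
for kind in ("create", "arrival", "dispatch", "dispose"):
    log.emit(kind)
page = log.list_events(since=0, limit=100)
later = log.list_events(since=3, limit=100)
```

Listeners see every event, while list_events only returns what the bounded buffer still holds.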
Clean Up Runtime Resources
Only dataclass state moves or persists. Register transient resources that need cleanup before dispatch, deactivate, retract, or dispose:
```python
self.resources.track_closeable("socket", sock)
self.resources.register("temp-file", lambda: path.unlink(), suppress=True)
```
Cleanup failures cancel the lifecycle operation unless that resource was
registered with suppress=True.
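The suppress rule can be modelled with a small registry. ResourceRegistry and CleanupCancelled are hypothetical names, and running cleanups newest-first is an assumption borrowed from contextlib.ExitStack, not documented paglets behaviour.

```python
import io
from typing import Any, Callable


class CleanupCancelled(RuntimeError):
    """Raised when a non-suppressed cleanup fails and the operation must stop."""


class ResourceRegistry:
    """Hypothetical sketch of register/track_closeable cleanup semantics."""

    def __init__(self) -> None:
        self._entries: list[tuple[str, Callable[[], None], bool]] = []

    def register(self, name: str, cleanup: Callable[[], None], suppress: bool = False) -> None:
        self._entries.append((name, cleanup, suppress))

    def track_closeable(self, name: str, resource: Any, suppress: bool = False) -> None:
        self.register(name, resource.close, suppress)

    def cleanup_all(self) -> list[str]:
        """Run cleanups newest-first; return the names whose failures were suppressed."""
        suppressed = []
        while self._entries:
            name, cleanup, suppress = self._entries.pop()
            try:
                cleanup()
            except Exception as exc:
                if not suppress:
                    raise CleanupCancelled(f"cleanup of {name!r} failed") from exc
                suppressed.append(name)
        return suppressed


def remove_missing_temp_file() -> None:
    raise FileNotFoundError("temp file already gone")


buffer = io.StringIO()
registry = ResourceRegistry()
registry.track_closeable("buffer", buffer)
registry.register("temp-file", remove_missing_temp_file, suppress=True)
suppressed = registry.cleanup_all()

strict = ResourceRegistry()
strict.register("socket", remove_missing_temp_file)  # not suppressed
try:
    strict.cleanup_all()
    cancelled = False
except CleanupCancelled:
    cancelled = True  # the lifecycle operation would be cancelled here
```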
Use Managed Storage
Use work_dir() for per-instance scratch files that should not survive final
departure or host restart:
The host clears all work directories on startup. It also clears an instance's work directory on dispatch, retract, or dispose. Deactivation keeps the work directory while the same host runtime remains up, but a restart clears it.
Use persistent_storage() for small class-level state that should survive host
restart:
```python
store = self.persistent_storage()
store.write_text("checkpoint.txt", "ok")
data = store.read_bytes("checkpoint.txt")
```
Persistent storage is rooted under the host persistence directory, shared by
paglet class, and quota-accounted by the API. The default quota is 10 MB per
class and can be changed with paglets-host --persistent-storage-quota 20M or
the Host(..., persistent_storage_quota_bytes=...) constructor argument.
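Quota accounting means checking the total size before each write, counting an overwritten file's old size as freed. QuotaStore and QuotaExceeded below are hypothetical and use a tiny 16-byte quota so the limit is easy to hit; the real store's accounting and error types may differ.

```python
import tempfile
from pathlib import Path


class QuotaExceeded(OSError):
    pass


class QuotaStore:
    """Hypothetical per-class store that checks a byte quota before every write."""

    def __init__(self, root: Path, quota_bytes: int) -> None:
        self.root = root
        self.quota_bytes = quota_bytes
        root.mkdir(parents=True, exist_ok=True)

    def used_bytes(self) -> int:
        return sum(path.stat().st_size for path in self.root.rglob("*") if path.is_file())

    def write_bytes(self, name: str, data: bytes) -> None:
        if Path(name).name != name:
            raise ValueError(f"plain file names only, got {name!r}")
        target = self.root / name
        existing = target.stat().st_size if target.exists() else 0
        # Overwriting frees the old size before the new data is counted.
        if self.used_bytes() - existing + len(data) > self.quota_bytes:
            raise QuotaExceeded(f"writing {name!r} would exceed {self.quota_bytes} bytes")
        target.write_bytes(data)

    def write_text(self, name: str, text: str) -> None:
        self.write_bytes(name, text.encode("utf-8"))

    def read_bytes(self, name: str) -> bytes:
        return (self.root / name).read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    store = QuotaStore(Path(tmp) / "Traveller", quota_bytes=16)
    store.write_text("checkpoint.txt", "ok")
    data = store.read_bytes("checkpoint.txt")
    try:
        store.write_bytes("big.bin", b"x" * 20)
        over_quota = False
    except QuotaExceeded:
        over_quota = True
    store.write_text("checkpoint.txt", "x" * 16)  # fits exactly once "ok" is replaced
    used = store.used_bytes()
```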
Query Mesh Placement
The built-in mesh-info resident service keeps fresh host resource snapshots,
including active/inactive paglet counts, and ranks eligible compute targets:
```python
from paglets.core.runtime_values import ServiceScope
from paglets.system.mesh_info import MESH_INFO, SELECT_TARGETS, TargetSelectionRequest

mesh_info = self.require_contract(MESH_INFO, operation=SELECT_TARGETS, scope=ServiceScope.LOCAL)
targets = mesh_info.call(SELECT_TARGETS, TargetSelectionRequest(limit=2, max_load_per_cpu=1.0))
```
For distributed compute, keep the coordinator's accumulated job state on one
host and create short-lived worker paglets remotely. Workers should report
results by message and dispose themselves after sending the result. The
coordinator should return from its launch message quickly and use drain or
summary for progress, because it cannot handle worker result messages while a
previous message handler is still running. For
CPU-style batch work, treat a selected host as several placement slots instead
of only one target: estimate slots from cpu_count * target_load_per_cpu -
load_1m, subtract already in-flight workers on that host, and optionally cap
the result with a per-host limit. Keep one small fallback worker available when
all hosts are above the load threshold so long-running jobs still make minimum
progress.
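The slot estimate can be worked through with numbers. For a host with cpu_count = 8, target_load_per_cpu = 1.0, and load_1m = 2.5, the spare capacity is 8 × 1.0 − 2.5 = 5.5, rounded down to 5; one worker already in flight leaves 4, and a per-host limit of 3 caps it at 3. The sketch below encodes that rule. HostSnapshot and placement_slots are hypothetical names, and rounding down, picking the lowest load per CPU for the fallback, and adding a fallback only when no workers are in flight are this sketch's own assumptions.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HostSnapshot:
    name: str
    cpu_count: int
    load_1m: float
    in_flight: int = 0


def placement_slots(hosts: list[HostSnapshot], target_load_per_cpu: float = 1.0,
                    per_host_limit: Optional[int] = None) -> dict[str, int]:
    slots = {}
    for host in hosts:
        # Spare capacity rounded down, minus workers already running there.
        spare = math.floor(host.cpu_count * target_load_per_cpu - host.load_1m)
        free = max(0, spare - host.in_flight)
        if per_host_limit is not None:
            free = min(free, per_host_limit)
        slots[host.name] = free
    if hosts and not any(slots.values()) and not any(h.in_flight for h in hosts):
        # Every host is over the threshold and nothing is running: keep one
        # fallback worker on the least-loaded host so the job still progresses.
        least = min(hosts, key=lambda host: host.load_1m / host.cpu_count)
        slots[least.name] = 1
    return slots


normal = placement_slots(
    [HostSnapshot("alpha", 8, 2.5, in_flight=1), HostSnapshot("beta", 4, 3.2)],
    target_load_per_cpu=1.0, per_host_limit=3,
)
busy = placement_slots([HostSnapshot("alpha", 4, 6.0), HostSnapshot("beta", 8, 9.0)])
```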
Keep Imports Stable
Movement sends class names like demos.disk_survey_demo:DiskSurveyPaglet.
Every target host must be able to import the same module and class name.
Use package modules, not ad-hoc script-only code, for paglets that should move between independently started hosts.
Test Locally
For local tests, create multiple host objects in one process:
```python
from paglets.runtime.host import Host

alpha = Host("alpha", port=8765, mesh_version="dev")
beta = Host("beta", port=8766, peers=["http://127.0.0.1:8765"], mesh_version="dev")
alpha.start_background()
beta.start_background()
```
Stop hosts in finally blocks or test fixtures so background server threads do
not leak between tests.
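This guide does not show the Host shutdown call, so the sketch below uses a standard-library HTTP server as a stand-in for a host's background server thread. It shows the try/finally fixture shape: start in the background, yield to the test, and always stop and close, even if the test body raises. With real hosts, you would call their own stop method in the finally block.

```python
import http.client
import threading
from contextlib import contextmanager
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Iterator


class _Ping(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, format, *args) -> None:  # keep test output quiet
        pass


@contextmanager
def background_server() -> Iterator[ThreadingHTTPServer]:
    """Start a stand-in server thread and always stop it, even on test failure."""
    server = ThreadingHTTPServer(("127.0.0.1", 0), _Ping)  # port 0 picks a free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        yield server
    finally:
        server.shutdown()  # stop serve_forever()
        server.server_close()  # release the listening socket
        thread.join(timeout=5)


with background_server() as server:
    port = server.server_address[1]
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    conn.request("GET", "/")
    body = conn.getresponse().read()
    conn.close()
```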