# Analysis Jobs
`paglets examples analysis` is a synthetic, distributed dataframe-analysis example.
It shows how application paglets can use the built-in
Compute Slots scheduler while keeping
application-specific result collection out of the scheduler itself.
The example starts on the laptop (home) host, creates configurable job paglets, and sends them to suitable Linux compute hosts. There each job waits for a scheduler grant and performs synthetic pandas/scikit-learn work. While the laptop is offline, the job keeps its result frames on the compute host; once home is reachable again, it returns and appends the results to SQLite under a cross-process file lock.
This example keeps SQLite result collection application-specific so the database write path is visible. For reusable "N jobs plus collector" status and failure bookkeeping, use the group helpers described in Detached Compute With A Collector.
## Running It
Start a laptop/home host and one or more Linux-style compute hosts in the same mesh:

```shell
uv run paglets host --name laptop --bind-public auto --port 8765 --mesh-version analysis
uv run paglets host --name linux-a --bind-public auto --peer http://laptop:8765 --port 8765 --mesh-version analysis
uv run paglets host --name linux-b --bind-public auto --peer http://laptop:8765 --port 8765 --mesh-version analysis
```
Start the campaign from the laptop host:
Useful shorter development run:
The default target runtime is about 2.5 minutes per job (`DEFAULT_TARGET_RUNTIME_SECONDS = 150`). If model training finishes sooner, the job pads the remaining time with CPU work.
## Components
```mermaid
flowchart LR
    CLI["paglets examples analysis CLI"]
    Seeder["CampaignSeederPaglet"]
    Job["AnalysisJobPaglet"]
    Slots["compute-slots"]
    Info["user-info"]
    DB[("SQLite result DB")]
    CLI -->|"create + start"| Seeder
    Seeder -->|"create 20 jobs"| Job
    Job -->|"candidate_hosts / request_slot"| Slots
    Slots -->|"grant / redirect"| Job
    Slots -->|"ask host to apply CPU affinity"| Job
    Job -->|"notify"| Info
    Job -->|"return home"| DB
```
- `CampaignSeederPaglet`: Runs on the home host. It creates a configurable number of `AnalysisJobPaglet` instances and records their IDs. Its CLI-facing `start`/`summary` protocol uses typed operations, while `_seed_jobs()` keeps the job creation workflow explicit in the example.
- `AnalysisJobPaglet`: Derives from `ComputeJobPaglet`. It carries job configuration, resource wishes, the SQLite DB path, and result metadata. The base class captures the home host and carries scheduler request IDs, lease IDs, redirects, completion state, and affinity metadata.
- `compute-slots`: Filters suitable hosts, queues waiting jobs, grants leases, and redirects sleepers through bounded work stealing.
- `user-info`: Prints user-facing notifications from jobs.
## How The Job Is Implemented
The example has two paglet classes:

- `CampaignSeederPaglet` runs once on the home host and creates many `AnalysisJobPaglet` instances.
- `AnalysisJobPaglet` is the actual compute job. It derives from `ComputeJobPaglet`, so it only implements analysis and result handling.
The application state extends `ComputeJobState`. Scheduler resource estimates
are normal state fields, while analysis-specific fields such as `job_id`,
`seed`, `db_path`, and the result paths belong to this example:
```python
from dataclasses import dataclass, field

# ComputeJobState comes from paglets; the STATUS_* and DEFAULT_* constants
# are defined in the example module.


@dataclass
class AnalysisJobState(ComputeJobState):
    campaign_id: str = ""
    job_id: str = ""
    status: str = STATUS_NEW  # application status, separate from compute_status
    seed: int = 0
    db_path: str = ""
    row_count: int = DEFAULT_ROW_COUNT
    feature_count: int = DEFAULT_FEATURE_COUNT
    estimator_trees: int = DEFAULT_ESTIMATOR_TREES
    target_runtime_seconds: float = DEFAULT_TARGET_RUNTIME_SECONDS
    estimated_runtime_seconds: float = DEFAULT_TARGET_RUNTIME_SECONDS
    home_check_seconds: float = DEFAULT_HOME_CHECK_SECONDS
    db_lock_timeout_seconds: float = DEFAULT_DB_LOCK_TIMEOUT_SECONDS
    cpu_cores: int = DEFAULT_CPU_CORES
    memory_bytes: int = DEFAULT_MEMORY_BYTES
    temp_storage_bytes: int = DEFAULT_TEMP_STORAGE_BYTES
    result_paths: dict[str, str] = field(default_factory=dict)
    result_payloads: dict[str, bytes] = field(default_factory=dict)
    started_at: float = 0.0
    completed_at: float = 0.0
    error: str = ""
```
The seeder creates one state object per task. This is where the application
assigns its own `job_id` and resource estimates. The compute scheduler later
uses `cpu_cores`, `memory_bytes`, `temp_storage_bytes`, and
`estimated_runtime_seconds`; it does not interpret the analysis `job_id`.
```python
def _seed_jobs(self, request: AnalysisCampaignRequest) -> None:
    # Method of CampaignSeederPaglet; the module imports uuid.
    campaign_id = f"analysis-{uuid.uuid4().hex}"
    task_count = max(1, int(request.task_count))
    for index in range(task_count):
        job_id = f"{campaign_id}-{index:04d}"
        # Clamp request values so every job gets sane workload and resource estimates.
        state = AnalysisJobState(
            campaign_id=campaign_id,
            job_id=job_id,
            seed=index + 10_000,
            db_path=request.db_path,
            row_count=max(100, int(request.row_count)),
            feature_count=max(4, int(request.feature_count)),
            estimator_trees=max(1, int(request.estimator_trees)),
            target_runtime_seconds=max(0.0, float(request.target_runtime_seconds)),
            estimated_runtime_seconds=max(0.0, float(request.target_runtime_seconds)),
            db_lock_timeout_seconds=max(0.0, float(request.db_lock_timeout_seconds)),
            cpu_cores=max(1, int(request.cpu_cores)),
            memory_bytes=max(0, int(request.memory_bytes)),
            temp_storage_bytes=max(0, int(request.temp_storage_bytes)),
        )
        try:
            proxy = self.context.create_paglet(AnalysisJobPaglet, state)
            with self.locked_state() as current:
                current.created_jobs.append({"job_id": job_id, "agent_id": proxy.agent_id})
        except Exception as exc:
            # Record per-job creation failures instead of aborting the whole campaign.
            with self.locked_state() as current:
                current.errors[job_id] = str(exc)
    with self.locked_state() as current:
        current.done = True
    self.notify_all_state_changed()
```
The compute part is intentionally concentrated in `run_compute_job()`. The base
class calls this method only after a compute slot has been granted, and it
releases the lease when the method returns or raises.
```python
def run_compute_job(self) -> None:
    # Snapshot the inputs under the state lock; the workload itself runs unlocked.
    with self.locked_state() as state:
        state.status = STATUS_RUNNING
        state.started_at = time.time()
        job_id = state.job_id
        seed = state.seed
        row_count = state.row_count
        feature_count = state.feature_count
        estimator_trees = state.estimator_trees
        target_runtime = state.target_runtime_seconds
    # Affinity metadata is recorded by the base class when the slot is granted.
    with self.locked_state() as state:
        cpu_core_ids = list(state.cpu_core_ids)
        cpu_affinity_supported = state.cpu_affinity_supported
        cpu_affinity_enforced = state.cpu_affinity_enforced
        cpu_affinity_error = state.cpu_affinity_error
    data = download_data(job_id=job_id, seed=seed, row_count=row_count, feature_count=feature_count)
    frames = process_data_to_frames(
        data,
        job_id=job_id,
        host_name=self.context.name,
        seed=seed,
        target_runtime_seconds=target_runtime,
        estimator_trees=estimator_trees,
        cpu_core_ids=cpu_core_ids,
        cpu_affinity_supported=cpu_affinity_supported,
        cpu_affinity_enforced=cpu_affinity_enforced,
        cpu_affinity_error=cpu_affinity_error,
    )
    payloads = frames_to_payloads(frames)
    # Keep results in persistent storage on the compute host, not in mobile state.
    result_paths = self._save_payloads(payloads)
    with self.locked_state() as state:
        state.result_paths = result_paths
        state.result_payloads = {}
        state.completed_at = time.time()
        state.status = STATUS_WAITING_FOR_HOME
```
After the compute method returns, the job may need several wakeups before the
home laptop is online again. That repeated post-compute behavior belongs in
`continue_after_compute_success()`:
```python
def continue_after_compute_success(self) -> None:
    with self.locked_state() as state:
        status = state.status
    if status == STATUS_RETURNING_HOME:
        # The paglet has arrived home carrying its payloads.
        self._commit_at_home()
        return
    self._try_return_home()
```
`_try_return_home()` is application logic, not scheduler logic. If the job is
already on the home host, it commits directly. Otherwise it checks whether the
home host is visible, deactivates for a timed retry when the laptop is offline,
or dispatches the paglet home when the laptop is online:
```python
def _try_return_home(self) -> None:
    if self.is_compute_home():
        # Compute ran on the home host itself: no movement needed.
        with self.locked_state() as state:
            state.result_payloads = self._load_payloads(state.result_paths)
            state.status = STATUS_RETURNING_HOME
        self._commit_at_home()
        return
    with self.locked_state() as state:
        home = state.home_host_url or state.home_host_name
        interval = max(1.0, float(state.home_check_seconds))
    if not home or not self.context.is_host_online(home):
        # Home is offline: sleep and retry after `interval` seconds.
        self.deactivate(
            policy=DeactivationPolicy.after(
                interval,
                activate_on_message=True,
                queue_messages_when_inactive=True,
                activate_on_startup=False,
            )
        )
        return
    # Home is online: move the stored results into mobile state and travel home.
    with self.locked_state() as state:
        state.result_payloads = self._load_payloads(state.result_paths)
        state.status = STATUS_RETURNING_HOME
        target = state.home_host_url or state.home_host_name
    self.dispatch(target)
```
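Across wakeups, `continue_after_compute_success()` and `_try_return_home()` reduce to one decision per call. The framework-free sketch below restates that decision, assuming a hypothetical `next_post_compute_action()` helper, plain strings for the status names, and booleans in place of `is_compute_home()` and `is_host_online()`; it is an illustration of the branching, not code from the example.

```python
from enum import Enum


class Action(Enum):
    COMMIT = "commit"            # append results to SQLite on the home host
    DISPATCH_HOME = "dispatch"   # load payloads into mobile state and move home
    SLEEP = "sleep"              # deactivate and retry after a timed wakeup


def next_post_compute_action(status: str, at_home: bool, home_online: bool) -> Action:
    """Hypothetical restatement of the post-compute branches.

    `home_online` should be False when no home address is known, matching
    the `if not home or not ...` check in _try_return_home().
    """
    if status == "RETURNING_HOME":
        # Arrived home after dispatch.
        return Action.COMMIT
    if at_home:
        # Compute ran on the home host itself.
        return Action.COMMIT
    if not home_online:
        return Action.SLEEP
    return Action.DISPATCH_HOME


def home_check_interval(home_check_seconds: float) -> float:
    # _try_return_home() never schedules a wakeup sooner than one second.
    return max(1.0, float(home_check_seconds))
```

Keeping the decision pure like this makes the wakeup behavior easy to unit-test without a running mesh.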
Once back home, the example serializes only the final SQLite write. Compute, movement, and payload loading do not take the file lock:
```python
def _commit_at_home(self) -> None:
    # Read everything needed for the commit under the state lock.
    with self.locked_state() as state:
        state.status = STATUS_COMMITTING
        payloads = dict(state.result_payloads)
        db_path = state.db_path
        timeout = state.db_lock_timeout_seconds
        job_id = state.job_id
    # Only this append is serialized by the cross-process file lock.
    append_frames_to_sqlite(db_path, payloads, lock_timeout_seconds=timeout)
    with self.locked_state() as state:
        state.status = STATUS_COMMITTED
    self.notify_user(
        "info",
        "Analysis result saved",
        f"Saved {job_id} to {db_path}",
        job_id=job_id,
    )
    self.context.host.dispose(self.agent_id)
```
Failure handling is deliberately small. The base class already sets
`compute_status = FAILED_FINAL`, records `compute_error`, and releases any
lease. The example only mirrors the failure into its application status and
sends a notification:
```python
def after_compute_failure(self, message: str) -> None:
    # compute_status, compute_error, and lease release are handled by the base class.
    with self.locked_state() as state:
        state.status = STATUS_FAILED_FINAL
        state.error = message
        job_id = state.job_id
    self.notify_user("error", "Analysis job failed", message, job_id=job_id)
```
Everything not shown here is either ordinary data processing in
`workload.py` or private helper code for saving and loading result payloads. New
compute job types should usually need the same small surface: a state dataclass,
`run_compute_job()`, optional post-compute return/commit logic, and optional
failure notification.
## Compute Job API Usage
The example is intentionally small at the scheduler boundary. `AnalysisJobPaglet`
does not implement placement, candidate selection, local slot requests, sleep,
redirect handling, affinity assignment, or lease release. Those mechanics come
from `ComputeJobPaglet`.
The example implements only the application-specific hooks:
| Method | Why the example implements it | What the base class still owns |
|---|---|---|
| `handle_compute_job_message()` | Adds a `status` message that returns the analysis job state for diagnostics. | Scheduler messages such as `compute_slot_granted` and `compute_slot_redirect`. |
| `run_compute_job()` | Runs the pandas/scikit-learn workload after a slot is granted, stores result payloads on the compute host, and changes the application status to `WAITING_FOR_HOME`. | Starting this method only after a lease exists, recording affinity metadata, and releasing the lease after the method returns or fails. |
| `continue_after_compute_success()` | Handles the post-compute phase: wait for home, dispatch home, or commit once already returning. This hook can run more than once after timed wakeups. | Calling this hook when compute first completes and again whenever a completed paglet wakes. |
| `after_compute_failure(message)` | Copies the failure into application fields and sends a user notification. | Marking `compute_status = FAILED_FINAL`, recording `compute_error`, and releasing any lease. |
The example also has private application helpers, which are not scheduler hooks:

- `_try_return_home()` checks whether the laptop/home host is online. If not, it deactivates with a timed wakeup. If it is, it loads the result payloads and dispatches the paglet home.
- `_commit_at_home()` appends result frames to SQLite under the file lock, notifies the user, and disposes of the completed paglet.
- `_save_payloads()` and `_load_payloads()` move serialized result frames between local persistent storage and mobile paglet state.
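The bodies of `_save_payloads()` and `_load_payloads()` are not shown. A minimal sketch of that round trip, assuming the payloads are already serialized `bytes` keyed by frame name (matching the `result_paths: dict[str, str]` and `result_payloads: dict[str, bytes]` state fields) and assuming a plain directory stands in for the paglet's persistent storage; `save_payloads()` and `load_payloads()` are hypothetical names.

```python
from pathlib import Path


def save_payloads(storage_dir: Path, payloads: dict[str, bytes]) -> dict[str, str]:
    """Write each payload to its own file and return frame name -> path."""
    storage_dir.mkdir(parents=True, exist_ok=True)
    result_paths: dict[str, str] = {}
    for name, data in payloads.items():
        path = storage_dir / f"{name}.bin"
        tmp = path.with_suffix(".tmp")
        tmp.write_bytes(data)
        # Rename into place so a crash never leaves a half-written result file.
        tmp.replace(path)
        result_paths[name] = str(path)
    return result_paths


def load_payloads(result_paths: dict[str, str]) -> dict[str, bytes]:
    """Read stored payloads back into memory just before the job moves home."""
    return {name: Path(path).read_bytes() for name, path in result_paths.items()}
```

Only the small `result_paths` mapping travels in state while the job waits; the bytes are loaded again right before dispatch, which mirrors how `run_compute_job()` clears `result_payloads` after saving.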
There are two status fields by design:
| Field | Owner | Values in this example |
|---|---|---|
| `compute_status` | `ComputeJobPaglet` | `NEW`, `PLACING`, `WAITING_FOR_SLOT`, `RUNNING`, `COMPLETED`, `FAILED_FINAL` |
| `status` | `AnalysisJobPaglet` | `NEW`, `RUNNING`, `WAITING_FOR_HOME`, `RETURNING_HOME`, `COMMITTING`, `COMMITTED`, `FAILED_FINAL` |
New compute job types should normally copy this shape: put resource estimates
and domain configuration in a `ComputeJobState` subclass, implement
`run_compute_job()`, and add `continue_after_compute_success()` only when
results need a return/wait/commit phase.
## Job State Machine
```mermaid
stateDiagram-v2
    [*] --> NEW
    NEW --> PLACING: first run on home
    PLACING --> PLACING: candidate selected, dispatch
    PLACING --> WAITING_FOR_SLOT: scheduler returns sleep
    PLACING --> RUNNING: scheduler returns run_now
    WAITING_FOR_SLOT --> RUNNING: compute_slot_granted
    WAITING_FOR_SLOT --> PLACING: compute_slot_redirect
    RUNNING --> WAITING_FOR_HOME: analysis saved remotely
    WAITING_FOR_HOME --> WAITING_FOR_HOME: home offline, timed wakeup
    WAITING_FOR_HOME --> RETURNING_HOME: home online
    RETURNING_HOME --> COMMITTING: arrives home with payloads
    COMMITTING --> COMMITTED: SQLite append succeeds
    NEW --> FAILED_FINAL: no suitable host
    PLACING --> FAILED_FINAL: rejected
    RUNNING --> FAILED_FINAL: processing failed
    COMMITTING --> FAILED_FINAL: save failed
```
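When testing a new job type, it can help to check recorded status sequences against this diagram. The following is a hypothetical checker, not part of paglets: the transition table is transcribed from the state diagram, which merges the scheduler's `compute_status` and the example's application `status` into one view, and `can_transition()` and `is_valid_path()` are illustrative names.

```python
# Allowed transitions, transcribed edge by edge from the job state diagram.
TRANSITIONS: dict[str, set[str]] = {
    "NEW": {"PLACING", "FAILED_FINAL"},
    "PLACING": {"PLACING", "WAITING_FOR_SLOT", "RUNNING", "FAILED_FINAL"},
    "WAITING_FOR_SLOT": {"RUNNING", "PLACING"},
    "RUNNING": {"WAITING_FOR_HOME", "FAILED_FINAL"},
    "WAITING_FOR_HOME": {"WAITING_FOR_HOME", "RETURNING_HOME"},
    "RETURNING_HOME": {"COMMITTING"},
    "COMMITTING": {"COMMITTED", "FAILED_FINAL"},
    "COMMITTED": set(),      # terminal
    "FAILED_FINAL": set(),   # terminal
}


def can_transition(current: str, target: str) -> bool:
    return target in TRANSITIONS.get(current, set())


def is_valid_path(states: list[str]) -> bool:
    """True when every consecutive pair of states is an allowed edge."""
    return all(can_transition(a, b) for a, b in zip(states, states[1:]))
```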
## Placement And Scheduling Flow
```mermaid
sequenceDiagram
    participant HomeJob as "AnalysisJobPaglet on home"
    participant EntrySlots as "entry compute-slots"
    participant JobA as "same job on Linux A"
    participant SlotsA as "compute-slots A"
    participant JobB as "same job on Linux B"
    participant SlotsB as "compute-slots B"
    participant Info as "user-info"
    HomeJob->>EntrySlots: candidate_hosts(resource wishes)
    EntrySlots->>SlotsA: scheduler_status
    EntrySlots->>SlotsB: scheduler_status
    SlotsA-->>EntrySlots: capacity and limits
    SlotsB-->>EntrySlots: capacity and limits
    EntrySlots-->>HomeJob: suitable candidates + selected candidate
    alt no candidates
        HomeJob->>Info: notify warning
        HomeJob->>HomeJob: FAILED_FINAL
    else candidates found
        HomeJob->>HomeJob: persist selected candidate
        HomeJob->>JobA: dispatch self to Linux A
        JobA->>SlotsA: request_slot
        alt run_now
            SlotsA-->>JobA: lease_id
            SlotsA->>JobA: host-managed CPU affinity if supported
            JobA->>JobA: run analysis
        else sleep
            SlotsA-->>JobA: queued
            JobA->>JobA: deactivate on Linux A
            SlotsA-->>JobA: compute_slot_granted activates job
            SlotsA->>JobA: host-managed CPU affinity if supported
            JobA->>JobA: run analysis
        else redirect
            SlotsA-->>JobA: target Linux B + redirect metadata
            JobA->>JobB: dispatch self to Linux B
            JobB->>SlotsB: request_slot again on B
        end
    end
```
The initial target is selected by `compute-slots` from suitable, ranked
candidates. `ComputeJobPaglet` persists that target, dispatches itself there,
and carries redirect metadata so peer schedulers can apply cooldown hysteresis.
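The exact ranking and cooldown rules of `compute-slots` are not documented on this page. As an illustration only, the sketch below filters hosts by a job's resource wishes, skips any host that redirected the job within a cooldown window (a simple form of hysteresis that prevents bouncing back and forth), and prefers hosts with the most free cores. `ResourceWish`, `HostCapacity`, `rank_candidates()`, the ranking key, and the 60-second default cooldown are all assumptions, not the scheduler's actual policy.

```python
from dataclasses import dataclass


@dataclass
class ResourceWish:
    cpu_cores: int
    memory_bytes: int
    temp_storage_bytes: int


@dataclass
class HostCapacity:
    name: str
    free_cpu_cores: int
    free_memory_bytes: int
    free_temp_storage_bytes: int


def rank_candidates(
    wish: ResourceWish,
    hosts: list[HostCapacity],
    redirected_at: dict[str, float],
    now: float,
    cooldown_seconds: float = 60.0,
) -> list[str]:
    """Return suitable host names, best first (hypothetical policy)."""
    suitable = [
        host
        for host in hosts
        if host.free_cpu_cores >= wish.cpu_cores
        and host.free_memory_bytes >= wish.memory_bytes
        and host.free_temp_storage_bytes >= wish.temp_storage_bytes
        # Hysteresis: skip hosts that redirected this job too recently.
        and now - redirected_at.get(host.name, float("-inf")) >= cooldown_seconds
    ]
    # Most free cores first; the host name breaks ties deterministically.
    suitable.sort(key=lambda host: (-host.free_cpu_cores, host.name))
    return [host.name for host in suitable]
```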
## Synthetic Workload
Each job:

- Generates deterministic synthetic classification data from its job seed.
- Converts the data into a pandas dataframe.
- Receives scheduler affinity metadata; the scheduler asks the host to pin the job process when the platform supports affinity.
- Trains a scikit-learn random forest with `n_jobs=1`.
- Computes prediction accuracy.
- Produces three result frames:
  - `job_summary`
  - `feature_summary`
  - `prediction_summary`
- Pads with CPU work until the configured target runtime is reached.
The defaults are intentionally large enough to make scheduling observable:
```python
DEFAULT_TASK_COUNT = 20
DEFAULT_ROW_COUNT = 80000
DEFAULT_FEATURE_COUNT = 32
DEFAULT_ESTIMATOR_TREES = 80
DEFAULT_TARGET_RUNTIME_SECONDS = 150
```
Lower these values for tests or demos on small machines.
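The padding step keeps every job close to its target runtime even when training is fast. A minimal sketch of that idea, assuming a hypothetical `pad_with_cpu_work()` helper that repeatedly hashes a buffer until the deadline passes; the example's `workload.py` may pad differently. It measures elapsed time with `time.monotonic()`, so `started_at` must come from the same clock (unlike the `time.time()` timestamps stored in state).

```python
import hashlib
import time


def pad_with_cpu_work(started_at: float, target_runtime_seconds: float, chunk: int = 10_000) -> int:
    """Burn CPU until target_runtime_seconds have elapsed since started_at.

    `started_at` must be a time.monotonic() reading. Returns the number of
    work chunks executed (0 when the target was already reached).
    """
    chunks = 0
    digest = b"paglets"
    while time.monotonic() - started_at < target_runtime_seconds:
        # Work in chunks so the clock is not read on every iteration.
        for _ in range(chunk):
            digest = hashlib.sha256(digest).digest()
        chunks += 1
    return chunks
```

Busy hashing occupies a single core, which fits jobs pinned to one CPU and trained with `n_jobs=1`; sleeping instead would free the core and make scheduling less observable.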
## Result Collection
Result collection is application-specific. In this example, jobs return home and append to a SQLite database.
While the laptop is offline, a completed job keeps its result payloads in its paglet persistent storage on the compute host. It wakes every `home_check_seconds` (five minutes in this example) and checks whether the home host is online.
When home is visible, the job loads the result payloads into its mobile state, dispatches itself home, and writes these SQLite tables:

- `job_summary`
- `feature_summary`
- `prediction_summary`
`job_summary` includes the scheduler and affinity outcome for the run:

- `cpu_core_ids`: the concrete CPU IDs granted on affinity-capable hosts.
- `cpu_affinity_supported`: whether this host can enforce process affinity.
- `cpu_affinity_enforced`: whether the job process was actually pinned before the workload started.
- `cpu_affinity_error`: the non-fatal reason when pinning was requested but not enforced.
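These four fields map naturally onto a pinning attempt that never fails the job. The sketch below shows one way a Linux host could produce them, assuming a hypothetical `apply_cpu_affinity()` helper built on the standard `os.sched_setaffinity()` call (Linux only); it is not the paglets host implementation, which this page does not show.

```python
import os


def apply_cpu_affinity(cpu_core_ids: list[int], pid: int = 0) -> dict[str, object]:
    """Try to pin `pid` (0 = this process) and report the outcome (hypothetical)."""
    outcome: dict[str, object] = {
        "cpu_core_ids": list(cpu_core_ids),
        "cpu_affinity_supported": hasattr(os, "sched_setaffinity"),
        "cpu_affinity_enforced": False,
        "cpu_affinity_error": "",
    }
    if not outcome["cpu_affinity_supported"]:
        outcome["cpu_affinity_error"] = "os.sched_setaffinity is not available on this platform"
        return outcome
    if not cpu_core_ids:
        outcome["cpu_affinity_error"] = "no CPU IDs were granted"
        return outcome
    try:
        os.sched_setaffinity(pid, cpu_core_ids)
    except (OSError, ValueError) as exc:
        # Non-fatal: the job still runs, just without pinning.
        outcome["cpu_affinity_error"] = str(exc)
        return outcome
    outcome["cpu_affinity_enforced"] = True
    return outcome
```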
SQLite writes are serialized with a cross-process file lock at `<db>.paglets.lock`, where `<db>` is the SQLite database path.
The example also uses `BEGIN IMMEDIATE` so the SQLite transaction obtains the
write lock before appending frames.
```mermaid
sequenceDiagram
    participant Job as "completed job on Linux"
    participant Home as "laptop host"
    participant Lock as "file lock"
    participant DB as "SQLite DB"
    loop every 5 minutes
        Job->>Job: check home online
    end
    Job->>Job: load result payloads
    Job->>Home: dispatch home
    Job->>Lock: acquire <db>.paglets.lock
    Job->>DB: BEGIN IMMEDIATE
    Job->>DB: append result tables
    Job->>DB: commit
    Job->>Lock: release
    Job->>Job: dispose
```
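The lock-then-transaction sequence can be sketched with the standard library alone. This is not the example's `append_frames_to_sqlite()`: it assumes POSIX `fcntl.flock` for the cross-process lock, appends plain row tuples instead of pandas frames, and uses a hypothetical `(job_id, metric, value)` schema. The lock file name follows the `<db>.paglets.lock` convention from the diagram.

```python
import fcntl  # POSIX only
import sqlite3
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def file_lock(lock_path: str, timeout_seconds: float) -> Iterator[None]:
    """Hold an exclusive advisory flock on lock_path, polling until the timeout."""
    deadline = time.monotonic() + timeout_seconds
    with open(lock_path, "a") as handle:
        while True:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"timed out waiting for {lock_path}") from None
                time.sleep(0.05)
        try:
            yield
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)


def append_rows(
    db_path: str,
    table: str,
    rows: list[tuple[str, str, float]],
    lock_timeout_seconds: float = 30.0,
) -> None:
    """Append rows under <db>.paglets.lock inside a BEGIN IMMEDIATE transaction.

    `table` is interpolated into SQL, so it must be a trusted identifier.
    """
    with file_lock(db_path + ".paglets.lock", lock_timeout_seconds):
        # isolation_level=None: transactions are controlled explicitly below.
        conn = sqlite3.connect(db_path, isolation_level=None)
        try:
            conn.execute("BEGIN IMMEDIATE")  # take SQLite's write lock up front
            try:
                conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (job_id TEXT, metric TEXT, value REAL)")
                conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)
                conn.execute("COMMIT")
            except Exception:
                conn.execute("ROLLBACK")
                raise
        finally:
            conn.close()
```

The file lock serializes writers across processes before SQLite is even opened, while `BEGIN IMMEDIATE` makes the transaction fail fast on the write lock instead of upgrading a read lock partway through the append.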
## Implementing Your Own Job Paglet
Use this example as a template for the application-specific parts of a compute
job. The reusable scheduling mechanics live in `ComputeJobState` and
`ComputeJobPaglet`; new job types should not reimplement candidate selection,
slot requests, redirects, sleep handling, affinity metadata, or lease release.
- Put scheduler fields in a state class derived from `ComputeJobState`.
- Put job configuration and progress in the same dataclass state.
- Derive the job paglet from `ComputeJobPaglet`.
- Estimate CPU cores, RAM, temp storage, runtime, and GPU needs before placement.
- Set `estimated_runtime_seconds` in the state and implement `run_compute_job()`.
- Use application-specific fields such as `job_id`, `dataset_name`, or `result_key` for result labels.
- Let the base class own advancement, scheduler wakeups, and lease release.
- Use `continue_after_compute_success()` when a completed job needs to wake later, return home, or commit results after compute has finished.
- Store results before waiting for home or another destination.
- Keep final result collection task-specific.
- Use application-specific fields for result states such as `WAITING_FOR_HOME` or `COMMITTING`; `compute_status` is reserved for the scheduling base.
- Send user-facing messages through the base `notify_user()` helper, which uses User Info.