Skip to content

Remote Package

paglets.remote contains the host-to-host and client-to-host communication surface: HTTP clients, proxies, transfer tickets, transport helpers, mesh membership, and admin tooling.

Responsibilities

  • Provide a reusable HTTP client for host control endpoints.
  • Represent remote paglets through PagletProxy and serializable proxy refs.
  • Validate and carry movement intent with TransferTicket.
  • Stream binary state payloads for movement and shared-memory handoff helpers.
  • Maintain mesh membership, multicast beacons, version compatibility, and relay routing state.
  • Provide administration client records and dynamic entry-host discovery.

Main Modules

paglets.remote.client
Implements HostClient, request helpers, error decoding, and binary movement upload support, including streamed artifact upload/download helpers.
paglets.remote.proxy and paglets.remote.references
Provide the controlled handle and serializable reference form used to inspect, message, move, deactivate, activate, and dispose paglets.
paglets.remote.transfer
Defines TransferTicket, including required capabilities, expected code version, arrival mode, and target selection data.
paglets.remote.transport
Implements chunked pickle HTTP payloads, local pickle streams, shared-memory readers/writers, and JSON-safe binary tagging.
paglets.remote.mesh
Tracks known hosts, peer compatibility, multicast beacons, relayed hosts, and host name resolution.
paglets.remote.admin
Defines admin records, server URL normalization, LAN/mesh entry discovery, and PagletsAdminClient.

Implementation Notes

Control-plane operations stay JSON-oriented. Movement payloads are streamed pickle data because paglet state can contain binary values and large nested dataclass structures.

Large files should travel as artifacts rather than message bytes. Registered paglet files use artifact upload internally before the movement envelope is accepted on the target host, so the scratch copy is present before activation.

Mesh peers must agree on mesh version and compatible code version before they are used as movement targets. Relay/connect mode avoids inbound ports on clients by long-polling a hub host.

API Reference

paglets.remote.client

HostClient

Tiny JSON HTTP client used by proxies and hosts.

Source code in src/paglets/remote/client.py
class HostClient:
    """Tiny JSON HTTP client used by proxies and hosts.

    JSON requests go through :meth:`_request` (``urllib``); pickle movement
    payloads and artifact bodies are streamed over a raw ``http.client``
    connection. When ``api_key`` is set it is sent as a bearer token on every
    request. Transport failures are reported as ``RemoteHostError``.
    """

    def __init__(self, timeout: float = 10.0, *, api_key: str | None = None):
        """Store the default per-request timeout (seconds) and optional bearer key."""
        self.timeout = timeout
        self.api_key = api_key

    def get_json(self, url: str, *, timeout: float | None = None) -> Any:
        """GET ``url`` and return the decoded JSON body (``None`` if the body is empty)."""
        return self._request("GET", url, None, timeout=timeout)

    def post_json(self, url: str, payload: dict[str, Any], *, timeout: float | None = None) -> Any:
        """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply."""
        return self._request("POST", url, payload, timeout=timeout)

    def post_pickle(self, url: str, payload: dict[str, Any], *, timeout: float | None = None) -> Any:
        """POST ``payload`` as a chunked pickle stream and return the JSON reply.

        Raises the error built by ``_error_from_response`` for HTTP status
        >= 400, and ``RemoteHostError`` when the connection fails.
        """
        parsed = urlparse(url)
        connection = _connection(parsed, timeout=self.timeout if timeout is None else timeout)
        try:
            connection.putrequest("POST", _request_target(parsed))
            connection.putheader("Host", parsed.netloc)
            connection.putheader("Content-Type", PICKLE_CONTENT_TYPE)
            connection.putheader("Accept", "application/json")
            if self.api_key:
                connection.putheader("Authorization", f"Bearer {self.api_key}")
            connection.putheader("Transfer-Encoding", "chunked")
            connection.endheaders()
            dump_http_chunked_pickle(connection, payload)
            response = connection.getresponse()
            # Decode leniently so a non-UTF-8 error body cannot mask the HTTP
            # error with a UnicodeDecodeError (same policy as upload_artifact).
            raw = response.read().decode("utf-8", errors="replace")
            if response.status >= 400:
                raise _error_from_response(response.status, raw, url)
            return restore_json_safe(json.loads(raw)) if raw else None
        except (OSError, http.client.HTTPException) as exc:
            raise RemoteHostError(f"Could not reach {url}: {exc}") from exc
        finally:
            connection.close()

    def upload_artifact(
        self,
        host_url: str,
        path: str | Path,
        *,
        owner_agent_id: str = "",
        name: str | None = None,
        compression: str = "",
        expires_at: float = 0.0,
        expected_sha256: str | None = None,
        timeout: float | None = None,
    ) -> ArtifactRef:
        """Stream the file at ``path`` to ``{host_url}/artifacts``.

        Size, SHA-256 (``expected_sha256`` or computed from the file), owner,
        name, compression and expiry travel as query parameters; the file
        bytes are the request body.

        Returns the ``ArtifactRef`` decoded from the host's reply.
        Raises ``RemoteHostError`` when the host is unreachable or replies
        without artifact metadata, and the decoded host error for HTTP
        status >= 400.
        """
        source = Path(path)
        size = source.stat().st_size
        sha256 = expected_sha256 or file_sha256(source)
        query = urlencode(
            {
                "owner_agent_id": owner_agent_id,
                "name": name or source.name,
                "compression": compression,
                "expires_at": str(float(expires_at or 0.0)),
                "sha256": sha256,
                "size": str(size),
            }
        )
        parsed = urlparse(f"{host_url.rstrip('/')}/artifacts?{query}")
        connection = _connection(parsed, timeout=self.timeout if timeout is None else timeout)
        try:
            connection.putrequest("POST", _request_target(parsed))
            connection.putheader("Host", parsed.netloc)
            connection.putheader("Content-Type", "application/octet-stream")
            connection.putheader("Accept", "application/json")
            connection.putheader("Content-Length", str(size))
            if self.api_key:
                connection.putheader("Authorization", f"Bearer {self.api_key}")
            connection.endheaders()
            with source.open("rb") as handle:
                while chunk := handle.read(STREAM_CHUNK_BYTES):
                    connection.send(chunk)
            response = connection.getresponse()
            raw = response.read().decode("utf-8", errors="replace")
            if response.status >= 400:
                raise _error_from_response(response.status, raw, parsed.geturl())
            payload = restore_json_safe(json.loads(raw)) if raw else {}
            # An empty or malformed success body used to surface as a bare
            # KeyError/TypeError; report it as a host-side failure instead.
            if not isinstance(payload, dict) or "artifact" not in payload:
                raise RemoteHostError(f"{host_url} accepted the upload but returned no artifact metadata")
            return ArtifactRef.from_wire(payload["artifact"])
        except (OSError, http.client.HTTPException) as exc:
            raise RemoteHostError(f"Could not reach {host_url}: {exc}") from exc
        finally:
            connection.close()

    def download_artifact(
        self,
        artifact: ArtifactRef | str,
        target: str | Path,
        artifact_id: str | None = None,
        *,
        move: bool = False,
        timeout: float | None = None,
    ) -> ArtifactRef:
        """Download an artifact to ``target`` and return its ``ArtifactRef``.

        ``artifact`` is either an ``ArtifactRef`` or a host URL; in the latter
        case ``artifact_id`` is required and the metadata is fetched first.
        The body is written to a hidden ``.part`` sibling of ``target`` and
        only renamed into place after the byte count and (when the ref carries
        one) the SHA-256 match. With ``move=True`` the remote artifact is
        deleted after the file is in place.

        Raises ``TransferError`` on size/checksum mismatch and
        ``RemoteHostError`` on connection or OS-level failures.
        """
        if isinstance(artifact, ArtifactRef):
            ref = artifact
        else:
            ref = self.artifact_metadata(
                str(artifact),
                _required_artifact_id(artifact_id, operation="download_artifact"),
                timeout=timeout,
            )
        target_path = Path(target)
        target_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = target_path.with_name(f".{target_path.name}.{uuid.uuid4().hex}.part")
        parsed = urlparse(f"{ref.host_url.rstrip('/')}/artifacts/{ref.artifact_id}")
        connection = _connection(parsed, timeout=self.timeout if timeout is None else timeout)
        try:
            connection.putrequest("GET", _request_target(parsed))
            connection.putheader("Host", parsed.netloc)
            if self.api_key:
                connection.putheader("Authorization", f"Bearer {self.api_key}")
            connection.endheaders()
            response = connection.getresponse()
            if response.status >= 400:
                raw = response.read().decode("utf-8", errors="replace")
                raise _error_from_response(response.status, raw, parsed.geturl())
            with tmp_path.open("wb") as output:
                written, sha256 = copy_stream(response, output, expected_bytes=ref.size_bytes)
            if written != ref.size_bytes:
                raise TransferError(f"artifact size mismatch: expected {ref.size_bytes}, got {written}")
            if ref.sha256 and sha256.casefold() != ref.sha256.casefold():
                raise TransferError(f"artifact checksum mismatch: expected {ref.sha256}, got {sha256}")
            os.replace(tmp_path, target_path)
            if move:
                # Honour the caller's timeout for the follow-up delete as well.
                self.delete_artifact(ref, timeout=timeout)
            return ref
        except (OSError, http.client.HTTPException) as exc:
            raise RemoteHostError(f"Could not reach {ref.host_url}: {exc}") from exc
        finally:
            # After a successful os.replace the temp file is already gone.
            with contextlib.suppress(FileNotFoundError):
                tmp_path.unlink()
            connection.close()

    def artifact_metadata(self, host_url: str, artifact_id: str, *, timeout: float | None = None) -> ArtifactRef:
        """Fetch ``/artifacts/{artifact_id}/metadata`` from ``host_url`` as an ``ArtifactRef``."""
        response = self.get_json(
            f"{host_url.rstrip('/')}/artifacts/{artifact_id}/metadata",
            timeout=timeout,
        )
        return ArtifactRef.from_wire(response["artifact"])

    def list_artifacts(
        self,
        host_url: str,
        *,
        owner_agent_id: str | None = None,
        timeout: float | None = None,
    ) -> list[ArtifactRef]:
        """List artifacts on ``host_url``, optionally filtered by ``owner_agent_id``."""
        query = "" if owner_agent_id is None else f"?{urlencode({'owner_agent_id': owner_agent_id})}"
        response = self.get_json(f"{host_url.rstrip('/')}/artifacts{query}", timeout=timeout)
        return [ArtifactRef.from_wire(item) for item in response.get("artifacts", [])]

    def delete_artifact(
        self,
        artifact: ArtifactRef | str,
        artifact_id: str | None = None,
        *,
        timeout: float | None = None,
    ) -> None:
        """Delete an artifact given an ``ArtifactRef``, or a host URL plus ``artifact_id``."""
        if isinstance(artifact, ArtifactRef):
            host_url = artifact.host_url
            artifact_id = artifact.artifact_id
        else:
            host_url = artifact
            artifact_id = _required_artifact_id(artifact_id, operation="delete_artifact")
        self._request("DELETE", f"{host_url.rstrip('/')}/artifacts/{artifact_id}", None, timeout=timeout)

    def _request(self, method: str, url: str, payload: dict[str, Any] | None, *, timeout: float | None = None) -> Any:
        """Send one JSON request and return the decoded reply (``None`` if empty).

        HTTP error statuses are converted by ``_error_from_response``; an
        authentication failure with no API key configured gets a hint about
        how to supply one. Any transport failure raises ``RemoteHostError``.
        """
        data = None if payload is None else json.dumps(json_safe(payload)).encode("utf-8")
        headers = {"Content-Type": "application/json", "Accept": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        req = Request(
            url,
            data=data,
            method=method,
            headers=headers,
        )
        try:
            with urlopen(req, timeout=self.timeout if timeout is None else timeout) as response:
                raw = response.read().decode("utf-8")
                return restore_json_safe(json.loads(raw)) if raw else None
        except HTTPError as exc:
            raw = exc.read().decode("utf-8", errors="replace")
            error = _error_from_response(exc.code, raw, url)
            if isinstance(error, AuthenticationError) and not self.api_key:
                raise AuthenticationError(
                    f"{error}; set {DEFAULT_API_KEY_ENV} or pass --api-key-env with an environment variable "
                    "containing a Paglets bearer API key"
                ) from exc
            raise error from exc
        except URLError as exc:
            raise RemoteHostError(f"Could not reach {url}: {exc.reason}") from exc
        except (OSError, http.client.HTTPException) as exc:
            # Failures while waiting for or reading the response (read
            # timeout, dropped connection, bad status line) are not wrapped in
            # URLError by urllib; map them like the streaming methods do so
            # callers only ever need to handle RemoteHostError.
            raise RemoteHostError(f"Could not reach {url}: {exc}") from exc

paglets.remote.proxy

PagletProxy dataclass

A controlled handle to a paglet, local or remote.

Like Aglets' proxy, callers do not reach into the object directly; all control and messaging goes through the host API.

Source code in src/paglets/remote/proxy.py
@dataclass(frozen=True, slots=True)
class PagletProxy:
    """A controlled handle to a paglet, local or remote.

    Like Aglets' proxy, callers do not reach into the object directly; all
    control and messaging goes through the host API.
    """

    host_url: str
    agent_id: str
    client: HostClient

    def to_wire(self) -> dict[str, str]:
        return {"host_url": self.host_url, "agent_id": self.agent_id}

    def ref(self) -> PagletProxyRef:
        return PagletProxyRef.from_proxy(self)

    @classmethod
    def from_wire(cls, payload: dict[str, str], client: HostClient | None = None) -> PagletProxy:
        return cls(
            host_url=payload["host_url"],
            agent_id=payload["agent_id"],
            client=client or HostClient(),
        )

    def info(self) -> dict[str, Any]:
        return self.client.get_json(_agent_url(self.host_url, self.agent_id))

    def is_valid(self) -> bool:
        try:
            self.info()
            return True
        except PagletError:
            return False

    def is_active(self) -> bool:
        try:
            return bool(self.info().get("active"))
        except PagletError:
            return False

    def is_state(self, state: int | bool) -> bool:
        if isinstance(state, bool):
            return self.is_active() is state
        try:
            active = bool(self.info().get("active"))
        except PagletError:
            return False
        return bool((state & ACTIVE and active) or (state & INACTIVE and not active))

    def is_remote(self, local_host_url: str | None = None) -> bool:
        if local_host_url is None:
            return True
        return self.host_url.rstrip("/") != local_host_url.rstrip("/")

    def get_address(self) -> str:
        return self.host_url

    def get_agent_id(self) -> str:
        return self.agent_id

    def get_agent_class_name(self) -> str:
        return str(self.info()["class_name"])

    def send(
        self,
        message: Message,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> Any:
        response = self.client.post_json(
            _agent_url(self.host_url, self.agent_id, "/messages"),
            {
                "message": message.to_wire(),
                "activate_if_inactive": activate_if_inactive,
                "no_delay": no_delay,
            },
            timeout=timeout,
        )
        return self._settled_same_agent_proxy_result(response.get("result"))

    def send_oneway(
        self,
        message: Message,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
        timeout: float | None = None,
    ) -> None:
        message.message_type = ONEWAY
        self.client.post_json(
            _agent_url(self.host_url, self.agent_id, "/messages"),
            {
                "message": message.to_wire(),
                "oneway": True,
                "activate_if_inactive": activate_if_inactive,
                "no_delay": no_delay,
            },
            timeout=timeout,
        )

    def send_artifact(
        self,
        message: Message,
        path: str | Path,
        *,
        arg_name: str = "artifact",
        move: bool = False,
        name: str | None = None,
        compression: str = "",
        timeout: float | None = None,
    ) -> Any:
        source = Path(path)
        artifact = self.client.upload_artifact(
            self.host_url,
            source,
            owner_agent_id=self.agent_id,
            name=name,
            compression=compression,
            timeout=timeout,
        )
        try:
            wire = message.to_wire()
            args = dict(wire.get("args") or {})
            args[arg_name] = artifact.to_wire()
            wire["args"] = args
            result = self.send(Message.from_wire(wire), timeout=timeout)
        except Exception:
            with contextlib.suppress(Exception):
                self.client.delete_artifact(artifact)
            raise
        if move:
            with contextlib.suppress(FileNotFoundError):
                source.unlink()
        return result

    def send_future(
        self,
        message: Message,
        *,
        activate_if_inactive: bool = True,
        no_delay: bool = False,
    ) -> FutureReply:
        message.message_type = FUTURE
        return FutureReply(
            _EXECUTOR.submit(
                self.send,
                message,
                activate_if_inactive=activate_if_inactive,
                no_delay=no_delay,
            )
        )

    def dispatch(self, target: str | TransferTicket) -> PagletProxy:
        ticket = TransferTicket.from_target(target)
        response = self.client.post_json(
            _agent_url(self.host_url, self.agent_id, "/dispatch"),
            {"ticket": ticket.to_wire()},
        )
        return PagletProxy.from_wire(response["proxy"], self.client)

    def _settled_same_agent_proxy_result(self, result: Any) -> Any:
        if not isinstance(result, dict):
            return result
        if set(result) != {"host_url", "agent_id"}:
            return result
        if str(result.get("agent_id") or "") != self.agent_id:
            return result
        deadline = time.monotonic() + 2.0
        stable_since: float | None = None
        last_seen = {"host_url": str(result["host_url"]), "agent_id": self.agent_id}
        while time.monotonic() < deadline:
            current = self._find_active_same_agent_proxy(last_seen["host_url"])
            if current is None:
                time.sleep(0.05)
                continue
            if current == last_seen:
                if stable_since is None:
                    stable_since = time.monotonic()
                elif time.monotonic() - stable_since >= 0.15:
                    return current
            else:
                last_seen = current
                stable_since = time.monotonic()
            time.sleep(0.05)
        return last_seen

    def _find_active_same_agent_proxy(self, fallback_host_url: str) -> dict[str, str] | None:
        hosts = [fallback_host_url]
        with contextlib.suppress(PagletError):
            hosts.extend(
                str(item.get("url") or item.get("address") or "")
                for item in self.client.get_json(f"{self.host_url.rstrip('/')}/hosts").get("hosts", [])
            )
        for host_url in dict.fromkeys(host.rstrip("/") for host in hosts if host):
            try:
                info = self.client.get_json(_agent_url(host_url, self.agent_id), timeout=0.2)
            except PagletError:
                continue
            if info.get("active"):
                return {"host_url": host_url, "agent_id": self.agent_id}
        return None

    def clone(self, target: str | TransferTicket | None = None) -> PagletProxy:
        payload = {"target": None} if target is None else {"ticket": TransferTicket.from_target(target).to_wire()}
        response = self.client.post_json(
            _agent_url(self.host_url, self.agent_id, "/clone"),
            payload,
        )
        return PagletProxy.from_wire(response["proxy"], self.client)

    def deactivate(
        self,
        *,
        reason: str = "deactivate",
        policy: DeactivationPolicy | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> PagletProxy:
        response = self.client.post_json(
            _agent_url(self.host_url, self.agent_id, "/deactivate"),
            {
                "request": DeactivationRequest(
                    reason=reason,
                    source="external",
                    policy=policy,
                    metadata=metadata or {},
                ).to_wire()
            },
        )
        return PagletProxy.from_wire(response["proxy"], self.client)

    def activate(self) -> PagletProxy:
        response = self.client.post_json(_agent_url(self.host_url, self.agent_id, "/activate"), {})
        return PagletProxy.from_wire(response["proxy"], self.client)

    def dispose(self) -> None:
        self.client.post_json(_agent_url(self.host_url, self.agent_id, "/dispose"), {})

paglets.remote.references

PagletProxyRef dataclass

Serializable reference to a paglet proxy.

Source code in src/paglets/remote/references.py
@dataclass(frozen=True, slots=True)
class PagletProxyRef:
    """Serializable reference to a paglet proxy."""

    host_url: str
    agent_id: str

    @classmethod
    def from_proxy(cls, proxy: Any) -> PagletProxyRef:
        return cls(host_url=proxy.host_url, agent_id=proxy.agent_id)

    @classmethod
    def from_wire(cls, payload: dict[str, Any]) -> PagletProxyRef:
        return cls(host_url=str(payload["host_url"]), agent_id=str(payload["agent_id"]))

    def to_wire(self) -> dict[str, str]:
        return {"host_url": self.host_url, "agent_id": self.agent_id}

    def resolve(self, context_or_client: Any = None):
        from paglets.core.agent import PagletContext
        from paglets.remote.proxy import PagletProxy

        if isinstance(context_or_client, PagletContext):
            return PagletProxy(self.host_url, self.agent_id, context_or_client.host.client)
        client = context_or_client if isinstance(context_or_client, HostClient) else HostClient()
        return PagletProxy(self.host_url, self.agent_id, client)

paglets.remote.transfer

TransferTicket dataclass

Options and preflight requirements for dispatching or cloning a paglet.

Source code in src/paglets/remote/transfer.py
@dataclass(frozen=True, slots=True)
class TransferTicket:
    """Destination plus movement options carried with a dispatch or clone request."""

    destination: str
    timeout: float = 10.0
    retries: int = 0
    retry_interval: float = 0.25
    required_capabilities: tuple[str, ...] = ()
    expected_code_version: str | None = None
    arrival_mode: ArrivalMode = ArrivalMode.ACTIVATE

    def __post_init__(self) -> None:
        # Validation of arrival_mode is delegated to the shared enum helper.
        require_enum(self.arrival_mode, ArrivalMode, "arrival_mode")

    @classmethod
    def from_target(cls, target: str | TransferTicket) -> TransferTicket:
        """Pass an existing ticket through; wrap anything else as a destination."""
        return target if isinstance(target, cls) else cls(destination=target)

    @classmethod
    def from_wire(cls, payload: dict[str, Any]) -> TransferTicket:
        """Rebuild a ticket from its wire dict.

        Only ``destination`` is required; absent keys fall back to the field
        defaults, and a missing or falsy ``arrival_mode`` means ``ACTIVATE``.
        """
        destination = str(payload["destination"])
        timeout = float(payload.get("timeout", 10.0))
        retries = int(payload.get("retries", 0))
        retry_interval = float(payload.get("retry_interval", 0.25))
        capabilities = tuple(map(str, payload.get("required_capabilities", [])))
        raw_version = payload.get("expected_code_version")
        code_version = None if raw_version is None else str(raw_version)
        mode = enum_from_wire(
            payload.get("arrival_mode") or ArrivalMode.ACTIVATE.value,
            ArrivalMode,
            "arrival_mode",
        )
        return cls(
            destination=destination,
            timeout=timeout,
            retries=retries,
            retry_interval=retry_interval,
            required_capabilities=capabilities,
            expected_code_version=code_version,
            arrival_mode=mode,
        )

    def to_wire(self) -> dict[str, Any]:
        """Return the ticket as a JSON-friendly dict (capabilities as a list, mode as its value)."""
        return dict(
            destination=self.destination,
            timeout=self.timeout,
            retries=self.retries,
            retry_interval=self.retry_interval,
            required_capabilities=list(self.required_capabilities),
            expected_code_version=self.expected_code_version,
            arrival_mode=self.arrival_mode.value,
        )

paglets.remote.transport

restore_json_safe(value: Any) -> Any

Recursively restore values produced by `json_safe`.

Source code in src/paglets/remote/transport.py
def restore_json_safe(value: Any) -> Any:
    """Undo :func:`json_safe` on ``value``, descending into lists and dicts."""

    # restore_binary_tag returns a different object when it restored something.
    decoded = restore_binary_tag(value)
    if decoded is not value:
        return decoded
    if isinstance(value, dict):
        return {name: restore_json_safe(entry) for name, entry in value.items()}
    if isinstance(value, list):
        return list(map(restore_json_safe, value))
    return value

paglets.remote.mesh

MeshRegistry

Version-gated host registry owned by a paglets host.

Source code in src/paglets/remote/mesh.py (lines 151–633)
class MeshRegistry:
    """Version-gated host registry owned by a paglets host."""

    def __init__(
        self,
        host_runtime: Host,
        *,
        enabled: bool = True,
        peers: list[str] | None = None,
        code_version: str | None = None,
        multicast: bool = True,
        lan_discovery: bool = True,
        lan_discovery_interval: float = MESH_LAN_DISCOVERY_INTERVAL_SECONDS,
        gossip_interval: float = 1.0,
        offline_after: float = 10.0,
    ):
        """Prepare registry state for ``host_runtime``; no threads are started here.

        Timing arguments are coerced to ``float`` and clamped to lower bounds:
        1.0 for ``lan_discovery_interval``, 0.05 for ``gossip_interval`` and
        0.1 for ``offline_after``. Every entry of ``peers`` is passed through
        ``normalize_host_url`` and stored in the seed set.
        """
        self._host = host_runtime

        # resolve_code_version returns the effective version and a warning value.
        self.code_version, self.version_warning = resolve_code_version(code_version)

        # Feature switches, stored exactly as given.
        self.enabled = enabled
        self.multicast = multicast
        self.lan_discovery = lan_discovery

        # Clamped timing knobs.
        self.lan_discovery_interval = max(1.0, float(lan_discovery_interval))
        self.gossip_interval = max(0.05, float(gossip_interval))
        self.offline_after = max(0.1, float(offline_after))

        seed_urls = set()
        for peer in peers or ():
            seed_urls.add(normalize_host_url(peer))
        self._seeds = seed_urls

        # Registry contents and the synchronisation primitives protecting them.
        self._hosts: dict[str, HostRef] = {}
        self._lock = threading.RLock()
        self._stop = threading.Event()
        self._threads: list[threading.Thread] = []
        self._last_lan_discovery_at = 0.0

    def start(self) -> None:
        """Clear the stop flag, publish the local entry and launch background workers.

        When the registry is disabled only the local entry is refreshed.
        Otherwise a forced LAN sweep and one gossip round run synchronously
        before the daemon threads (gossip, plus beacon sender and listener
        when multicast is on) are started.
        """
        self._stop.clear()
        self.refresh_self()
        if not self.enabled:
            return
        self.discover_lan_once(force=True)
        self.gossip_once()

        # (target, thread name) pairs; the multicast pair is optional.
        workers = [(self._gossip_loop, f"paglets-mesh-{self._host.name}")]
        if self.multicast:
            workers.append((self._multicast_send_loop, f"paglets-mesh-beacon-{self._host.name}"))
            workers.append((self._multicast_receive_loop, f"paglets-mesh-listen-{self._host.name}"))

        self._threads = [
            threading.Thread(target=target, name=label, daemon=True) for target, label in workers
        ]
        for worker in self._threads:
            worker.start()

    def stop(self) -> None:
        """Signal the background workers to stop and wait up to 1 second for each."""
        self._stop.set()
        # Lazy filter: liveness is checked right before each join, as threads wind down.
        still_running = (worker for worker in self._threads if worker.is_alive())
        for worker in still_running:
            worker.join(timeout=1)
        self._threads = []

    def add_seed(self, url: str) -> None:
        """Add ``url`` (normalized) to the seed set unless it is this host's own address.

        The seed set is mutated under ``self._lock`` because other methods
        read and modify it under the same lock, and this method is also
        invoked from the multicast listener thread.
        """
        normalized = normalize_host_url(url)
        if normalized == self._host.address.rstrip("/"):
            return
        with self._lock:
            self._seeds.add(normalized)

    def peer_urls(self, *, include_known: bool = True) -> list[str]:
        """Return the sorted seed URLs, optionally merged with every registered host URL.

        This host's own address is never included in the result.
        """
        own_url = self._host.address.rstrip("/")
        with self._lock:
            candidates = set(self._seeds)
            if include_known:
                candidates = candidates.union(self._hosts)
        return sorted(candidates - {own_url})

    def hosts(self, *, online_only: bool = False, include_self: bool = True) -> list[HostRef]:
        """Return registry entries sorted by ``(name, url)``.

        The local entry is refreshed and stale peers are flagged offline
        before the snapshot is taken. ``include_self=False`` drops the entry
        whose URL equals this host's address; ``online_only=True`` keeps only
        entries whose ``online`` flag is set.
        """
        self.refresh_self()
        self._expire_stale_hosts()
        with self._lock:
            snapshot = list(self._hosts.values())

        own_url = None if include_self else self._host.address.rstrip("/")
        selected = []
        for entry in snapshot:
            if not include_self and entry.url.rstrip("/") == own_url:
                continue
            if online_only and not entry.online:
                continue
            selected.append(entry)
        selected.sort(key=lambda entry: (entry.name, entry.url))
        return selected

    def lookup(self, name_or_url: str) -> HostRef | None:
        """Find a registry entry by host name or URL.

        ``name_or_url`` is stripped of surrounding whitespace and trailing
        slashes. An entry matches when its name or URL equals that text, or
        when its URL equals the normalized form of the text. Returns ``None``
        for empty input or when nothing matches.
        """
        target = name_or_url.strip().rstrip("/")
        if not target:
            return None
        self.refresh_self()
        self._expire_stale_hosts()

        # The normalized form does not depend on the entry being inspected, so
        # compute it once, outside the lock, instead of once per registry entry.
        try:
            normalized_target = normalize_host_url(target)
        except ValueError:
            # Not URL-like: only exact name/URL matches are possible.
            normalized_target = None

        with self._lock:
            for ref in self._hosts.values():
                ref_url = ref.url.rstrip("/")
                if ref.name == target or ref_url == target:
                    return ref
                if normalized_target is not None and ref_url == normalized_target:
                    return ref
        return None

    def resolve_url(self, name_or_url: str) -> str:
        """Return the registered URL for ``name_or_url``, else its normalized form."""
        known = self.lookup(name_or_url)
        return normalize_host_url(name_or_url) if known is None else known.url

    def is_online(self, name_or_url: str) -> bool:
        """Return ``True`` only when the host is registered and flagged online."""
        known = self.lookup(name_or_url)
        if not known:
            return False
        return bool(known.online)

    def wait_for_host(self, name_or_url: str, *, timeout: float = 10.0, interval: float = 0.25) -> HostRef:
        """Poll the registry until ``name_or_url`` is online and return its entry.

        Between polls a gossip round is run (when the registry is enabled)
        and the caller sleeps for at least 0.01 seconds.

        Raises:
            HostError: if the deadline passes before the host is seen online.
        """
        give_up_at = time.monotonic() + timeout
        found = self.lookup(name_or_url)
        while found is None or not found.online:
            if time.monotonic() >= give_up_at:
                raise HostError(f"Timed out waiting for host {name_or_url!r}")
            if self.enabled:
                self.gossip_once()
            time.sleep(max(0.01, interval))
            found = self.lookup(name_or_url)
        return found

    def register_wire(self, payload: dict[str, Any]) -> HostRef | None:
        """Decode a wire payload with ``HostRef.from_wire`` and register the result."""
        decoded = HostRef.from_wire(payload)
        return self.register(decoded)

    def register(self, ref: HostRef) -> HostRef | None:
        """Admit ``ref`` into the registry when its code version matches ours.

        On a version mismatch nothing is registered as online and ``None`` is
        returned: a host in relay mode records the peer as offline, any other
        host asks the peer for a git update instead. A ref pointing at this
        host's own address refreshes and returns the local entry. Otherwise a
        copy with a slash-trimmed URL is returned, and stored unless an
        existing entry is newer and the incoming one is offline.
        """
        if ref.code_version != self.code_version:
            self._debug(
                f"ignoring mesh peer {ref.name} at {ref.url}: version {ref.code_version!r} != {self.code_version!r}"
            )
            if getattr(self._host, "relay_mode", False):
                self._mark_ref_offline(ref, f"code version {ref.code_version!r} != {self.code_version!r}")
            else:
                self._host.request_peer_git_update(
                    ref.url,
                    validate_health=True,
                    report_unreachable=False,
                )
            return None

        peer_url = ref.url.rstrip("/")
        if peer_url == self._host.address.rstrip("/"):
            return self.refresh_self()

        candidate = HostRef(
            name=ref.name,
            url=peer_url,
            code_version=ref.code_version,
            online=ref.online,
            # A falsy timestamp is replaced with the current time.
            last_seen=ref.last_seen or time.time(),
            active_count=ref.active_count,
            inactive_count=ref.inactive_count,
            tags=ref.tags,
            properties=dict(ref.properties),
            error=ref.error,
        )
        with self._lock:
            known = self._hosts.get(candidate.url)
            if known is None or known.last_seen <= candidate.last_seen or candidate.online:
                self._hosts[candidate.url] = candidate
        return candidate

    def join(self, url: str) -> list[HostRef]:
        """Handshake with the host at ``url`` and return the current registry listing.

        The peer's ``/health`` endpoint is fetched first. On a code-version
        mismatch the peer is not joined: a relay-mode host records it as
        offline, any other host requests a git update from it. On a match the
        peer is registered, our own entry is posted to its ``/hosts/join``
        endpoint, and every host dict in the reply is registered as well.
        Any exception during this exchange marks ``url`` offline.
        """
        if not self.enabled:
            return self.hosts(include_self=True)
        peer_url = normalize_host_url(url)
        if peer_url == self._host.address.rstrip("/"):
            return self.hosts(include_self=True)
        try:
            health = self._host.client.get_json(f"{peer_url}/health")
            peer_version = str(health.get("code_version") or "")

            def ref_from_health(*, online: bool) -> HostRef:
                # Built lazily so field coercion happens only on the branch that needs it.
                return HostRef(
                    name=str(health.get("name") or _name_from_url(peer_url)),
                    url=str(health.get("address") or peer_url).rstrip("/"),
                    code_version=peer_version,
                    online=online,
                    last_seen=time.time(),
                    active_count=int(health.get("active_count", 0)),
                    inactive_count=int(health.get("inactive_count", 0)),
                    tags=_normalize_tags(health.get("tags", [])),
                    properties=_normalize_properties(health.get("properties", {})),
                )

            if peer_version != self.code_version:
                self._debug(
                    f"ignoring mesh peer {health.get('name', peer_url)} at {peer_url}: "
                    f"version {peer_version!r} != {self.code_version!r}"
                )
                if getattr(self._host, "relay_mode", False):
                    self._mark_ref_offline(
                        ref_from_health(online=False),
                        f"code version {peer_version!r} != {self.code_version!r}",
                    )
                else:
                    self._host.request_peer_git_update(peer_url, health=health)
                return self.hosts(include_self=True)

            peer_ref = ref_from_health(online=True)
            self.register(peer_ref)
            reply = self._host.client.post_json(f"{peer_ref.url}/hosts/join", self.refresh_self().to_wire())
            advertised = reply.get("hosts", [])
            if isinstance(advertised, list):
                for entry in advertised:
                    if not isinstance(entry, dict):
                        continue
                    try:
                        self.register_wire(entry)
                    except (KeyError, TypeError, ValueError):
                        # Skip entries that cannot be decoded; keep the rest.
                        continue
            return self.hosts(include_self=True)
        except Exception as exc:
            self.mark_offline(peer_url, str(exc))
            return self.hosts(include_self=True)

    def gossip_once(self) -> None:
        """Run one gossip round: refresh self, maybe sweep the LAN, then join every peer.

        Targets are the seed URLs plus every registered host other than this
        one, visited in sorted order. Does nothing when the registry is
        disabled.
        """
        if not self.enabled:
            return
        self.refresh_self()
        self.discover_lan_once()
        own_url = self._host.address.rstrip("/")
        # Snapshot seeds and hosts under the same lock so the target list is
        # consistent with concurrent add_seed/register calls from other threads.
        with self._lock:
            targets = set(self._seeds)
            targets.update(url for url in self._hosts if url != own_url)
        for url in sorted(targets):
            self.join(url)

    def discover_lan_once(self, *, force: bool = False) -> list[HostRef]:
        """Probe the local network for hosts and register the ones that respond.

        Returns an empty list when the registry or LAN discovery is disabled,
        or when the previous sweep was less than ``lan_discovery_interval``
        seconds ago and ``force`` is false. Every ref that ``register``
        accepts is also added as a seed and included in the returned list.
        """
        if not (self.enabled and self.lan_discovery):
            return []
        started_at = time.monotonic()
        if not force and started_at - self._last_lan_discovery_at < self.lan_discovery_interval:
            return []
        self._last_lan_discovery_at = started_at

        accepted: list[HostRef] = []
        for candidate in self._discover_lan_refs(self._lan_discovery_ports()):
            stored = self.register(candidate)
            if stored is None:
                continue
            accepted.append(stored)
            self.add_seed(stored.url)
        return accepted

    def _lan_discovery_ports(self) -> set[int]:
        """Collect the TCP ports worth probing during a LAN sweep.

        Always contains 8765, plus this host's own port when positive and the
        explicit port of every seed or registered URL that normalizes cleanly.
        """
        ports = {8765}
        if self._host.port > 0:
            ports.add(int(self._host.port))
        with self._lock:
            known_urls = {*self._seeds, *self._hosts}
        for known_url in known_urls:
            try:
                parts = urlparse(normalize_host_url(known_url))
            except ValueError:
                continue
            explicit_port = parts.port
            if explicit_port is not None:
                ports.add(int(explicit_port))
        return ports

    def _discover_lan_refs(self, ports: set[int]) -> list[HostRef]:
        """Probe every other address in the local /24 on each positive port in ``ports``.

        Returns an empty list when the detected LAN address is not a
        non-loopback IPv4 address or when there is nothing to probe. Probes
        run on a thread pool and results are collected as they complete.
        """
        local_text = _detect_lan_host()
        try:
            local_ip = ipaddress.ip_address(local_text)
        except ValueError:
            return []
        if local_ip.is_loopback or not isinstance(local_ip, ipaddress.IPv4Address):
            return []

        usable_ports = sorted(port for port in ports if port > 0)
        if not usable_ports:
            return []

        subnet = ipaddress.ip_network(f"{local_text}/24", strict=False)
        targets = []
        for neighbour in subnet.hosts():
            if neighbour == local_ip:
                continue
            for port in usable_ports:
                targets.append((str(neighbour), port))
        if not targets:
            return []

        with ThreadPoolExecutor(max_workers=MESH_LAN_DISCOVERY_WORKERS) as pool:
            pending = [pool.submit(self._probe_lan_host, ip_text, port) for ip_text, port in targets]
            outcomes = (job.result() for job in as_completed(pending))
            return [found for found in outcomes if found is not None]

    def _probe_lan_host(self, host: str, port: int) -> HostRef | None:
        """Fetch ``/health`` from ``http://host:port`` and turn it into a ``HostRef``.

        Returns ``None`` when the URL is this host's own address, the request
        fails, the response is not a dict, it has no ``code_version``, or its
        fields cannot be coerced into a ``HostRef``.
        """
        url = f"http://{host}:{port}"
        if url.rstrip("/") == self._host.address.rstrip("/"):
            return None
        try:
            health = self._host.client.get_json(f"{url}/health", timeout=MESH_LAN_DISCOVERY_TIMEOUT_SECONDS)
        except Exception:
            # Most probed addresses have nothing listening; that is expected.
            return None
        if not isinstance(health, dict):
            return None
        code_version = str(health.get("code_version") or "")
        if not code_version:
            return None
        try:
            return HostRef(
                name=str(health.get("name") or _name_from_url(url)),
                url=str(health.get("address") or url).rstrip("/"),
                code_version=code_version,
                online=True,
                last_seen=time.time(),
                active_count=int(health.get("active_count", 0)),
                inactive_count=int(health.get("inactive_count", 0)),
                tags=_normalize_tags(health.get("tags", [])),
                properties=_normalize_properties(health.get("properties", {})),
            )
        except (KeyError, TypeError, ValueError):
            # An unrelated or misbehaving service answering on this port must
            # not abort the whole sweep: the caller re-raises probe exceptions.
            return None

    def mark_offline(self, url: str, error: str) -> None:
        """Record the host at ``url`` as offline with ``error`` attached.

        A known entry keeps its identity, counters and ``last_seen``; an
        unknown URL gets a placeholder entry stamped with the current time and
        this registry's code version. This host's own address is ignored.
        """
        peer_url = normalize_host_url(url)
        if peer_url == self._host.address.rstrip("/"):
            return
        with self._lock:
            known = self._hosts.get(peer_url)
            if known is not None:
                replacement = HostRef(
                    name=known.name,
                    url=known.url,
                    code_version=known.code_version,
                    online=False,
                    last_seen=known.last_seen,
                    active_count=known.active_count,
                    inactive_count=known.inactive_count,
                    tags=known.tags,
                    properties=dict(known.properties),
                    error=error,
                )
            else:
                replacement = HostRef(
                    name=_name_from_url(peer_url),
                    url=peer_url,
                    code_version=self.code_version,
                    online=False,
                    last_seen=time.time(),
                    active_count=0,
                    inactive_count=0,
                    tags=(),
                    properties={},
                    error=error,
                )
            self._hosts[peer_url] = replacement

    def _mark_ref_offline(self, ref: HostRef, error: str) -> None:
        """Store an offline copy of ``ref`` carrying ``error``, keyed by its normalized URL.

        Unlike ``mark_offline`` this overwrites any existing entry with the
        fields of ``ref`` itself. A ref for this host's own address is ignored.
        """
        peer_url = normalize_host_url(ref.url)
        if peer_url == self._host.address.rstrip("/"):
            return
        with self._lock:
            offline_copy = HostRef(
                name=ref.name,
                url=peer_url,
                code_version=ref.code_version,
                online=False,
                # A falsy timestamp is replaced with the current time.
                last_seen=ref.last_seen or time.time(),
                active_count=ref.active_count,
                inactive_count=ref.inactive_count,
                tags=ref.tags,
                properties=dict(ref.properties),
                error=error,
            )
            self._hosts[peer_url] = offline_copy

    def refresh_self(self) -> HostRef:
        """Rebuild this host's own registry entry, store it and return it.

        Agent counts are read under the host's lock; the entry is always
        online and stamped with the current time.
        """
        host = self._host
        with host._lock:
            active, inactive = len(host._agents), len(host._inactive)
        own_ref = HostRef(
            name=host.name,
            url=host.address.rstrip("/"),
            code_version=self.code_version,
            online=True,
            last_seen=time.time(),
            active_count=active,
            inactive_count=inactive,
            tags=host.tags,
            properties=host.host_properties,
        )
        with self._lock:
            self._hosts[own_ref.url] = own_ref
        return own_ref

    def local_address_changed(self, previous_url: str) -> HostRef:
        """Forget the host's previous address and republish it under the current one.

        The old URL is dropped from both the registry and the seed set, the
        local entry is rebuilt, and a gossip round runs when the registry is
        enabled. Returns the rebuilt local entry.
        """
        stale_url = normalize_host_url(previous_url)
        with self._lock:
            self._seeds.discard(stale_url)
            self._hosts.pop(stale_url, None)
        current = self.refresh_self()
        if self.enabled:
            self.gossip_once()
        return current

    def _gossip_loop(self) -> None:
        """Background worker: run a gossip round every ``gossip_interval`` seconds until stopped."""
        while True:
            # wait() returns True as soon as the stop event is set.
            if self._stop.wait(self.gossip_interval):
                return
            try:
                self.gossip_once()
            except Exception as exc:  # pragma: no cover - defensive background boundary
                self._debug(f"mesh gossip failed: {exc}")

    def _multicast_send_loop(self) -> None:
        """Background worker: multicast this host's beacon every ``gossip_interval`` seconds.

        If the UDP socket cannot be created or configured the sender is
        disabled (with a debug message) and the thread exits. Individual send
        failures are reported through ``_debug`` and the loop continues.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        except OSError as exc:
            self._debug(f"mesh multicast sender disabled: {exc}")
            return
        # The socket is owned by the with-block from here on, so it is closed
        # even when configuration fails.
        with sock:
            try:
                # TTL 1 keeps beacons on the local network segment.
                sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
            except OSError as exc:
                self._debug(f"mesh multicast sender disabled: {exc}")
                return
            while not self._stop.wait(self.gossip_interval):
                try:
                    sock.sendto(encode_mesh_beacon(self.refresh_self()), (MESH_MULTICAST_GROUP, MESH_MULTICAST_PORT))
                except OSError as exc:
                    self._debug(f"mesh multicast send failed: {exc}")

    def _multicast_receive_loop(self) -> None:
        """Background worker: receive multicast beacons and register their senders.

        If the UDP socket cannot be created, bound or joined to the multicast
        group the listener is disabled (with a debug message) and the thread
        exits. Each decodable beacon is passed to ``register``; accepted peers
        other than this host are also added as seeds. A failure while handling
        one beacon is reported through ``_debug`` and does not stop the loop.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        except OSError as exc:
            self._debug(f"mesh multicast listener disabled: {exc}")
            return
        # The socket is owned by the with-block from here on, so it is closed
        # even when bind or the group join fails (e.g. port already in use).
        with sock:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind(("", MESH_MULTICAST_PORT))
                group = socket.inet_aton(MESH_MULTICAST_GROUP)
                sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + socket.inet_aton("0.0.0.0"))
                # Short timeout so the stop event is re-checked regularly.
                sock.settimeout(0.25)
            except OSError as exc:
                self._debug(f"mesh multicast listener disabled: {exc}")
                return
            while not self._stop.is_set():
                try:
                    data, _addr = sock.recvfrom(65535)
                except TimeoutError:
                    continue
                except OSError:
                    if self._stop.is_set():
                        return
                    continue
                try:
                    ref = decode_mesh_beacon(data)
                    if ref is None:
                        continue
                    registered = self.register(ref)
                    if registered is not None and registered.url != self._host.address.rstrip("/"):
                        self.add_seed(registered.url)
                except Exception as exc:  # pragma: no cover - defensive background boundary
                    # Same boundary as the gossip loop: one bad beacon or a
                    # failing peer call must not kill the listener thread.
                    self._debug(f"mesh multicast receive failed: {exc}")

    def _expire_stale_hosts(self) -> None:
        """Flag online peers as offline once they have not been seen for ``offline_after`` seconds.

        The local host's entry is never expired. Expired entries keep all of
        their fields, including ``last_seen``, and get the error text
        ``"stale mesh peer"``. A non-positive ``offline_after`` disables expiry.
        """
        if self.offline_after <= 0:
            return
        checked_at = time.time()
        own_url = self._host.address.rstrip("/")
        with self._lock:
            overdue = [
                (url, ref)
                for url, ref in self._hosts.items()
                if url != own_url and ref.online and not (checked_at - ref.last_seen <= self.offline_after)
            ]
            for url, ref in overdue:
                self._hosts[url] = HostRef(
                    name=ref.name,
                    url=ref.url,
                    code_version=ref.code_version,
                    online=False,
                    last_seen=ref.last_seen,
                    active_count=ref.active_count,
                    inactive_count=ref.inactive_count,
                    tags=ref.tags,
                    properties=dict(ref.properties),
                    error="stale mesh peer",
                )

    def _debug(self, message: str) -> None:
        """Print ``message`` with a mesh prefix when ``PAGLETS_MESH_DEBUG`` is set and non-empty."""
        if not os.environ.get("PAGLETS_MESH_DEBUG"):
            return
        print(f"[paglets mesh] {message}", flush=True)

paglets.remote.admin

detect_lan_host() -> str

Return the local IPv4 address used for default-route traffic.

Source code in src/paglets/remote/admin.py
def detect_lan_host() -> str:
    """Return the local IPv4 address used for default-route traffic.

    Two strategies are tried in order, each ignoring ``OSError``: the local
    address of a UDP socket connected towards 8.8.8.8, then the IPv4
    addresses the machine's hostname resolves to. The first address that
    does not start with ``127.`` wins; ``"127.0.0.1"`` is the fallback.
    """

    def is_usable(candidate: str) -> bool:
        return bool(candidate) and not candidate.startswith("127.")

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            routed = probe.getsockname()[0]
            if is_usable(routed):
                return routed
    except OSError:
        pass

    try:
        for entry in socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET):
            if entry[0] != socket.AF_INET:
                continue
            resolved = entry[4][0]
            if is_usable(resolved):
                return resolved
    except OSError:
        pass
    return "127.0.0.1"
  • Runtime covers how hosts receive remote requests.
  • Configuration covers launch configuration for resident services and startup agents.
  • Tooling covers CLI and git auto-update.