Tooling Package

paglets.tooling contains command-line and repository-oriented helpers.

Responsibilities

  • Provide the paglets host command-line entry point.
  • Discover importable paglet classes in configured paths.
  • Keep trusted host meshes in sync through git auto-update.
  • Provide repository quality gates through ruff, pyright, pytest, MkDocs, and CLI smoke checks.

Main Modules

paglets.tooling.cli
Parses host CLI flags, syncs launch config, optionally performs git auto-update, constructs Host, and starts the runtime.
paglets.tooling.discovery
Discovers importable Paglet subclasses from configured source paths for admin and startup workflows.
paglets.tooling.git_update
Wraps git fetch/pull/status operations, update locking, dependency sync, and restart decisions.
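As a rough illustration of the kind of work paglets.tooling.discovery describes, the sketch below collects subclasses of a base class from an already-imported module using the standard inspect module. It is not the project's implementation: the Paglet class here is a stand-in, and discover_subclasses and the demo_paglets module are hypothetical names used only for this example.

```python
import inspect
import types


class Paglet:
    """Stand-in base class for this sketch; not the real paglets class."""


def discover_subclasses(module: types.ModuleType, base: type) -> list[type]:
    """Return classes defined in `module` that subclass `base`, sorted by name."""
    found = []
    # inspect.getmembers returns (name, value) pairs sorted by name.
    for _name, obj in inspect.getmembers(module, inspect.isclass):
        if obj is base or not issubclass(obj, base):
            continue
        # Skip classes that the module merely imported from elsewhere.
        if obj.__module__ != module.__name__:
            continue
        found.append(obj)
    return found


# Build a throwaway module in memory so the sketch runs without files on disk.
demo = types.ModuleType("demo_paglets")
demo.Paglet = Paglet
demo.Alpha = type("Alpha", (Paglet,), {"__module__": "demo_paglets"})
demo.Imported = type("Imported", (Paglet,), {"__module__": "elsewhere"})
demo.Helper = type("Helper", (), {"__module__": "demo_paglets"})

discovered = discover_subclasses(demo, Paglet)
print([cls.__name__ for cls in discovered])
```

Filtering on __module__ keeps a class from being reported once for every module that imports it.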

Implementation Notes

The console script points at paglets.tooling.cli:main. Runtime restarts use python -m paglets.tooling.cli so the process does not depend on a locked console-script wrapper on Windows.
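A minimal sketch of how such a restart command could be assembled, assuming the restart re-runs the module with the current interpreter. The helper name build_restart_command and the example flag are hypothetical; only the module path paglets.tooling.cli comes from this page.

```python
import sys


def build_restart_command(argv: list[str]) -> list[str]:
    """Return a command that re-runs the CLI module with the current interpreter."""
    # sys.executable is the running Python binary, so the command does not
    # go through the console-script wrapper at all.
    return [sys.executable, "-m", "paglets.tooling.cli", *argv]


# "--example-flag" is a placeholder, not a documented paglets option.
command = build_restart_command(["--example-flag"])
print(command)
```

A process could pass a list like this to subprocess.Popen or os.execv. Which mechanism the real runtime uses is not stated here.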

Git auto-update only runs for trusted direct hosts. Relay/connect mode disables auto-update because connect-mode hosts do not accept inbound update requests.

The repository CI runs the same commands that contributors are expected to run locally:

uv run ruff check .
uv run ruff format --check .
uv run pyright
uv run pytest
uv run --extra docs mkdocs build --strict
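For local use, the commands above could be chained by a small script that stops at the first failure. This is only a sketch: run_gates and QUALITY_GATES are hypothetical names, and the repository may already provide its own wrapper.

```python
import subprocess
from collections.abc import Callable, Sequence
from typing import Any

# The same commands as listed above, split into argument lists.
QUALITY_GATES: list[list[str]] = [
    ["uv", "run", "ruff", "check", "."],
    ["uv", "run", "ruff", "format", "--check", "."],
    ["uv", "run", "pyright"],
    ["uv", "run", "pytest"],
    ["uv", "run", "--extra", "docs", "mkdocs", "build", "--strict"],
]


def run_gates(
    gates: Sequence[Sequence[str]],
    run: Callable[..., Any] = subprocess.run,
) -> int:
    """Run each command in order; return the first non-zero exit code, else 0."""
    for command in gates:
        print("$", " ".join(command))
        completed = run(list(command), check=False)
        if completed.returncode != 0:
            return completed.returncode
    return 0


if __name__ == "__main__":
    raise SystemExit(run_gates(QUALITY_GATES))
```

The run parameter exists so the loop can be exercised with a fake runner instead of invoking uv.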

API Reference

paglets.tooling.cli

paglets.tooling.discovery

paglets.tooling.git_update

GitUpdateError

Bases: RuntimeError

Raised when git auto-update cannot inspect or update a checkout.

Source code in src/paglets/tooling/git_update.py
class GitUpdateError(RuntimeError):
    """Raised when git auto-update cannot inspect or update a checkout."""

GitUpdateLock

Cross-platform lock backed by atomic directory creation under .git.

Source code in src/paglets/tooling/git_update.py
class GitUpdateLock:
    """Cross-platform lock backed by atomic directory creation under .git."""

    def __init__(self, repo_root: Path | str, *, timeout: float = GIT_UPDATE_LOCK_TIMEOUT_SECONDS):
        self.repo_root = Path(repo_root)
        self.timeout = max(0.0, float(timeout))
        self.path = git_common_dir(self.repo_root) / GIT_UPDATE_LOCK_NAME
        self._acquired = False

    def __enter__(self) -> GitUpdateLock:
        self.acquire()
        return self

    def __exit__(self, _exc_type, _exc, _tb) -> None:
        self.release()

    def acquire(self) -> None:
        deadline = time.monotonic() + self.timeout
        while True:
            try:
                self.path.mkdir()
                self._acquired = True
                self._write_owner_file()
                return
            except FileExistsError:
                if self._remove_stale_lock_if_needed():
                    continue
                if time.monotonic() >= deadline:
                    raise GitUpdateError(
                        f"Timed out waiting for git update lock at {self.path}; {self._lock_description()}"
                    ) from None
                time.sleep(GIT_UPDATE_LOCK_RETRY_SECONDS)

    def release(self) -> None:
        if not self._acquired:
            return
        try:
            shutil.rmtree(self.path)
        finally:
            self._acquired = False

    def _write_owner_file(self) -> None:
        with contextlib.suppress(OSError):
            (self.path / "owner").write_text(f"pid={os.getpid()}\ntime={time.time()}\n", encoding="utf-8")

    def _remove_stale_lock_if_needed(self) -> bool:
        owner = self._read_owner()
        if owner.pid is not None and self._pid_matches_owner(owner):
            return False
        age = self._lock_age_seconds(owner)
        if owner.pid is None and age < GIT_UPDATE_STALE_LOCK_GRACE_SECONDS:
            return False
        try:
            shutil.rmtree(self.path)
            return True
        except FileNotFoundError:
            return True
        except OSError:
            return False

    def _read_owner(self) -> _LockOwner:
        try:
            text = (self.path / "owner").read_text(encoding="utf-8")
        except OSError:
            return _LockOwner()
        values: dict[str, str] = {}
        for line in text.splitlines():
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            values[key.strip()] = value.strip()
        try:
            pid = int(values["pid"]) if values.get("pid") else None
        except ValueError:
            pid = None
        try:
            timestamp = float(values["time"]) if values.get("time") else None
        except ValueError:
            timestamp = None
        return _LockOwner(pid=pid, timestamp=timestamp)

    def _pid_matches_owner(self, owner: _LockOwner) -> bool:
        pid = owner.pid
        if pid is None or pid <= 0:
            return False
        if pid == os.getpid():
            return True
        if not psutil.pid_exists(pid):
            return False
        if owner.timestamp is not None:
            try:
                create_time = psutil.Process(pid).create_time()
            except (psutil.Error, OSError):
                return True
            if create_time > owner.timestamp + 1.0:
                return False
        return True

    def _lock_age_seconds(self, owner: _LockOwner) -> float:
        timestamp = owner.timestamp
        if timestamp is None:
            try:
                timestamp = self.path.stat().st_mtime
            except OSError:
                return GIT_UPDATE_STALE_LOCK_GRACE_SECONDS
        return max(0.0, time.time() - timestamp)

    def _lock_description(self) -> str:
        owner = self._read_owner()
        if owner.pid is None:
            return "lock owner is unknown"
        if self._pid_matches_owner(owner):
            return f"lock is held by live process pid={owner.pid}"
        return f"lock owner pid={owner.pid} is no longer running"
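
The core idea behind GitUpdateLock, using directory creation as an atomic "test and set", can be shown in isolation. The sketch below is a simplified stand-in rather than the class above: DirectoryLock is a hypothetical name, and it deliberately omits the owner file and stale-lock recovery.

```python
import tempfile
import time
from pathlib import Path


class DirectoryLock:
    """Simplified lock: whoever creates the directory holds the lock."""

    def __init__(self, path: Path, *, timeout: float = 1.0, retry: float = 0.05):
        self.path = Path(path)
        self.timeout = max(0.0, timeout)
        self.retry = retry
        self._held = False

    def acquire(self) -> None:
        deadline = time.monotonic() + self.timeout
        while True:
            try:
                # mkdir either creates the directory or raises FileExistsError,
                # so two processes cannot both succeed.
                self.path.mkdir()
                self._held = True
                return
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"lock busy: {self.path}") from None
                time.sleep(self.retry)

    def release(self) -> None:
        if self._held:
            self.path.rmdir()
            self._held = False

    def __enter__(self) -> "DirectoryLock":
        self.acquire()
        return self

    def __exit__(self, _exc_type, _exc, _tb) -> None:
        self.release()


with tempfile.TemporaryDirectory() as tmp:
    lock_path = Path(tmp) / "demo.lock"
    with DirectoryLock(lock_path, timeout=0.0):
        print("held:", lock_path.is_dir())
    print("released:", not lock_path.exists())
```

GitUpdateLock follows the same context-manager pattern (with GitUpdateLock(repo_root): ...). It also records the owner's pid and timestamp so that a lock left behind by a dead process can be removed.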

  • Configuration covers launch config loaded by the CLI.
  • Remote covers admin clients and mesh discovery.