#!/usr/bin/env python3
"""
AutoJanet Dispatcher

Runs as a CronJob every 2 minutes. Scans Vikunja project 78 (configurable)
for tasks in the Todo bucket that have an `agent:` label. Claims each task
(moves to In Progress) and spawns a Kubernetes Job for the appropriate agent.

Requirements:
  - OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID — for fetching Vikunja token
  - VIKUNJA_BASE_URL, VIKUNJA_PROJECT_ID, VIKUNJA_TODO_BUCKET_ID,
    VIKUNJA_IN_PROGRESS_BUCKET_ID
  - K8S_NAMESPACE, AGENT_IMAGE
"""

import json
import logging
import os
import re
import sys
import time
from string import Template

import httpx
from kubernetes import client as k8s_client, config as k8s_config

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)
log = logging.getLogger("dispatcher")

# ── Config ────────────────────────────────────────────────────────────────────

# The three OpenBao variables are mandatory: a missing one raises KeyError at
# import time. Everything else falls back to the defaults below.
OPENBAO_ADDR = os.environ["OPENBAO_ADDR"]
OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"]
OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"]

VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi")
VIKUNJA_PROJECT_ID = int(os.environ.get("VIKUNJA_PROJECT_ID", "78"))
VIKUNJA_TODO_BUCKET_ID = int(os.environ.get("VIKUNJA_TODO_BUCKET_ID", "116"))
VIKUNJA_IN_PROGRESS_BUCKET_ID = int(os.environ.get("VIKUNJA_IN_PROGRESS_BUCKET_ID", "117"))

K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/autojanet/agent:latest")

# Roles accepted after the `agent:` label prefix; any other suffix is ignored.
VALID_ROLES = {
    "pm",
    "coder",
    "code-reviewer",
    "test-engineer",
    "devsecops",
    "secops",
    "sre",
    "kubernetes-pilot",
    "linux-admin",
    "systems-engineer",
    "networking",
    "dba",
    "prometheus-expert",
    "tofu-engineer",
    "release-manager",
    "doc-updater",
    "doc-writer",
    "technical-writer",
    "cost-optimizer",
}

# Page size requested from the Vikunja task listing endpoint.
_VIKUNJA_PAGE_SIZE = 50

# Matches labels of the form `agent:<role>`; compiled once at import time.
_AGENT_LABEL_RE = re.compile(r"^agent:(.+)$")

# HTTP status codes treated as a successful task update.
_VIKUNJA_UPDATE_OK = (200, 201)


# ── OpenBao ───────────────────────────────────────────────────────────────────


def get_openbao_token() -> str:
    """Authenticate to OpenBao via AppRole and return a client token.

    Raises:
        httpx.HTTPStatusError: if the login endpoint answers with a 4xx/5xx.
        httpx.RequestError: on transport failures (timeout, connection, ...).
    """
    resp = httpx.post(
        f"{OPENBAO_ADDR}/v1/auth/approle/login",
        json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["auth"]["client_token"]


def get_secret(bao_token: str, path: str, key: str) -> str:
    """Read a KV v2 secret from OpenBao.

    Args:
        bao_token: client token sent in the ``X-Vault-Token`` header.
        path: secret path below ``/v1/secret/data/``.
        key: field to return from the secret's ``data.data`` mapping.

    Raises:
        httpx.HTTPStatusError: if OpenBao answers with a 4xx/5xx.
        KeyError: if ``key`` is not present in the secret.
    """
    resp = httpx.get(
        f"{OPENBAO_ADDR}/v1/secret/data/{path}",
        headers={"X-Vault-Token": bao_token},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["data"]["data"][key]


# ── Vikunja ───────────────────────────────────────────────────────────────────


def get_vikunja_token(bao_token: str) -> str:
    """Fetch the dispatcher's Vikunja token from OpenBao."""
    return get_secret(bao_token, "autojanet/pm/vikunja-token", "token")


def list_todo_tasks(vikunja_token: str) -> list[dict]:
    """Return all tasks in the Todo bucket of the AutoJanet project.

    Every page of the project's task list is fetched and the result is then
    filtered client-side on ``bucket_id == VIKUNJA_TODO_BUCKET_ID``.

    Raises:
        httpx.HTTPStatusError: if any page request answers with a 4xx/5xx.
    """
    tasks: list[dict] = []
    page = 1
    while True:
        resp = httpx.get(
            f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/tasks",
            headers={"Authorization": f"Bearer {vikunja_token}"},
            params={"page": page, "per_page": _VIKUNJA_PAGE_SIZE},
            timeout=15,
        )
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            break
        tasks.extend(batch)
        # A short page is taken to be the last one.
        # NOTE(review): this assumes the server honours per_page=50; if it
        # caps pages lower, later pages would be missed — confirm.
        if len(batch) < _VIKUNJA_PAGE_SIZE:
            break
        page += 1

    # Filter to Todo bucket only
    return [t for t in tasks if t.get("bucket_id") == VIKUNJA_TODO_BUCKET_ID]


def extract_agent_role(task: dict) -> str | None:
    """
    Return the role name if the task has exactly one `agent:` label that
    matches a known role. Returns None otherwise.

    `agent:` labels whose suffix is not in VALID_ROLES are ignored, so they do
    not count towards the "exactly one" rule.
    """
    roles_found = []
    for label in task.get("labels") or []:
        # Guard against a null title so re.match never receives None.
        title = label.get("title") or ""
        m = _AGENT_LABEL_RE.match(title)
        if m and m.group(1) in VALID_ROLES:
            roles_found.append(m.group(1))

    if len(roles_found) == 1:
        return roles_found[0]
    return None


def _move_task_to_bucket(vikunja_token: str, task_id: int, bucket_id: int) -> httpx.Response:
    """POST a bucket change for one task and return the raw response.

    Shared by claiming and un-claiming so both send the identical request.
    Transport errors are not caught here; callers decide how to report them.
    """
    return httpx.post(
        f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}",
        headers={
            "Authorization": f"Bearer {vikunja_token}",
            "Content-Type": "application/json",
        },
        json={"bucket_id": bucket_id},
        timeout=10,
    )


def claim_task(vikunja_token: str, task_id: int) -> bool:
    """Move task to In Progress bucket. Returns True on success.

    Both a non-2xx answer and a transport failure are logged and reported as
    False, so one unreachable request does not abort the whole dispatch run.
    """
    try:
        resp = _move_task_to_bucket(vikunja_token, task_id, VIKUNJA_IN_PROGRESS_BUCKET_ID)
    except httpx.HTTPError as exc:
        log.warning("Failed to claim task %d: %s", task_id, exc)
        return False

    if resp.status_code in _VIKUNJA_UPDATE_OK:
        return True
    log.warning("Failed to claim task %d: %d %s", task_id, resp.status_code, resp.text)
    return False


def _unclaim_task(vikunja_token: str, task_id: int) -> None:
    """Best-effort move of a task back to the Todo bucket.

    Called from an error path, so it never raises for HTTP problems: failures
    are logged and the task is left where it is.
    """
    try:
        resp = _move_task_to_bucket(vikunja_token, task_id, VIKUNJA_TODO_BUCKET_ID)
    except httpx.HTTPError as exc:
        log.error("Failed to un-claim task %d: %s", task_id, exc)
        return

    if resp.status_code not in _VIKUNJA_UPDATE_OK:
        log.error("Failed to un-claim task %d: %d %s", task_id, resp.status_code, resp.text)


# ── Kubernetes ────────────────────────────────────────────────────────────────


def load_k8s_config() -> None:
    """Load in-cluster Kubernetes config, falling back to the local kubeconfig."""
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config()


def job_name(role: str, task_id: int) -> str:
    """Build the Job name: `agent-<role>-<task_id>`.

    Hyphens are removed from the role and it is cut to 12 characters.
    """
    safe_role = role.replace("-", "")[:12]
    return f"agent-{safe_role}-{task_id}"


def job_already_exists(batch_v1: k8s_client.BatchV1Api, name: str) -> bool:
    """Return True if a Job called ``name`` exists in K8S_NAMESPACE.

    A 404 from the API means "does not exist"; any other API error is re-raised.
    """
    try:
        batch_v1.read_namespaced_job(name=name, namespace=K8S_NAMESPACE)
        return True
    except k8s_client.ApiException as e:
        if e.status == 404:
            return False
        raise


def _approle_env(env_name: str, role: str, key: str) -> k8s_client.V1EnvVar:
    """Env var sourced from field ``key`` of the `agent-<role>-approle` Secret."""
    return k8s_client.V1EnvVar(
        name=env_name,
        value_from=k8s_client.V1EnvVarSource(
            secret_key_ref=k8s_client.V1SecretKeySelector(
                name=f"agent-{role}-approle",
                key=key,
            )
        ),
    )


def _build_agent_job(name: str, role: str, task_id: int, task_title: str) -> k8s_client.V1Job:
    """Assemble the Job manifest for one agent run (no API calls are made)."""
    labels = {
        "autojanet/type": "agent",
        "autojanet/role": role,
        "autojanet/task-id": str(task_id),
    }

    container = k8s_client.V1Container(
        name="agent",
        image=AGENT_IMAGE,
        image_pull_policy="Always",
        env=[
            k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
            k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
            k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
            k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
            k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
            k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
            k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
            _approle_env("OPENBAO_ROLE_ID", role, "role_id"),
            _approle_env("OPENBAO_SECRET_ID", role, "secret_id"),
        ],
        resources=k8s_client.V1ResourceRequirements(
            requests={"cpu": "250m", "memory": "512Mi"},
            limits={"cpu": "2000m", "memory": "2Gi"},
        ),
        security_context=k8s_client.V1SecurityContext(
            allow_privilege_escalation=False,
            run_as_non_root=True,
            run_as_user=1000,
            capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
        ),
    )

    pod_template = k8s_client.V1PodTemplateSpec(
        # Separate dict so the Job and pod metadata do not share one object.
        metadata=k8s_client.V1ObjectMeta(labels=dict(labels)),
        spec=k8s_client.V1PodSpec(
            service_account_name=f"agent-{role}",
            restart_policy="Never",
            node_selector={"kubernetes.io/arch": "amd64"},
            containers=[container],
        ),
    )

    return k8s_client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=k8s_client.V1ObjectMeta(
            name=name,
            namespace=K8S_NAMESPACE,
            labels=labels,
        ),
        spec=k8s_client.V1JobSpec(
            ttl_seconds_after_finished=3600,
            backoff_limit=1,
            template=pod_template,
        ),
    )


def spawn_agent_job(
    batch_v1: k8s_client.BatchV1Api,
    role: str,
    task_id: int,
    task_title: str,
) -> None:
    """Create the agent Job for a task unless one with the same name exists.

    Args:
        batch_v1: Kubernetes batch API client used for the lookup and create.
        role: agent role; selects the service account and AppRole Secret.
        task_id: Vikunja task id, passed to the agent as ``TASK_ID``.
        task_title: task title, passed to the agent as ``TASK_TITLE``.

    Raises:
        kubernetes.client.ApiException: for API errors other than the
            "not found" on lookup and the "already exists" conflict on create.
    """
    name = job_name(role, task_id)

    if job_already_exists(batch_v1, name):
        log.info("Job %s already exists, skipping", name)
        return

    job = _build_agent_job(name, role, task_id, task_title)
    try:
        batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job)
    except k8s_client.ApiException as exc:
        # 409 Conflict: the Job was created between the existence check and
        # this call (e.g. by an overlapping run). Same outcome as the check.
        if exc.status != 409:
            raise
        log.info("Job %s already exists, skipping", name)
        return
    log.info("Spawned job %s for role=%s task=%d", name, role, task_id)


# ── Main ──────────────────────────────────────────────────────────────────────


def main() -> None:
    """Run one dispatch pass: authenticate, scan Todo tasks, claim and spawn."""
    log.info("Dispatcher starting")

    # Auth
    bao_token = get_openbao_token()
    vikunja_token = get_vikunja_token(bao_token)
    log.info("Authenticated to OpenBao and Vikunja")

    # k8s
    load_k8s_config()
    batch_v1 = k8s_client.BatchV1Api()

    # Scan tasks
    tasks = list_todo_tasks(vikunja_token)
    log.info("Found %d tasks in Todo bucket", len(tasks))

    claimed = 0
    for task in tasks:
        task_id = task["id"]
        title = task.get("title") or ""

        role = extract_agent_role(task)
        if not role:
            log.debug("Task %d has no valid agent label, skipping", task_id)
            continue

        log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)

        # Claim first so a concurrent run does not pick the same task up.
        if not claim_task(vikunja_token, task_id):
            continue

        try:
            spawn_agent_job(batch_v1, role, task_id, title)
        except Exception as exc:
            # Per-task boundary: keep the traceback, then hand the task back.
            log.exception("Failed to spawn job for task %d: %s", task_id, exc)
            # Un-claim: move back to Todo
            _unclaim_task(vikunja_token, task_id)
        else:
            claimed += 1

    log.info("Dispatcher done. Claimed %d tasks.", claimed)


if __name__ == "__main__":
    main()